In the Hadoop cluster, I am running Spark 2.0 on a client node, separately from the Hive services.
To connect to the Hive Metastore with spark-sql and query tables stored with the JSON SerDe, hive-hcatalog-core-0.13.0.jar has to be added to the jars folder in the Spark home directory.
Step into the $SPARK_HOME/jars folder and run the following:
sudo -u spark wget http://central.maven.org/maven2/org/apache/hive/hcatalog/hive-hcatalog-core/0.13.0/hive-hcatalog-core-0.13.0.jar
Now I can run spark-sql and query the tables in my databases.
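The same setup can be exercised from PySpark; here is a minimal sketch, assuming the jar is already in $SPARK_HOME/jars, hive-site.xml on the client points at the Metastore, and the database and table names (my_db, my_json_table) are hypothetical placeholders:
from pyspark.sql import SparkSession

# enableHiveSupport() makes the session use the Hive Metastore catalog;
# the hcatalog jar in $SPARK_HOME/jars supplies JsonSerDe when tables are deserialized
spark = (SparkSession.builder
    .appName("metastore-smoke-test")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sql("use my_db")                                # hypothetical database
spark.sql("desc my_json_table").show(truncate=False)  # hypothetical JSON-SerDe table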
If the jar file is missing and I run, for example,
desc table_name
the following error message is displayed:
ERROR hive.log: error in initSerDe: java.lang.ClassNotFoundException Class org.apache.hive.hcatalog.data.JsonSerDe not found
java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:385)
at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276)
at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:258)
at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:339)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:335)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:335)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:333)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:262)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:209)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:208)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:251)
at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:333)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:228)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:227)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:456)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:126)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:274)
at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:414)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
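A quick way to check whether the SerDe class is actually visible to the driver is to ask the JVM through the Py4J gateway. This is a rough diagnostic sketch from a PySpark shell; note that spark.sparkContext._jvm is an internal handle, and the check only sees jars on the driver's system classpath (such as those placed in $SPARK_HOME/jars):
# returns a Class object if the SerDe is on the driver classpath,
# raises a Py4JJavaError wrapping ClassNotFoundException if the jar is missing
spark.sparkContext._jvm.java.lang.Class.forName(
    "org.apache.hive.hcatalog.data.JsonSerDe")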
Since all my clusters run the Hive services on separate nodes from the Spark services, I do not know whether this step is needed when Hive and Spark services run on the same node.
You can use the json-serde jar hosted on the public crawler bucket, then load the jar in your Spark sessions like so. We had nested directories in S3, so we had to enable recursive traversal of the S3 prefix. We also used org.openx.data.jsonserde.JsonSerDe as the serialisation lib for the Glue table for this to work.
from pyspark.sql import SparkSession

spark = (SparkSession.builder.appName("AWS Spark Test")
    # load the serde jar from the public crawler bucket
    .config("spark.jars", "s3://crawler-public/json/serde/json-serde.jar")
    # allow directory recursion so nested JSON prefixes are picked up
    .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
    # ... other options ...
    .enableHiveSupport()
    .getOrCreate()
)
spark.sql("select * from db.table_name limit 100").show()
# profit
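One caveat: spark.jars is only read when the session's JVM is created, so it must be set in the builder before getOrCreate(); setting it on an already-running session has no effect.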