Error “org.apache.hive.hcatalog.data.JsonSerDe not found” while accessing Hive tables from spark-sql

In the Hadoop cluster, I am running Spark 2.0 on a Client node, separately from Hive services.

If I would like to connect to Hive Metastore using spark-sql, hive-hcatalog-core-0.13.0.jar has to be added to the jars folder in Spark home directory.

Step into $SPARK_HOME/jars folder and run the following

sudo -u spark wget http://central.maven.org/maven2/org/apache/hive/hcatalog/hive-hcatalog-core/0.13.0/hive-hcatalog-core-0.13.0.jar

Now I can run spark-sql and queries on the tables in the databases.

If the jar file is missing, and by running for example

desc table_name

the following error message is displayed

ERROR hive.log: error in initSerDe: java.lang.ClassNotFoundException Class org.apache.hive.hcatalog.data.JsonSerDe not found
java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:385)
        at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276)
        at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:258)
        at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:339)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:335)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:335)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:333)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:262)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:209)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:208)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:251)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:333)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
        at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:227)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:456)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:126)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:274)
        at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:414)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Since all my clusters have Hive services running on separate nodes than Spark services, I would not know if this is needed in cases when Hive services and Spark services are on the same node.

Apache Spark 2.0 – Notes

Spark Session

In Spark 2.0, SparkSession has been introduced. It provides a single point of entry for interaction with Spark functionality. It allows user accessing DataFrame and Dataset APIs.
In older Spark versions, the user had to create Spark configuration (sparkConf), SparkContext (sc) and then sqlContext. In Spark 2.0 all is done with SparkSession (spark), which encapsulates the above mentioned trio, HiveContext and StreamingContext.

In order to use DataFrame API in Spark 1.6, SQLContext needs to be used. When running Spark, new Spark application is started by creating SparkContext object (represents a connection to computing cluster). From SparkContext, SQLContext can be created (the main entry point for Spark DataFrame and SQL functionality). A SQLContext can be used to create DataFrames, which allows you to direct operations on your data.
It was confusing when to use SparkContext and when to use SQLContext in Spark 1.6. All this is hidden under a layer called SparkSession. See the following object types from PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SqlContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

The Spark session has to be created when using spark-submit command. An example from the documentation on how to do that:

>>> spark = SparkSession.builder \
 |  ...     .master("local") \
 |  ...     .appName("Word Count") \
 |  ...     .config("spark.some.config.option", "some-value") \
 |  ...     .getOrCreate()

The spark handle is created and first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is automatically initialized. Example with sparkR:

>>> sparkR

sparkr-initialization

SQL

SQL in Spark 2.0 supports SQL:2003 (latest revision is SQL:2011). Spark SQL can run all 99 TPC-DS queries. Subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

Running the following command

>>> spark.catalog.listTables()

Returns the tables available to SparkSession.

Driver program

In Spark, communication occurs between driver and executors. The driver has Spark jobs to run, it splits them into tasks and submits them to executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs are running either in a cluster or locally on the same machine. Pyspark is an example of a local driver program. Example:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode that means that the client that submitted the job is detached from the Spark application and its further behavior does not influence the application. If you shut down the computer that submitted the application to the cluster, the job will continue to run. The driver is on one of the nodes in Spark cluster. Command spark-submit with property –deploy-mode cluster does this.
If you run in client mode, that means that the client you are running the application from is the client. In Spark HistoryServer, under tab Executors, in table Executors, you can read that the address of the driver matches the address of the computer the command has been sent from.

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.

Listing attributes

Pythons dir() lists all attributes accessible through the parameter
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

Opens extensive help and examples.

RDD

SQLContext is created from lower level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs). RDD is Spark’s way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster and perform no matter of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or Python, Java, Scala, or R, the underlying code generated is identical because all execution planning undergoes the same Catalyst optimizer.
DFs are made of partitions – converting a DF to an RDD to check number of partitions:

>>> tweetDF.rdd.getNumPartitions()

Data is split into partitions.
How to optimize:
If there are 3 slots, perfect is to have partitions = x*3
Repartition DF:

>>> tweetDF.repartition(6)

DataFrames

In Spark, base DataFrame is first created. Either by generating a Dataset using spark.range method (for learning purposes) or by reading file(s) or tables and returning a DataFrame. Operations can then be applied to it. DataFrame is immutable, once created, it cannot be changed. Each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

DataFrame consists of series of Row objects. Each of them has a set of named columns.

DataFrame must have a schema, each of which has a name and a type. Some datasources have schemas built into them, although it is possible to define a schema and introduce it as a parameter when creating a new DataFrame.

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance

Dataset

DataFrame and DataSet are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking SQL package in Spark 2.0

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

Spark DF can be converted to Pandas DF

>>> import pandas as pd

Then you can use .toPandas() at the end of the Spark DF to convert to Pandas DF.

Cache

Putting DataFrame in memory: Spark uses Tungsten binary format to columnar compress data in memory. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. The data in memory is equally divided per partition (for example, 600MB in memory, 6 partitions -> 100MB per partition).
The size of data shrinks when cached. Example: 1,6GB file on disk is cached in memory with size 618,6 MB.

When an action is executed, and if the DataFrame is cached, the stages BEFORE the cache was executed are skipped, because the data is already partitioned in memory.
Ideal partition size is between 100MB to 200MB, so number of partitions should be adjusted to that, not the other way around.

Cache is not an action, which means it will be executed when the next action is executed. However, if you cache a table, it WILL be cached right away.

Sources

How to use SparkSession in Apache Spark 2.0
Using Apache Spark 2.0 to Analyze the City of San Francisco’s Open Data
Modern Spark DataFrame and Dataset (Intermediate Tutorial)
Spark SQL, DataFrames and Datasets Guide

About Storm’s Nimbus

This post describes Nimbus and shows how its use with single Nimbus in Storm cluster, as well as Nimbus H/A.

I have a Hadoop cluster installed using Ambari. The distribution is Hortonworks. Storm installation with Ambari is described here

A basic example of Storm topology – writing to HDFS can be seen here. Might be smart to submit one topology first in orderto easier understand the terms like Bolt, Supervisor, Nimbus…

About Nimbus

Nimbus is the master node in Storm cluster, it is the NameNode to your Hadoop.

Responsibilities:

  1. distributing code to Supervisors
  2. assigning tasks
  3. monitoring tasks
  4. restarting tasks when needed

Thrift

Thrift is a member of Apache family. It is a software framework (binary protocol) used for scalable cross language communication. Nimbus is a thrift service, and wide use of thrift in Storm allows users to define and submit topologies from any language.
Nimbus thrift API exposes all the information needed to monitor he Storm cluster.

ZooKeeper’s role

Nimbus stores all of its data in ZooKeeper. It is fail-fast (like Supervisor), so if Nimbus dies, the restart has no effect on the running tasks on the Supervisors.

Nimbus and Supervisors communicate through Zookeeper. This means that all data is stored in Zookeeper.

Submitting Topology in Storm Cluster

From the Storm client, the topology is submitted first to the Nimbus and Nimbus distributes it further to the Supervisors.

Single Nimbus in Storm Cluster

The only Nimbus in the Storm cluster is installed on the Hadoop NameNode.
If the Nimbus is not running, Storm UI (on port 8744) returns the following error message

java.lang.RuntimeException: Could not find leader nimbus from seed hosts ["nimbus-server1"]. Did you specify a valid list of nimbus hosts for config nimbus.seeds

Start Nimbus service from Ambari.

The Storm UI, under Nimbus Summary shows one host. It’s default port is 6627 and the status is “Leader”.
I am running one simple test Topology RandomWordsHdfsTopology and the log on the Supervisor executing the Bolt is showing me lines in the following manner:

2016-10-08 11:13:37.885 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Spark] TASK: 4 DELTA:
2016-10-08 11:13:37.986 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Hadoop]
2016-10-08 11:13:37.986 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME:  TUPLE: source: random-words-spout:5, stream: default, id: {}, [Hadoop]
2016-10-08 11:13:37.986 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Hadoop] TASK: 4 DELTA:
2016-10-08 11:13:38.087 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Kafka]
2016-10-08 11:13:38.088 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME:  TUPLE: source: random-words-spout:5, stream: default, id: {}, [Kafka]
2016-10-08 11:13:38.088 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Kafka] TASK: 4 DELTA:
2016-10-08 11:13:38.188 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Storm]
2016-10-08 11:13:38.189 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME: 0 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Storm]

And the random words are being written to a file in HDFS.

If the Nimbus shuts down, Zookeeper and Supervisor continue running the Topology. In this case, the log file on the Supervisor keeps logging random words and the file in HDFS continues to be appended. The Storm UI shows the error message posted above and running

storm list

from the Storm client machine returns the same error message.

Starting the Nimbus again and looking at the $STORM_LOGS/nimbus.log on nimbus-server1 teaches us how Nimbus reacts upon restart.
Some lines taken from the log file:

b.s.zookeeper [INFO] nimbus-server1 gained leadership, checking if it has all the topology code locally.
b.s.zookeeper [INFO] active-topology-ids [RandomWordsHdfsTopology-1-1475917797] local-topology-ids [RandomWordsHdfsTopology-1-1475917797] diff-topology []
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.
b.s.d.nimbus [INFO] Starting Nimbus server...
...
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.

With other words, the active Topology did not suffer from Nimbus downtime. With Nimbus down, nonew Topologies can be submitted and existing ones cannot be manipulated.

Multiple Nimbus in Storm Cluster

Adding another Nimbus for Nimbus High Availability is simple in Ambari.
The second Nimbus is added on the Client node of the cluster. After it is added and the Storm service restarted, the Storm UI, under Nimbus Summary shows two Nimbus hosts one being Leader and one having status “Not a Leader”.

The client-server2, which has “Not a Leader” Nimbus reveals the following lines in the nimbus.log file:

...
b.s.d.nimbus [INFO] not a leader, skipping cleanup-corrupt-topologies
b.s.d.nimbus [INFO] Starting Nimbus server...
b.s.d.nimbus [INFO] not a leader, skipping assignments
b.s.d.nimbus [INFO] not a leader, skipping cleanup
b.s.d.nimbus [INFO] not a leader skipping , credential renweal.
...
b.s.d.nimbus [INFO] missing topology RandomWordsHdfsTopology-1-1475917797 has state on zookeeper but doesn't have a local dir on this host.
...
b.s.d.nimbus [INFO] trying to download missing topology code from NimbusInfo{host='nimbus-server1', port=6627, isLeader=false}

The “Not a Leader” Nimbus is now updated with the Storm CLuster and its topologies. Now the leader Nimbus is stopped:

b.s.zookeeper [INFO] client-server2 gained leadership, checking if it has all the topology code locally.
b.s.zookeeper [INFO] active-topology-ids [RandomWordsHdfsTopology-1-1475917797] local-topology-ids [RandomWordsHdfsTopology-1-1475917797] diff-topology []
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.

The Nimbus on client-server2 takes over as the Leader and Nimbus on the nimbus-server1 has status “Offline”.

When multiple Nimbus services are up and running, the “Leader” status is being switched between them. Roughly, this goes on every couple of minutes.

Nimbus has a vital role in the Storm Cluster and it is naive to think as long as Topology is running, I do not need Nimbus.