Creating first Scala project with sbt and submitting it to Apache Spark

The environment for the following project build was the following: Ubuntu 14.04 on a AWS EC2 instance, sbt version 0.13.13 (how to install it) and Apache Spark 2.0.1 on local mode (although the same procedure has been done and worked on a Hortonworks Hadoop cluster with Spark 2.0).

The Scala example file creates a SparkSession (if you are using Apache Spark version older than 2.0, check how to create all the context in order to run the example. Or upgrade to Spark 2.0!), reads a csv file into a DataFrame and outputs the DataFrame to the command line.

Create new project folder and step in it

mkdir scala-ne
cd scala-ne

Create data folder, step in it and create a test data file

mkdir data
cd data
vi n-europe.csv
1,Oslo,Norway
2,Stockholm,Sweden
3,Helsinki,Finland
4,Copenhagen,Danmark
5,Reykjavik,Iceland

Save the data file and exit vi.

Create the Scala file

vi spark-ne.scala

Copy the following code in the Scala file (make sure the path to the csv file is valid). If you are doing this on an node that is a part of HDFS cluster, be sure to add file:// at the beginning of the file path string.

import org.apache.spark.sql.SparkSession

object ne {

        def main(args: Array[String]) {
                val fil = "/SPARK-NE_PROJECT/data/n-europe.csv"

                val spark = SparkSession
                        .builder
                        .appName("Scala-Northern-E")
                        .getOrCreate()

                val neDF = spark.read.csv(fil)
                neDF.show()
        }
}

Create build file build.sbt

vi build.sbt

And type the following lines (make sure to leave the empty line between each line). Adjust the Scala and spark-sql version accordingly!

name := "Spark-ne"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"

Run the following command to build the project

sbt package

The last three line of the output

[info] Packaging /user/marko/scala-ne/target/scala-2.11/spark-ne_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 69 s, completed Jan 7, 2017 8:22:38 PM

Running the sbt package command the first time is going to take more time because the jar files are downloading. Maven users should feel like home here.
If you make changes to the Scala file and run the command again, it takes less time. Below is an example of the next build call

[info] Packaging /user/marko/scala-ne/target/scala-2.11/spark-ne_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed Jan 7, 2017 8:29:12 PM

The code can now be submitted to Spark. Adjust the path to the JAR file accordingly.

$SPARK_HOME/bin/spark-submit --class ne /user/marko/scala-ne/target/scala-2.11/spark-ne_2.11-1.0.jar

If everything went as it should, the table of Northern European cities and countries is seen in the output.

My Work cluster in detail

The cluster was built on OpenStack private cloud owned by a Swiss organization Switch.

The Hadoop distributor was Hortonworks, except for Spark and Zeppelin, who were Apache’s.

Potential users

Since the project owner was an organization supporting educational entities in Switzerland, the potential users were researchers, scientists, students…

I had the luxury of having almost unlimited resources on the infrastructure so I have built 5 Hadoop clusters – 4 were Hortonworks Hadoop clusters, one was Apache Hadoop cluster. Out of the 4, one was the Work environment which was exposed to the end users. And this is the cluster that is described in detail in this post.
Keep in mind that I was working on my own on this development – which meant administering and upgrading 5 clusters and doing data science at the same time. In order to make it work, I had to use the YARN inside me and distribute the limited resources effectively.

Initial resources

Keeping in mind the point of distributed systems is scalability, I have defined the initial cluster with the following capabilities.
6 instances with corresponding details:

  • Ambari Server
  • NameNode
  • DataNode (3)
  • Client
Instance RAM VCPU Default disk size Volume No. Volume Size Security group
Ambari 8GB 8 VCPU 20GB None None sg-ambari
NameNode 32GB 8 VCPU 20GB 1 200GB sg-namenode
DataNode (3) 32GB 8 VCPU 20GB 3 200GB sg-datanode
Client 16GB 16 VCPU 20GB 1 500GB sg-client

Note: There were three DataNodes in the initial cluster.

Characteristics of the cluster

The initial cluster had 1.7 TB HDFS, replication factor was 3, block size was the default 128MB. Rack awareness was not set in the initial cluster and the queue was the default.
On the YARN side, I have made some changes and had 84GB RAM (3 x 32GM = 96GB RAM. 4GB per DataNode was left for services on the instance -> 96GB – 12GB = 84GB) as maximum amount of RAM resources for the cluster – the default values by Apache (Hortonworks?) are quite more conservative.

In the cluster building process the versions were Ambari 2.1 and HDP 2.3. When Ambari 2.2 and HDP 2.4 were available, the cluster was upgraded.

Ambari

Ambari had a server for itself, the database for collecting statistics was MySql. The idea was always to migrate the Ambari Server if needed. The migration to new Ambari server is easy so I could afford to start small for this service.
The Ambari Views was enabled for the users who wanted to upload the files to the HDFS manually. Hive was also available through this service and I on my one of my test environments, I have even embedded Zeppelin in Ambari Views. Though, on the Work cluster, Zeppelin was offered only as an independent service on the Client.
All the ports for Ambari to properly work were in the sg-ambari security group.

NameNode

The initial plan with the NameNode was to run all the services on it except Spark and Zeppelin. When the resources would expand beyond the instance’s capabilities, some services would be moved to a new instance, or unused services would be stopped (experience showed Hive had little popularity among the academia). Using Ambari, migrating services is an easy process, I could afford to have all services running on one NameNode. Only cluster administrator had access to this instance. With other words, client tools were not installed on this instance.
All the ports for the NameNode to properly work were in the sg-namenode security group.

DataNode

I started with 3 DataNodes, which offered 1.7TB of storage on the HDFS. The DataNodes were also used as Workers for Spark and Supervisors for Storm. The users had no access to the DataNodes directly – no client was installed here. This would change according to the needs so that some jobs could access data directly locally.
All the ports for the DataNodes to properly work were in the sg-datanode security group.

Client

The client was users’ window to the cluster. Spark 2.0 (before Summer 2016 it was Spark 1.6) was offered as the computational engine – only one. One reason was also easier administration and optimizatoin from my side.
The users could use the command line interface (CLI), RStudio or Zeppelin. Ambari Views as well, but that was running on Ambari instance. More advanced users went with the CLI, users who wanted to learn Spark were using Zeppelin.
Client for Storm was also installed on this instance. Due to more complex programming (in Java), all the Topologies were handled by me, the users were defining requirements and using the data stored by the Storm.
All the ports for the Client to properly work were in the sg-datanode security group.

See below for page 2.

Error “org.apache.hive.hcatalog.data.JsonSerDe not found” while accessing Hive tables from spark-sql

In the Hadoop cluster, I am running Spark 2.0 on a Client node, separately from Hive services.

If I would like to connect to Hive Metastore using spark-sql, hive-hcatalog-core-0.13.0.jar has to be added to the jars folder in Spark home directory.

Step into $SPARK_HOME/jars folder and run the following

sudo -u spark wget http://central.maven.org/maven2/org/apache/hive/hcatalog/hive-hcatalog-core/0.13.0/hive-hcatalog-core-0.13.0.jar

Now I can run spark-sql and queries on the tables in the databases.

If the jar file is missing, and by running for example

desc table_name

the following error message is displayed

ERROR hive.log: error in initSerDe: java.lang.ClassNotFoundException Class org.apache.hive.hcatalog.data.JsonSerDe not found
java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:385)
        at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276)
        at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:258)
        at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:339)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:335)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:335)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:333)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:262)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:209)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:208)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:251)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:333)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
        at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:227)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:456)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:126)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:274)
        at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:414)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Since all my clusters have Hive services running on separate nodes than Spark services, I would not know if this is needed in cases when Hive services and Spark services are on the same node.

Apache Spark 2.0 – Notes

Spark Session

In Spark 2.0, SparkSession has been introduced. It provides a single point of entry for interaction with Spark functionality. It allows user accessing DataFrame and Dataset APIs.
In older Spark versions, the user had to create Spark configuration (sparkConf), SparkContext (sc) and then sqlContext. In Spark 2.0 all is done with SparkSession (spark), which encapsulates the above mentioned trio, HiveContext and StreamingContext.

In order to use DataFrame API in Spark 1.6, SQLContext needs to be used. When running Spark, new Spark application is started by creating SparkContext object (represents a connection to computing cluster). From SparkContext, SQLContext can be created (the main entry point for Spark DataFrame and SQL functionality). A SQLContext can be used to create DataFrames, which allows you to direct operations on your data.
It was confusing when to use SparkContext and when to use SQLContext in Spark 1.6. All this is hidden under a layer called SparkSession. See the following object types from PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SqlContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

The Spark session has to be created when using spark-submit command. An example from the documentation on how to do that:

>>> spark = SparkSession.builder \
 |  ...     .master("local") \
 |  ...     .appName("Word Count") \
 |  ...     .config("spark.some.config.option", "some-value") \
 |  ...     .getOrCreate()

The spark handle is created and first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is automatically initialized. Example with sparkR:

>>> sparkR

sparkr-initialization

SQL

SQL in Spark 2.0 supports SQL:2003 (latest revision is SQL:2011). Spark SQL can run all 99 TPC-DS queries. Subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

Running the following command

>>> spark.catalog.listTables()

Returns the tables available to SparkSession.

Driver program

In Spark, communication occurs between driver and executors. The driver has Spark jobs to run, it splits them into tasks and submits them to executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs are running either in a cluster or locally on the same machine. Pyspark is an example of a local driver program. Example:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode that means that the client that submitted the job is detached from the Spark application and its further behavior does not influence the application. If you shut down the computer that submitted the application to the cluster, the job will continue to run. The driver is on one of the nodes in Spark cluster. Command spark-submit with property –deploy-mode cluster does this.
If you run in client mode, that means that the client you are running the application from is the client. In Spark HistoryServer, under tab Executors, in table Executors, you can read that the address of the driver matches the address of the computer the command has been sent from.

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.

Listing attributes

Pythons dir() lists all attributes accessible through the parameter
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

Opens extensive help and examples.

RDD

SQLContext is created from lower level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs). RDD is Spark’s way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster and perform no matter of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or Python, Java, Scala, or R, the underlying code generated is identical because all execution planning undergoes the same Catalyst optimizer.
DFs are made of partitions – converting a DF to an RDD to check number of partitions:

>>> tweetDF.rdd.getNumPartitions()

Data is split into partitions.
How to optimize:
If there are 3 slots, perfect is to have partitions = x*3
Repartition DF:

>>> tweetDF.repartition(6)

DataFrames

In Spark, base DataFrame is first created. Either by generating a Dataset using spark.range method (for learning purposes) or by reading file(s) or tables and returning a DataFrame. Operations can then be applied to it. DataFrame is immutable, once created, it cannot be changed. Each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

DataFrame consists of series of Row objects. Each of them has a set of named columns.

DataFrame must have a schema, each of which has a name and a type. Some datasources have schemas built into them, although it is possible to define a schema and introduce it as a parameter when creating a new DataFrame.

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance

Dataset

DataFrame and DataSet are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking SQL package in Spark 2.0

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

Spark DF can be converted to Pandas DF

>>> import pandas as pd

Then you can use .toPandas() at the end of the Spark DF to convert to Pandas DF.

Cache

Putting DataFrame in memory: Spark uses Tungsten binary format to columnar compress data in memory. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. The data in memory is equally divided per partition (for example, 600MB in memory, 6 partitions -> 100MB per partition).
The size of data shrinks when cached. Example: 1,6GB file on disk is cached in memory with size 618,6 MB.

When an action is executed, and if the DataFrame is cached, the stages BEFORE the cache was executed are skipped, because the data is already partitioned in memory.
Ideal partition size is between 100MB to 200MB, so number of partitions should be adjusted to that, not the other way around.

Cache is not an action, which means it will be executed when the next action is executed. However, if you cache a table, it WILL be cached right away.

Sources

How to use SparkSession in Apache Spark 2.0
Using Apache Spark 2.0 to Analyze the City of San Francisco’s Open Data
Modern Spark DataFrame and Dataset (Intermediate Tutorial)
Spark SQL, DataFrames and Datasets Guide

Yarn application has already ended! It might have been killed or unable to launch application master.

If you are struggling with the error message in title of the post check if you are controlling ports that Spark needs. I have experienced that if the ports Spark is using can not be reached, YARN is going to terminate with the error message in the title. So it is best to control Spark ports and open them so that the YARN application would go through. More on Spark and networking here.

Spark chooses random ports and unless you have ALL ports open, you might run into the “endless”

INFO Client: Application report for application_1470560331181_0013 (state: ACCEPTED)

which eventually fails

INFO Client: Application report for application_1470560331181_0013 (state: FAILED)

and the error message returned would be

ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.

Adding something like this in spark-defaults.conf

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

could solve this issue.

My notes on installing Spark 2.0 are here.

And how to install Spark 1.6 is described here.

Configuring Apache Spark History Server

Prior to configuring and running Spark History Server, Spark should be installed.

How to install Apache Spark 1.6.0 is described here.

How to install Apache spark 2.0 is described here.

Spark History server

Check that $SPARK_HOME/conf/spark-defaults.conf has History Server properties set

spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080

spark.history.kerberos.keytab none
spark.history.kerberos.principal none

Create spark-history directory in HDFS

sudo -u hdfs hadoop fs -mkdir /spark-history

Change the owner of the directory

sudo -u hdfs hadoop fs -chown spark:hdfs /spark-history

Change permission (be more restrictive if necessary)

sudo -u hdfs hadoop fs -chmod 777 /spark-history

Add user spark to group hdfs on the instance where Spark History Server is going to run

sudo usermod -a -G hdfs spark

To view Spark jobs from other users
When you open the History Server and you are not able to see Spark jobs you are expecting to see, check the Spark out file in the Spark log directory. If error message “Permission denied” is present, Spark History Server is trying to read the job log file, but has no permission to do so.
Spark user should be added to the group of the spark job owner.
For example, user marko belongs to a group employee. If marko starts a Spark job, the log file for this job will have user and group marko:employee. In order for spark to be able to read the log file, spark user should e added to the employee group. This is done in the following way

sudo usermod -a -G employee spark

Checking spark’s groups

groups spark

should return group employee among spark’s groups.

Start Spark History server

sudo -u spark $SPARK_HOME/sbin/start-history-server.sh

Output:

starting org.apache.spark.deploy.history.HistoryServer, logging to /var/log/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1-t-client01.out

Accessing Spark History server from the web UI can be done by accessing spark-server:18080. The following screen should load.

spark18080
A fresh Spark History Server installation has no applications to show (no applications in hdfs:/spark-history).

Spark History Server offers a great monitoring interface for Spark applications!

WARN ServletHandler: /api/v1/applications

If you happen to start Spark History Server but get neither completed nor incompleted applications on the Web UI, check the log files. If you get something like the following

WARN ServletHandler: /api/v1/applications
java.lang.NullPointerException
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
        at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
        at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
        at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
        at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
        at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
        at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.spark_project.jetty.servlets.gzip.GzipHandler.handle(GzipHandler.java:479)
        at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:215)
        at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
        at org.spark_project.jetty.server.Server.handle(Server.java:499)
        at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:311)
        at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
        at org.spark_project.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
        at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
        at org.spark_project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
        at java.lang.Thread.run(Thread.java:745)

Take the jersey-bundle-*.jar file out of the $SPARK_HOME/jars directory. Hortonworks dont need it, you dont need it 🙂

Apache Spark 2.0.0 – Installation and configuration

I am running a HDP 2.4 multinode cluster with Ubuntu Trusty 14.04 on all my nodes. The Spark in this post is installed on my client node. My cluster has HDFS and YARN, among other services. All were installed from Ambari. This is not the case for Apache Spark 2.0, because Hortonworks does not offer Spark 2.0 on HDP 2.4.0

The documentation on the latest Spark version can be found here.

My notes on Spark 2.0 can be found here (if anyone finds them useful).

My post on setting up Apache Spark 1.6.0.

Update 12.January 2018: A post on how to install Apache Spark 2.2.1 Apache Spark 2.2.1 on Ubuntu 16.04 – Hadoop-less instance.

Prerequisities

Java

Update and upgrade the system and install Java

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

Add JAVA_HOME in the system variables file

sudo vi /etc/environment
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

Spark user

Create user spark and add it to group hadoop

sudo adduser spark
sudo usermod -a -G hdfs spark

HDFS home directory for user spark

Create spark’s user folder in HDFS

sudo -u hdfs hadoop fs -mkdir -p /user/spark
sudo -u hdfs hadoop fs -chown -R spark:hdfs /user/spark

Spark installation and configuration

Install Spark

Create directory where spark directory is going to reside. Step into the directory

sudo mkdir /usr/apache
cd /usr/apache

Download Spark 2.0.0 from https://spark.apache.org/downloads.html

sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz

Unpack the tar file

sudo tar -xvzf spark-2.0.0-bin-hadoop2.7.tgz

Remove the tar file after it has been unpacked

sudo rm spark-2.0.0-bin-hadoop2.7.tgz

Change the ownership of the folder and its elements

sudo chown -R spark:spark spark-2.0.0-bin-hadoop2.7

Update system variables

Step into the spark 2.0.0 directory and run pwd to get full path

cd spark-2.0.0-bin-hadoop2.7
pwd

Update the system environment file by adding SPARK_HOME and adding SPARK_HOME/bin to the PATH

sudo vi /etc/environment

export SPARK_HOME=/usr/apache/spark-2.0.0-bin-hadoop2.7

At the end of PATH add

${SPARK_HOME}/bin

Refresh the system environments

source /etc/environment

Log and pid directories

Create log and pid directories

sudo mkdir /var/log/spark
sudo chown spark:spark /var/log/spark
sudo -u spark mkdir $SPARK_HOME/run

Spark configuration files

Hive configuration

Create a Hive warehouse and give permissions to the users. If Hive service is set up, the path to the Hive warehouse could be /apps/hive/warehouse

sudo -u hive hadoop fs -mkdir /user/hive/warehouse
sudo -u hdfs hadoop fs -chmod -R 777 /user/hive/warehouse

Find the hive-site.xml file – HDP versions usually have it in the following folder: /usr/hdp/current/hive-client/conf and copy it to $SPARK_HOME/conf.
The following property name in the file should be altered:
hive.metastore.warehouse.dir is replaced with spark.sql.warehouse.dir.

Spark environment file

Create a new file in under $SPARK_HOME/conf

sudo -u spark vi conf/spark-env.sh

Add the following lines and adjust aaccordingly.

export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run
export HADOOP_HOME=${HADOOP_HOME:-/usr/hdp/current/hadoop-client}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/hdp/current/hadoop-client/conf}
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SPARK_SUBMIT_OPTIONS="--jars ${SPARK_HOME}/lib/spark-csv_2.11-1.4.0.jar"

The last line serves as an example how to add external libreries to Spark. This particular package is quite common and is advised to install it. The package can be downloaded from this site.

Spark default file

Fetch HDP version

hdp-select status hadoop-client | awk '{print $3;}'

Example output for HDP 2.4:

2.4.0.0-169

Example output for HDP 2.5:

2.5.0.0-1245

Create spark-defaults.conf file in $SPARK_HOME/conf

sudo -u spark vi $SPARK_HOME/conf/spark-defaults.conf

Add the following and adjust accordingly (some properties belong to Spark History Server whose configuration is explained in the post in the link below)

spark.driver.extraJavaOptions -Dhdp.version=2.4.0.0-169
spark.yarn.am.extraJavaOptions -Dhdp.version=2.4.0.0-169
spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080

spark.history.kerberos.keytab none
spark.history.kerberos.principal none

spark.yarn.containerLauncherMaxThreads 25
spark.yarn.driver.memoryOverhead 384
spark.yarn.executor.memoryOverhead 384
spark.yarn.historyServer.address spark-server:18080
spark.yarn.max.executor.failures 3
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.submit.file.replication 3

spark.jars.packages com.databricks:spark-csv_2.11:1.4.0

spark.io.compression.codec lzf

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

The ports are defined in this configuration file. If they are not, then Spark assigns random ports. More on ports and assigning them in Spark can be found here.

If the ports are not under control, you risk the

Yarn application has already ended! It might have been killed or unable to launch application master.

error. More on that is written here.

JAVA OPTS

Create java-opts file in $SPARK_HOME/conf and add your HDP version

sudo -u spark vi $SPARK_HOME/conf/java-opts

Example:

-Dhdp.version=2.4.0.0-169

Fixing links in Ubuntu

Since the Hadoop distribution is Hortonworks and Spark is Apache’s, some workaround is in place. Remove the default link and create new ones.
First, the existing link is removed. Then the new link is created, pointing to the $SPARK_HOME/bin.

sudo rm /usr/hdp/current/spark-client
sudo ln -s /usr/apache/spark-2.0.0-bin-hadoop2.7 /usr/hdp/current/spark-client

Spark 2.0.0 is now almost ready.

Jersey problem

If you try to run a spark-submit command on YARN you can expect the following error message:

Exception in thread “main” java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig

Jar file jersey-bundle-*.jar is not present in the $SPARK_HOME/jars. Adding it fixes this problem:

sudo -u spark wget http://repo1.maven.org/maven2/com/sun/jersey/jersey-bundle/1.19.1/jersey-bundle-1.19.1.jar -P $SPARK_HOME/jars

January 2017 – Update on this issue:
If the following is done, Jersey 1 will be used when starting Spark History Server and the applications in Spark History Server will not be shown. The folowing error message will be generated in the Spark History Server output file:

WARN servlet.ServletHandler: /api/v1/applications
java.lang.NullPointerException
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)

This problem occurs only when one tries to run Spark on YARN, since YARN 2.7.3 uses Jersey 1 and Spark 2.0 uses Jersey 2

One workaround is not to add the Jersey 1 jar described above but disable the YARN Timeline Service in spark-defaults.conf

spark.hadoop.yarn.timeline-service.enabled false

Spark History Server

How Spark History Server is configured and brought to life is explained here. Absolutely worth setting it up, not only because it is very useful and practical for monitoring Spark applications, but also because in Spark 2.0 the graphical interface is more user friendly and, well, more graphical.

Hive SerDe error when querying from spark-sql

If you plan to use spark-sql, it is maybe worth checking this post to avoid the jsonserde not found error message.

Notes about Spark 2.0

My Apache Spark 2.0 Notes

Manipulating files in S3 with Spark is mentioned here.