Manipulating files from S3 with Apache Spark

This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare a properties file with AWS credentials, run spark-shell with those properties, read a file from S3, and write a DataFrame back to S3.

This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket, and in folder folder01 I have a test file called SearchLog.tsv.

The project’s home is /home/ubuntu/s3-test. Folder jars is created in the project’s home.

Step into the jars folder and download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars.

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

Create a properties file in the project’s home. The name, for example, is s3.properties. Add and adjust the following text in the file.

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]

Once the file is saved, we can test the access by starting spark-shell.

spark-shell --properties-file /home/ubuntu/s3-test/s3.properties

Once in Spark, the configuration properties can be checked by running

spark.conf.getAll

The result should, among other parameters, also show values for the keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value of a specific key can also be checked by running spark.conf.get(KEY_NAME), as the example below shows.

spark.conf.get("spark.hadoop.fs.s3a.impl")

The jars and credentials are now picked up by the Spark application. Let us read the test file into an RDD.

val fRDD = sc.textFile("s3a://markosbucket/folder01")

Output:

fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at :24

Remember, this is an RDD, not a DataFrame or DataSet!

Print out the first three lines from the file:

fRDD.take(3)

Reading a CSV file directly into a DataFrame:

val fDF = spark.read.csv("s3a://markosbucket/folder01")

Writing to S3

Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a random file

mkdir data
wget https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

Now we can load the local file into a DataFrame

val airportDF = spark.read.csv("data/airports.dat")

Saving the DataFrame to S3 is done by first converting the DataFrame to an RDD:

airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")

Refreshing the S3 folder should show the new folder – airport-output – which contains two files: _SUCCESS and part-00000.
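
The same data can also be written through the DataFrame writer API, without converting to an RDD first. Below is a minimal PySpark sketch, assuming pyspark was started with the same properties file (so the s3a credentials and jars are already in place); the target folder name is made up for this example.

# Load the local file into a DataFrame, as above
airportDF = spark.read.csv("data/airports.dat")

# Write the DataFrame directly to S3; the writer creates the target folder
# with part files and a _SUCCESS marker, much like saveAsTextFile does
airportDF.write.csv("s3a://markosbucket/folder01/airport-output-csv")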

Some Error messages with fixes

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Fix:
The jar files were not added to the Spark application (check the spark.driver.extraClassPath property).

com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:

sc.hadoopConfiguration.set("fs.s3a.access.key", [ACCESS_KEY])
sc.hadoopConfiguration.set("fs.s3a.secret.key", [SECRET_KEY])

java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist

Fix:
In the AWS Console, right-click the file, choose Properties and copy the value next to Link. Replace https with s3a and remove the domain part ("s3-…").
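
For illustration only (the region prefix here is an assumption, not taken from the post): a link such as https://s3-eu-west-1.amazonaws.com/markosbucket/folder01/SearchLog.tsv would be rewritten as s3a://markosbucket/folder01/SearchLog.tsv.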


Creating a multinode Apache Spark cluster on AWS from command line

The idea is to create a Spark cluster on AWS according to the needs, maximize its use and terminate it after the processing is done.

All the instances run the Ubuntu 14.04 operating system; the instances used in the following script are free-tier t2.micro instances with 8 GB storage and 1 GB RAM.

To have this working from the command line, the AWS CLI (the awscli package) has to be installed.

One instance, the master in this case, is always in state "running", and this instance has the following installed:

  • AWS CLI (awscli package)
  • Apache Spark

How to install Spark 2.0 is described here. There is also a link to the blog post about how to install the Spark History Server.

The Spark History Server can be reached on port 18080 – MASTER_PUBLIC_IP:18080 and Spark Master is on port 8080. If the pages do not load, start the services as spark user. The scripts for starting the services can be found in $SPARK_HOME/sbin.

The script for launching instances

The following script takes one parameter – the number of instances to launch and attach to the Spark cluster. These instances become workers in the Spark world.

A new script is created

vi create-spark-cluster.sh

And the following lines are written in it. Adjust accordingly (details below).

#!/bin/sh

#removes any old associations that have "worker" in it
sudo sed -i.bak '/worker/d' /etc/hosts

#removes known hosts from the file
sudo sed -i 'd' /home/ubuntu/.ssh/known_hosts

#run the loop to create as many slaves as defined in the input parameter
for i in $(seq -f %02g 1 $1)
do
  #name of the worker
  NAME="worker$i"

  #run the aws command to create an instance and run a script when the instance is created.
  #the command returns the private IP address which is used to update the local /etc/hosts file
  PRIV_IP_ADDR=$(aws ec2 run-instances --image-id ami-0d77397e --count 1 \
            --instance-type t2.micro --key-name MYKEYPAIR \
            --user-data file:///PATH_TO_SCRIPT/user-data.sh \
            --subnet-id SUBNET_ID --security-group-ids SECURITY_GROUP \
            --associate-public-ip-address \
            --output text | grep PRIVATEIPADDRESSES | awk '{print $4 "\t" $3}')

   #add the IP and hostname association to the /etc/hosts file
   echo "$PRIV_IP_ADDR" "$NAME" | sudo tee -a /etc/hosts

done

Line 19: option user-data allows you to define a file that is run on the new instance. In this file, the steps for Spark worker installation and setup are defined.
MYKEYPAIR: name of the key pair you use in the AWS console when launching new instances (this is not a path to the private key on the instance you are running the script from!).
SUBNET_ID: ID of the subnet you are using. It is something one creates when starting with EC2 in AWS.
SECURITY_GROUP: name of the existing security group that this instance should use.

 

The script that runs at instance launch

The following commands in the script are run as soon as each instance is created.
At the top of the script, the operating system is updated, and Python and the JDK are installed.

#!/bin/sh

sudo apt-get update -y  && \
sudo apt-get install python-minimal -y && \
sudo apt-get install default-jdk -y && \
cd /etc/ && \
sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.1-bin-hadoop2.7.tgz && \
sudo tar -xvzf spark-2.0.1-bin-hadoop2.7.tgz && \
sudo rm spark-2.0.1-bin-hadoop2.7.tgz && \
sudo useradd spark && \
export SPARK_HOME=/etc/spark-2.0.1-bin-hadoop2.7 && \
sudo chown -R spark:spark $SPARK_HOME && \
sudo -u spark mkdir $SPARK_HOME/logs && \
sudo -u spark $SPARK_HOME/sbin/start-slave.sh spark://ip-10-0-0-95.eu-west-1.compute.internal:7077

Lines 6-7: the Spark package is downloaded into /etc/ and unpacked.
Line 14: the slave is started and connected to the master node (the Spark Master URL is passed as the parameter to start-slave.sh).

Example of running the script

The following example creates a Spark cluster with 3 workers.

sh create-spark-cluster.sh 3

Example of the output:

10.0.0.104      ip-10-0-0-104.eu-west-1.compute.internal worker01
10.0.0.80       ip-10-0-0-80.eu-west-1.compute.internal worker02
10.0.0.47       ip-10-0-0-47.eu-west-1.compute.internal worker03

These three lines are also added to /etc/hosts.

The Spark Master Web Interface (SPARK_MASTER_PUBLIC_IP:8080) gives more details about the Spark cluster.

(screenshot: spark-master)

The interface displays the recently added workers with state ALIVE. Two workers are in state DEAD – they have been terminated from AWS, but the Spark Master has not updated the worker statistics yet.

Memory in use is 3 GB – the instances used in the cluster creation have 1 GB each.

Conclusion

The blog post shows a very simple way of creating a Spark cluster. The cluster creation script can be made a lot more dynamic, and the script that runs on the newly created instances could be extended (for example, by installing Python packages needed for the workload).

Apache Spark 2.0 – Notes

Spark Session

In Spark 2.0, SparkSession has been introduced. It provides a single point of entry for interaction with Spark functionality and gives the user access to the DataFrame and Dataset APIs.
In older Spark versions, the user had to create a Spark configuration (sparkConf), a SparkContext (sc) and then a sqlContext. In Spark 2.0 all of this is done with SparkSession (spark), which encapsulates the above-mentioned trio as well as HiveContext.

In order to use the DataFrame API in Spark 1.6, a SQLContext needs to be used. When running Spark, a new Spark application is started by creating a SparkContext object (which represents a connection to the computing cluster). From the SparkContext, a SQLContext can be created – the main entry point for Spark DataFrame and SQL functionality – and used to create DataFrames on which operations on the data are run.
It was confusing when to use SparkContext and when to use SQLContext in Spark 1.6. All of this is now hidden behind a layer called SparkSession. See the following object types from the PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SqlContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

The SparkSession has to be created explicitly when using the spark-submit command. An example from the documentation on how to do that:

>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

Once the spark handle is created, the first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is automatically initialized. Example with sparkR:

sparkR

(screenshot: sparkr-initialization)

SQL

SQL in Spark 2.0 supports SQL:2003 (latest revision is SQL:2011). Spark SQL can run all 99 TPC-DS queries. Subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

Running the following command

>>> spark.catalog.listTables()

Returns the tables available to SparkSession.

Driver program

In Spark, communication occurs between the driver and the executors. The driver has Spark jobs to run; it splits them into tasks and submits them to the executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs run either in a cluster or locally on the same machine. PySpark is an example of a local driver program – it cannot be submitted in cluster deploy mode:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode, the client that submitted the job is detached from the Spark application and its further behaviour does not influence the application: if you shut down the computer that submitted the application to the cluster, the job continues to run. The driver runs on one of the nodes in the Spark cluster. The spark-submit command with the property --deploy-mode cluster does this.
If you run in client mode, the machine you run the application from hosts the driver. In the Spark History Server, under the Executors tab, you can see that the address of the driver matches the address of the computer the command was sent from.

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.
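
In Spark 2.0, the SparkContext the driver uses is also available from the SparkSession; a quick check from the PySpark shell (output will differ per setup):

>>> sc = spark.sparkContext   # the SparkContext wrapped by the SparkSession
>>> sc.master                 # the master URL the driver program is connected to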

Listing attributes

Python's dir() lists all attributes accessible on the object passed to it.
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

Opens extensive help and examples.

RDD

SQLContext is created from the lower-level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs), Spark's way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster and perform equally well regardless of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or in Python, Java, Scala or R, the underlying generated code is identical, because all execution planning goes through the same Catalyst optimizer.
DFs are made of partitions – converting a DF to an RDD to check the number of partitions:

>>> tweetDF.rdd.getNumPartitions()

Data is split into partitions.
How to optimize: if there are 3 slots, the ideal is to have partitions = x*3, i.e. a multiple of the number of slots.
Repartition a DF (this returns a new DataFrame):

>>> tweetDF.repartition(6)

DataFrames

In Spark, a base DataFrame is created first, either by generating a Dataset using the spark.range method (for learning purposes) or by reading file(s) or tables, which returns a DataFrame. Operations can then be applied to it. A DataFrame is immutable; once created, it cannot be changed, and each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

A DataFrame consists of a series of Row objects, each of which has a set of named columns.

A DataFrame must have a schema, in which each column has a name and a type. Some data sources have schemas built into them, although it is also possible to define a schema and pass it as a parameter when creating a new DataFrame (see the sketch after the createDataFrame signature below).

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance
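
As an illustration of the schema parameter, a minimal sketch of defining a schema explicitly and passing it to createDataFrame (the column names and rows are made up for this example):

>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>> schema = StructType([
...     StructField("name", StringType(), True),
...     StructField("age", IntegerType(), True)])
>>> df = spark.createDataFrame([("marko", 35), ("ana", 28)], schema)
>>> df.printSchema()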

Dataset

DataFrame and Dataset are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking the SQL package in the Spark 2.0 source code confirms this:

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

Spark DF can be converted to Pandas DF

>>> import pandas as pd

Then you can call .toPandas() on the Spark DF to convert it to a Pandas DF, as in the sketch below.
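
A minimal sketch, assuming a Spark DataFrame named tweetDF as in the partitioning example above (the whole dataset is collected to the driver, so this only makes sense for small DataFrames):

>>> pandasDF = tweetDF.toPandas()   # collect the Spark DF to the driver as a Pandas DF
>>> pandasDF.head()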

Cache

Putting a DataFrame in memory: Spark uses the Tungsten binary format to compress data in memory in a columnar layout. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. The data in memory is divided equally per partition (for example, 600 MB in memory and 6 partitions means 100 MB per partition).
The size of the data shrinks when cached. Example: a 1.6 GB file on disk is cached in memory with a size of 618.6 MB.

When an action is executed and the DataFrame is cached, the stages BEFORE the cache was executed are skipped, because the data is already partitioned in memory.
The ideal partition size is between 100 MB and 200 MB, so the number of partitions should be adjusted to that, not the other way around.

cache() is not an action, which means the caching is only carried out when the next action is executed. However, if you cache a table, it WILL be cached right away. A small sketch of the lazy behaviour follows.
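
A short PySpark sketch of that lazy behaviour, again assuming a DataFrame named tweetDF:

>>> tweetDF.cache()    # marks the DataFrame for caching; nothing is materialized yet
>>> tweetDF.count()    # first action: reads the source and fills the cache
>>> tweetDF.count()    # second action: served from memory, earlier stages are skipped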

Sources

How to use SparkSession in Apache Spark 2.0
Using Apache Spark 2.0 to Analyze the City of San Francisco’s Open Data
Modern Spark DataFrame and Dataset (Intermediate Tutorial)
Spark SQL, DataFrames and Datasets Guide

Yarn application has already ended! It might have been killed or unable to launch application master.

If you are struggling with the error message in the title of this post, check whether you are controlling the ports that Spark needs. I have experienced that if the ports Spark is using cannot be reached, YARN terminates with the error message in the title. So it is best to control the Spark ports and open them so that the YARN application can go through. More on Spark and networking here.

Spark chooses random ports, and unless you have ALL of them open, you might run into the "endless"

INFO Client: Application report for application_1470560331181_0013 (state: ACCEPTED)

which eventually fails

INFO Client: Application report for application_1470560331181_0013 (state: FAILED)

and the error message returned would be

ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.

Adding something like this in spark-defaults.conf

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

could solve this issue.

My notes on installing Spark 2.0 are here.

And how to install Spark 1.6 is described here.

Configuring Apache Spark History Server

Prior to configuring and running Spark History Server, Spark should be installed.

How to install Apache Spark 1.6.0 is described here.

How to install Apache spark 2.0 is described here.

Spark History server

Check that $SPARK_HOME/conf/spark-defaults.conf has History Server properties set

spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080

spark.history.kerberos.keytab none
spark.history.kerberos.principal none

Create spark-history directory in HDFS

sudo -u hdfs hadoop fs -mkdir /spark-history

Change the owner of the directory

sudo -u hdfs hadoop fs -chown spark:hdfs /spark-history

Change permission (be more restrictive if necessary)

sudo -u hdfs hadoop fs -chmod 777 /spark-history

Add user spark to group hdfs on the instance where Spark History Server is going to run

sudo usermod -a -G hdfs spark

To view Spark jobs from other users
When you open the History Server and you are not able to see the Spark jobs you are expecting to see, check the Spark out file in the Spark log directory. If the error message "Permission denied" is present, the Spark History Server is trying to read the job log file but has no permission to do so.
The spark user should be added to the group of the Spark job owner.
For example, user marko belongs to the group employee. If marko starts a Spark job, the log file for this job will have owner and group marko:employee. In order for spark to be able to read the log file, the spark user should be added to the employee group. This is done in the following way:

sudo usermod -a -G employee spark

Checking spark’s groups

groups spark

should return group employee among spark’s groups.

Start Spark History server

sudo -u spark $SPARK_HOME/sbin/start-history-server.sh

Output:

starting org.apache.spark.deploy.history.HistoryServer, logging to /var/log/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1-t-client01.out

The Spark History Server web UI can be accessed at spark-server:18080. The following screen should load.

(screenshot: spark18080)
A fresh Spark History Server installation has no applications to show (no applications in hdfs:///spark-history).

Spark History Server offers a great monitoring interface for Spark applications!

WARN ServletHandler: /api/v1/applications

If you happen to start the Spark History Server but see neither completed nor incomplete applications on the Web UI, check the log files. If you see something like the following

WARN ServletHandler: /api/v1/applications
java.lang.NullPointerException
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
        at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
        at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
        at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
        at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
        at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
        at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.spark_project.jetty.servlets.gzip.GzipHandler.handle(GzipHandler.java:479)
        at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:215)
        at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
        at org.spark_project.jetty.server.Server.handle(Server.java:499)
        at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:311)
        at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
        at org.spark_project.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
        at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
        at org.spark_project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
        at java.lang.Thread.run(Thread.java:745)

Take the jersey-bundle-*.jar file out of the $SPARK_HOME/jars directory. Hortonworks doesn't need it, you don't need it 🙂

Apache Spark 2.0.0 – Installation and configuration

I am running an HDP 2.4 multinode cluster with Ubuntu Trusty 14.04 on all my nodes. The Spark in this post is installed on my client node. My cluster has HDFS and YARN, among other services, all installed from Ambari. This is not the case for Apache Spark 2.0, because Hortonworks does not offer Spark 2.0 on HDP 2.4.0.

The documentation on the latest Spark version can be found here.

My notes on Spark 2.0 can be found here (if anyone finds them useful).

My post on setting up Apache Spark 1.6.0.

Prerequisites

Java

Update the system and install Java (OpenJDK 8)

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

Add JAVA_HOME in the system variables file

sudo vi /etc/environment
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

Spark user

Create the user spark and add it to the hdfs group

sudo adduser spark
sudo usermod -a -G hdfs spark

HDFS home directory for user spark

Create spark’s user folder in HDFS

sudo -u hdfs hadoop fs -mkdir -p /user/spark
sudo -u hdfs hadoop fs -chown -R spark:hdfs /user/spark

Spark installation and configuration

Install Spark

Create the directory where the Spark directory is going to reside, and step into it

sudo mkdir /usr/apache
cd /usr/apache

Download Spark 2.0.0 from https://spark.apache.org/downloads.html

sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz

Unpack the tar file

sudo tar -xvzf spark-2.0.0-bin-hadoop2.7.tgz

Remove the tar file after it has been unpacked

sudo rm spark-2.0.0-bin-hadoop2.7.tgz

Change the ownership of the folder and its elements

sudo chown -R spark:spark spark-2.0.0-bin-hadoop2.7

Update system variables

Step into the spark-2.0.0 directory and run pwd to get the full path

cd spark-2.0.0-bin-hadoop2.7
pwd

Update the system environment file by adding SPARK_HOME and adding SPARK_HOME/bin to the PATH

sudo vi /etc/environment

export SPARK_HOME=/usr/apache/spark-2.0.0-bin-hadoop2.7

At the end of PATH add

${SPARK_HOME}/bin

Refresh the system environments

source /etc/environment

Log and pid directories

Create log and pid directories

sudo mkdir /var/log/spark
sudo chown spark:spark /var/log/spark
sudo -u spark mkdir $SPARK_HOME/run

Spark configuration files

Hive configuration

Create a Hive warehouse and give permissions to the users. If Hive service is set up, the path to the Hive warehouse could be /apps/hive/warehouse

sudo -u hive hadoop fs -mkdir /user/hive/warehouse
sudo -u hdfs hadoop fs -chmod -R 777 /user/hive/warehouse

Find the hive-site.xml file – HDP versions usually have it in /usr/hdp/current/hive-client/conf – and copy it to $SPARK_HOME/conf.
The following property name in the file should be altered:
hive.metastore.warehouse.dir is replaced with spark.sql.warehouse.dir.
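
The warehouse location can also be set when the session is created. A PySpark sketch for illustration (the application name is made up; the path is the warehouse directory created above):

from pyspark.sql import SparkSession

# Build a session with Hive support and point it at the warehouse directory
spark = SparkSession.builder \
    .appName("warehouse-example") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()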

Spark environment file

Create a new file under $SPARK_HOME/conf

sudo -u spark vi conf/spark-env.sh

Add the following lines and adjust them accordingly.

export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run
export HADOOP_HOME=${HADOOP_HOME:-/usr/hdp/current/hadoop-client}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/hdp/current/hadoop-client/conf}
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SPARK_SUBMIT_OPTIONS="--jars ${SPARK_HOME}/lib/spark-csv_2.11-1.4.0.jar"

The last line serves as an example of how to add external libraries to Spark. This particular package is quite common, and it is advisable to install it. The package can be downloaded from this site.

Spark default file

Fetch HDP version

hdp-select status hadoop-client | awk '{print $3;}'

Example output for HDP 2.4:

2.4.0.0-169

Example output for HDP 2.5:

2.5.0.0-1245

Create spark-defaults.conf file in $SPARK_HOME/conf

sudo -u spark vi $SPARK_HOME/conf/spark-defaults.conf

Add the following and adjust accordingly (some properties belong to the Spark History Server, whose configuration is explained in the post linked below).

spark.driver.extraJavaOptions -Dhdp.version=2.4.0.0-169
spark.yarn.am.extraJavaOptions -Dhdp.version=2.4.0.0-169
spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080

spark.history.kerberos.keytab none
spark.history.kerberos.principal none

spark.yarn.containerLauncherMaxThreads 25
spark.yarn.driver.memoryOverhead 384
spark.yarn.executor.memoryOverhead 384
spark.yarn.historyServer.address spark-server:18080
spark.yarn.max.executor.failures 3
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.submit.file.replication 3

spark.jars.packages com.databricks:spark-csv_2.11:1.4.0

spark.io.compression.codec lzf

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

The ports are defined in this configuration file. If they are not, then Spark assigns random ports. More on ports and assigning them in Spark can be found here.

If the ports are not under control, you risk the

Yarn application has already ended! It might have been killed or unable to launch application master.

error. More on that is written here.

JAVA OPTS

Create java-opts file in $SPARK_HOME/conf and add your HDP version

sudo -u spark vi $SPARK_HOME/conf/java-opts

Example:

-Dhdp.version=2.4.0.0-169

Fixing links in Ubuntu

Since the Hadoop distribution is Hortonworks' and Spark comes from Apache, a small workaround is needed: remove the default link and create a new one.
First, the existing link is removed. Then a new link is created, pointing to $SPARK_HOME.

sudo rm /usr/hdp/current/spark-client
sudo ln -s /usr/apache/spark-2.0.0-bin-hadoop2.7 /usr/hdp/current/spark-client

Spark 2.0.0 is now almost ready.

Jersey problem

If you try to run a spark-submit command on YARN you can expect the following error message:

Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig

The jar file jersey-bundle-*.jar is not present in $SPARK_HOME/jars. Adding it fixes this problem:

sudo -u spark wget http://repo1.maven.org/maven2/com/sun/jersey/jersey-bundle/1.19.1/jersey-bundle-1.19.1.jar -P $SPARK_HOME/jars

January 2017 – Update on this issue:
If the above is done, Jersey 1 will be used when starting the Spark History Server, and the applications in the Spark History Server will not be shown. The following error message will be generated in the Spark History Server output file:

WARN servlet.ServletHandler: /api/v1/applications
java.lang.NullPointerException
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)

This problem occurs only when one tries to run Spark on YARN, since YARN 2.7.3 uses Jersey 1 and Spark 2.0 uses Jersey 2.

One workaround is to skip adding the Jersey 1 jar described above and instead disable the YARN Timeline Service in spark-defaults.conf

spark.hadoop.yarn.timeline-service.enabled false

Spark History Server

How the Spark History Server is configured and brought to life is explained here. It is absolutely worth setting up, not only because it is very useful and practical for monitoring Spark applications, but also because in Spark 2.0 the graphical interface is more user friendly and, well, more graphical.

Hive SerDe error when querying from spark-sql

If you plan to use spark-sql, it may be worth checking this post to avoid the "jsonserde not found" error message.

Notes about Spark 2.0

My Apache Spark 2.0 Notes

Manipulating files in S3 with Spark is mentioned here.

Networking in Spark: Configuring ports in Spark

For a Spark Context to run, a handful of ports are used. Most of them are chosen randomly, which makes them difficult to control. This post describes how I control Spark's ports.

In my clusters, some nodes are dedicated client nodes, which means users can access them, store files under their respective home directories (defining home on an attached volume is described here) and run jobs on them.

The Spark jobs can be run in different ways, from different interfaces – Command Line Interface, Zeppelin, RStudio…

 

Links to Spark installation and configuration

Installing Apache Spark 1.6.0 on a multinode cluster

Building Apache Zeppelin 0.6.0 on Spark 1.5.2 in a cluster mode

Building Zeppelin-With-R on Spark and Zeppelin

What Spark Documentation says

Spark UI

The Spark User Interface, which shows the application's dashboard, has the default port 4040 (link). The property name is

spark.ui.port

When a new Spark Context is submitted, port 4040 is attempted first. If this port is taken, 4041 is tried; if that one is taken, 4042 is tried, and so on, until an available port is found (or the maximum number of attempts is reached).
If an attempt is unsuccessful, the log displays a WARN and the next port is attempted. Example follows:

WARN Utils: Service ‘SparkUI’ could not bind on port 4040. Attempting port 4041.
INFO Utils: Successfully started service ‘SparkUI’ on port 4041.
INFO SparkUI: Started SparkUI at http://client-server:4041

According to the log, the Spark UI is now listening on port 4041.

There is not much randomness for this port. This is not the case for the ports in the next chapter.

 

Networking

Looking at the documentation about networking in Spark 1.6.x, this post focuses on the six properties whose default value is random, shown in the following picture:

(screenshot: spark networking.JPG)

When the Spark Context is being created, these receive random values.

spark.blockManager.port
spark.broadcast.port
spark.driver.port
spark.executor.port
spark.fileserver.port
spark.replClassServer.port

These are the properties that should be controlled. They can be controlled in different ways, depending on how the job is run.

 

Scenarios and solutions

If you do not care about the values assigned to these properties, then no further steps are needed.

Configuring ports in spark-defaults.conf

If you are running one Spark application per node (for example, submitting Python scripts by using spark-submit), you might want to define the properties in $SPARK_HOME/conf/spark-defaults.conf. Below is an example of what should be added to the configuration file.

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

If a test is run, for example spark-submit test.py, the Spark UI is on the default port 4040 and the ports mentioned above are used.

Running the following command

sudo netstat -tulpn | grep 3800

Returns the following output:

tcp6      0      0      :::38000                          :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38002     :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38003     :::*      LISTEN      25300/java
tcp6      0      0      :::38004                          :::*      LISTEN      25300/java
tcp6      0      0      :::38005                          :::*      LISTEN      25300/java

 

Configuring ports directly in a script

In my case, different users like to run Spark applications in different ways. Here is an example of how the ports are configured through a Python script.

"""Pi-estimation.py"""

from random import random, randint
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

def sample(p):
    # draw a random point in the unit square; count it if it falls inside the unit circle
    x, y = random(), random()
    print(x)
    print(y)
    return 1 if x*x + y*y < 1 else 0

conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Pi")

conf.set("spark.ui.port", "4042")

conf.set("spark.blockManager.port", "38020")
conf.set("spark.broadcast.port", "38021")
conf.set("spark.driver.port", "38022")
conf.set("spark.executor.port", "38023")
conf.set("spark.fileserver.port", "38024")
conf.set("spark.replClassServer.port", "38025")

conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext(conf=conf)

NUM_SAMPLES = randint(5000000, 100000000)
count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
.reduce(lambda a, b: a + b)
print("NUM_SAMPLES is %i" % NUM_SAMPLES)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

(The above Pi estimation is adapted from an example that ships with the Spark installation.)

The property values set in the script override the properties in the spark-defaults.conf file. For the runtime of this script, port 4042 and ports 38020-38025 are used.

If the netstat command is run again for all ports that start with 380

sudo netstat -tulpn | grep 380

The following output is shown:

tcp6           0           0           :::38000                              :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38002         :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38003         :::*          LISTEN          25300/java
tcp6           0           0           :::38004                              :::*          LISTEN          25300/java
tcp6           0           0           :::38005                              :::*          LISTEN          25300/java
tcp6           0           0           :::38020                              :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38022         :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38023         :::*          LISTEN          27280/java
tcp6           0           0           :::38024                              :::*          LISTEN          27280/java

Two processes are running, each with its own Spark application, on ports that were defined beforehand.

 

Configuring ports in Zeppelin

Since my users use Apache Zeppelin, similar network management had to be done there. Zeppelin also sends jobs to the Spark Context through the spark-submit command, which means the properties can be configured in the same way – this time through the Spark interpreter in Zeppelin:

Choose the Interpreter menu and then the spark interpreter. Now it is all about adding the new properties and their respective values. Do not forget to click on the plus sign when you are ready to add a new property.
At the very end, save everything and restart the spark interpreter.

Below is an example of how this is done:

(screenshot: spark zeppelin ports)

Next time a Spark context is created in Zeppelin, the ports will be taken into account.

 

Conclusion

This can be useful if multiple users are running Spark applications on one machine and have separate Spark Contexts.

In case of Zeppelin, this comes in handy when one Zeppelin instance is deployed per user.