Automating access from Apache Spark to S3 with Ansible

According to the Apache Spark documentation, Spark jobs must authenticate with S3 to be able to read or write data in the object storage. There are different ways of achieving that:

  • When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
  • spark-submit reads the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN environment variables and sets the associated authentication options for the s3n and s3a connectors to Amazon S3.
  • In a Hadoop cluster, settings may be set in the core-site.xml file.
  • Authentication details may be manually added to the Spark configuration in spark-defaults.conf.
  • Alternatively, they can be programmatically set in the SparkConf instance used to configure the application’s SparkContext.

Honestly, I wouldn’t know much about the first option. It might have something to do with running Databricks on AWS.

The second option requires setting environment variables on all servers of the Spark cluster. With Ansible this can be done, but only at the level of a task or role. This means that if you run a long-lived Spark cluster, the variables are no longer available once you start using the cluster.

This post focuses on the fourth option. spark-defaults.conf is Spark's default configuration file, and the settings in it determine how your Spark cluster behaves.

There are five configuration tuples needed to manipulate S3 data with Apache Spark. They are explained below.

Getting environment variables into Docker

The following approach is suitable for a proof of concept or for testing. An enterprise solution should use a service such as HashiCorp Vault, Ansible Vault, AWS IAM or similar.

I am using Docker on Windows 10. The folder where the Dockerfile resides also has a file called aws_cred.env. Make sure this file is added to the .gitignore file so that it is not checked into the source code repository! The env file holds the AWS access key and secret key needed to authenticate with S3. The file structure is like this:

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

When running the Docker container with the option --env-file, the environment variables in the file are exported to the Docker container.
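To make the file format concrete, here is a minimal Python sketch of how such KEY=VALUE lines could be read. It is not Docker's own parser. The function name parse_env_file and the sample values are hypothetical, and the sketch assumes one assignment per line, with blank lines and lines starting with # skipped.

```python
def parse_env_file(text):
    """Parse KEY=VALUE lines into a dict (simplified; not Docker's actual parser)."""
    env = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values may contain '='.
        key, sep, value = line.partition("=")
        if sep:
            env[key.strip()] = value
    return env


# Dummy values, not real credentials.
sample = "AWS_ACCESS_KEY_ID=AKIAEXAMPLE\nAWS_SECRET_ACCESS_KEY=abc=123\n"
creds = parse_env_file(sample)
print(sorted(creds))
```

Printing only the key names, as above, is a cheap way to check that the file is complete without echoing secrets to the terminal.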

In the Ansible code, both can be looked up in the following way:

{{ lookup('env', 'AWS_ACCESS_KEY_ID') }}
{{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}

These can be used in the Jinja2 template file spark-defaults.conf.j2 to generate a Spark configuration file. The configuration tuples relevant in this case are these two:

spark.hadoop.fs.s3a.access.key {{ lookup('env', 'AWS_ACCESS_KEY_ID') }}
spark.hadoop.fs.s3a.secret.key {{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}

This now gives Spark access to the S3 buckets that the key is authorized for, whether they are public or private.
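What the template lookup achieves can be imitated in plain Python, which is handy for checking the expected result outside Ansible. This is only a sketch: the function name render_s3a_credentials is hypothetical, and unlike the Jinja2 template it raises an error when a variable is missing instead of writing an empty value.

```python
import os


def render_s3a_credentials(environ=os.environ):
    """Return the two spark-defaults.conf lines for the S3A credentials."""
    mapping = {
        "spark.hadoop.fs.s3a.access.key": "AWS_ACCESS_KEY_ID",
        "spark.hadoop.fs.s3a.secret.key": "AWS_SECRET_ACCESS_KEY",
    }
    lines = []
    for spark_key, env_name in mapping.items():
        value = environ.get(env_name)
        if not value:
            # Fail early instead of writing an empty credential.
            raise KeyError(f"environment variable {env_name} is not set")
        lines.append(f"{spark_key} {value}")
    return "\n".join(lines)


# Example with dummy values instead of real credentials.
rendered = render_s3a_credentials({"AWS_ACCESS_KEY_ID": "dummy-id",
                                   "AWS_SECRET_ACCESS_KEY": "dummy-secret"})
print(rendered)
```

Failing early is a design choice of this sketch: a missing variable is easier to debug at render time than as an authentication error inside a Spark job.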

The JAR files

First, the following tuple is mandatory for the Spark configuration:

spark.hadoop.fs.s3a.impl      org.apache.hadoop.fs.s3a.S3AFileSystem

This tells Spark what kind of file system it is dealing with. The JAR files described below contain the classes that this configuration refers to.

Two libraries must be added to the instances of the Spark cluster:

  • aws-java-sdk-1.7.4
  • hadoop-aws-2.7.3

The above-mentioned Jinja2 file also holds two configuration tuples relevant for these JAR files:

spark.driver.extraClassPath   /usr/spark-s3-jars/aws-java-sdk-1.7.4.jar:/usr/spark-s3-jars/hadoop-aws-2.7.3.jar
spark.executor.extraClassPath /usr/spark-s3-jars/aws-java-sdk-1.7.4.jar:/usr/spark-s3-jars/hadoop-aws-2.7.3.jar

Be careful with the versions: they must match the Spark version (more precisely, the Hadoop version the Spark package is built for). The above combination has proven to work on Spark installation packages that support Hadoop 2.7. The last two tasks in this main.yml do the job for the Spark cluster.
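Because the same two paths appear in both classpath settings, it is easy to make a typo in one of them. A small sketch of generating both lines from one list of JAR names could look like this. The helper name build_classpath is hypothetical; the directory and JAR names are the ones used in this post.

```python
import posixpath


def build_classpath(jar_dir, jar_names):
    """Join jar paths with ':' as expected on Linux class paths."""
    return ":".join(posixpath.join(jar_dir, name) for name in jar_names)


jars = ["aws-java-sdk-1.7.4.jar", "hadoop-aws-2.7.3.jar"]
classpath = build_classpath("/usr/spark-s3-jars", jars)

# The driver and the executors get the same class path.
for key in ("spark.driver.extraClassPath", "spark.executor.extraClassPath"):
    print(f"{key} {classpath}")
```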

Once the files are downloaded (for example, I download them to /usr/spark-s3-jars) Apache Spark can start reading and writing to the S3 object storage.

Provision Apache Spark in AWS with Hashistack and Ansible

Automation is the key word when it comes to using cloud services. Pay-as-you-go is the philosophy behind it.

In this post, I explain how I provision an Apache Spark cluster on Amazon. The configuration of the cluster is done prior to the provisioning using Jinja2 file templates. Once provisioning is completed, the cluster is therefore ready to use immediately.

One of the points of automation is to make data scientists more independent of data engineers: the data engineer builds the solution and the data scientist uses it without needing engineering experience.
In this case, the data scientist has to configure the cluster using the YAML file and prepare a GitHub repository.

There are two ways of using this solution:

  • A long-lived Spark cluster
    The Spark cluster serves as a platform for running various jobs. The cluster is always available.
  • One-time job execution
    The Spark cluster is provisioned for a specific job, which is executed, and then the cluster is destroyed. The data scientist is responsible for data input and data storage in the code (example).

Technologies and services

  • AWS EC2 (CentOS 7)
  • Terraform for provisioning the infrastructure in AWS
  • Consul for cluster’s configuration settings
  • Ansible for software installation on the cluster
  • GitHub for version control
  • Docker as test and development environment
  • PowerShell for running Docker and the provisioning
  • Visual Studio Code for software development and running PowerShell

In order to use a service like EC2 in AWS, a Virtual Private Cloud must be established. This is something I have automated using Terraform and Consul and described here. This provision is a long-lived provision, since a VPC has practically no cost.

Prerequisites

I will not go into details of how to install all the technologies and services from the list. However, this GitHub repository builds a Docker container with Consul and the latest Terraform. Consul in the Docker container is an agent which connects to a global Consul server in Amazon. Documenting the global Consul is on my to-do list.

I suggest investing some time and creating a Consul with connection to your own GitHub repository that stores the configuration.

Repository on GitHub

The repository can be found at this address.

Repository Structure

There are two modules used in this project: instance and provision-spark. The module instance is pure Terraform code and does the provisioning of the instances (Spark’s master and workers) in the AWS. The output (DNS and IP addresses) of this module is the input for the module provision-spark which is more complex. It is written in Terraform, Ansible and Jinja2.

Ansible Roles

Below is the structure of the Ansible part of the module.

Roles prereq and spark are applied to all instances. The prereq role takes care of the prerequisites (java, anaconda) and the spark role downloads and installs Spark, and creates Linux objects needed for Spark to work. The start_spark_master applies only to the master instance and start_spark_workers to the worker instances. The role execute_on_spark automatically executes a job on Spark cluster (more on that later).

The path to the YAML file that executes the roles is available here.

Cluster Configuration

The cluster is configured in YAML format and the configuration is sent to the global Consul server. One configuration block serves one cluster. An example for the cluster lr_iris can be found here.

Running the code

Provisioning starts in module provision-spark where the line

terraform apply -auto-approve

starts provisioning the cluster. The configuration is taken from Consul to populate the variables in Terraform. The Ansible inventory file is created by Terraform after the EC2 instances are launched and started. After the inventory file is created, Terraform executes the spark.yml file and the rest is in the hands of Ansible. If everything goes well, the output is similar to the following:

This is a Terraform output as defined in the output.tf file.

The Spark cluster is now ready.
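The inventory step described above can be pictured with a short sketch. It is not the code from the repository: the function name render_inventory and the group names spark_master and spark_workers are assumptions made for illustration, and the IP addresses are made up.

```python
def render_inventory(master_ip, worker_ips):
    """Render a minimal INI-style Ansible inventory (group names are assumptions)."""
    lines = ["[spark_master]", master_ip, "", "[spark_workers]"]
    lines.extend(worker_ips)
    return "\n".join(lines) + "\n"


# Made-up addresses standing in for the Terraform outputs.
inventory = render_inventory("10.0.0.10", ["10.0.0.11", "10.0.0.12"])
print(inventory)
```

Splitting the hosts into a master group and a worker group is what lets roles such as start_spark_master and start_spark_workers target only the instances they apply to.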

View in AWS Console

The instances in the Spark cluster look like this in AWS console:

Spark as a Service

Spark services running on master and workers are handled as services using systemctl. The services are created and started using Ansible: Spark workers start a service called spark-worker whose Ansible code can be seen here.
Spark Master has two services: spark-master and sparkhs (Spark History Server). Ansible code for both services is here.

Spark Master

Checking if Spark Master is available by using the public IP address and port 8080 should return an interface similar to this one:

Five workers were set up in the configuration file. This means we have a cluster with six instances: one is the Spark Master, the other five are the workers.

Spark History Server

Spark History Server, just like Spark Master, becomes significant if a long-lived cluster is used. It helps with monitoring and debugging the jobs (applications in Spark terminology).
Spark History Server can be reached at port 18080 on Spark Master.

Above is an example of an application that was executed on the Spark cluster. Note the Event log directory: it points to a local directory which will be removed once the cluster is destroyed. This is not an issue if we are running a long-lived cluster, but if we want to keep logs for one-time clusters, it is advisable to store the logs externally. In this case, since Amazon is used, storing them in S3 would be the best option.

Automatic Code Execution

The Spark cluster is now ready to use. Full automation is achieved when the Spark code is executed automatically from the Terraform code once the cluster is available. In the repository, one of the Ansible roles is execute_on_spark, which executes either Python or Scala code on the provisioned Spark cluster.

Which Spark code will be executed depends on the configuration in the YAML file. A path to a GitHub repository is part of the configuration and that repository is cloned to the Spark Master and executed.

An example mentioned above can be found here. The example is one of Hello Worlds in data science – Logistic Regression on Iris dataset. In this case, the Data Scientist is responsible for the input data and storing the results outside of the Spark cluster.

    input_file = "s3a://hdp-hive-s3/test/iris.csv"
    output_dir = "s3a://hdp-hive-s3/test/git_iris_out"

When the cluster is ready, the repository is cloned and the code is executed. Inside the code, the Data Scientist defines input and output. In this case, S3 object storage is used: the one-time job runs, the results are saved to S3, and the Spark cluster is no longer needed.

Installing Apache Spark 2.2.1

I have installed older Apache Spark versions and now the time is right to install Spark 2.2.1.

Update 27.09.2019: Automated Apache Spark install with Docker, Terraform and Ansible? Check out this post.

I’m using an AWS t2.micro instance with Ubuntu 16.04 on it. MobaXterm is my choice of interface to SSH to the instance.

System update

sudo apt-get update -y
sudo apt-get upgrade -y

Change instance name

Go into the hostname file and change the name to spark

sudo vi /etc/hostname

Change localhost with instance name in hosts file

After the 127.0.0.1 write the name of the instance

sudo vi /etc/hosts

Reboot the instance

sudo reboot

Install and set up Java

You will need Java at the latest when running the History Server. Java 8 is installed in the following way

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

Add JAVA_HOME to the environment file

sudo vi /etc/environment

And add the following line to the top of the file

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

Double-check that this is the correct Java home.

Python

Spark 2.2.x supports Python 2.7+/3.4+. When running PySpark, Spark looks for the python executable in the /usr/bin directory. Ubuntu 16.04 on AWS comes only with Python 3.5, which is available as python3. When running PySpark without a python executable, the following error message is printed

/usr/apache/spark-2.2.1-bin-hadoop2.7/bin/pyspark: line 45: python: command not found
env: ‘python’: No such file or directory

There are two options: install Python 2.7+ or create a link to Python 3. The first alternative makes sense only if Python packages that exist for Python 2 but not for Python 3 are going to be used. Otherwise, the latter alternative is the better choice. It is done in the following way

sudo ln -s /usr/bin/python3 /usr/bin/python

Running PySpark now starts PySpark CLI with Python 3.5.2

And yes, running python or python3 will both execute the same action – start python 3.5.

Create user spark

sudo adduser spark

Define a password; for the sake of testing, let’s go with spark

Prepare directory for Spark home

sudo mkdir /usr/apache

Step into the directory

cd /usr/apache

Download and unpack Apache Spark 2.2.1

sudo wget http://apache.uib.no/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
sudo tar -xvzf spark-2.2.1-bin-hadoop2.7.tgz

Clean up – delete the Spark tgz file

sudo rm spark-2.2.1-bin-hadoop2.7.tgz

Change the owner of the spark directory

sudo chown -R spark:spark /usr/apache/spark-2.2.1-bin-hadoop2.7

Create Spark home

cd spark-2.2.1-bin-hadoop2.7
pwd

Output of the pwd command is the value for SPARK_HOME in the environment file. Open the file

sudo vi /etc/environment

And add the below SPARK_HOME line before the PATH line

export SPARK_HOME=/usr/apache/spark-2.2.1-bin-hadoop2.7

At the end of PATH add

:${SPARK_HOME}/bin

(You need the colon to separate from previous values)
Refresh the environment file

source /etc/environment

Create log and pid directories

sudo mkdir -p /var/log/spark/logs
sudo chown spark:spark -R /var/log/spark
sudo -u spark mkdir $SPARK_HOME/run
sudo -u spark chmod 777 /var/log/spark/logs

The last line allows every user to read from and write to the directory. It is probably best to adjust access according to your needs.

If different users are going to run Spark applications and those applications should be visible in the History Server, the user spark has to be added to those users’ groups.
For example, if user ubuntu is running Spark applications, user ubuntu writes log files to the Spark History log directory (check below for the property spark.history.fs.logDirectory) and Spark opens them through the History Server. To be able to do the latter, spark has to be a member of the ubuntu group. This is done in the following way

sudo usermod -a -G ubuntu spark

Prepare spark-env.sh file

Open the file

sudo -u spark vi $SPARK_HOME/conf/spark-env.sh

Add the following values

SPARK_LOG_DIR=/var/log/spark
SPARK_PID_DIR=${SPARK_HOME}/run

Prepare spark-defaults.conf file

Open the file

sudo -u spark vi $SPARK_HOME/conf/spark-defaults.conf

Add the following values

spark.history.fs.logDirectory file:/var/log/spark/logs
spark.eventLog.enabled true
spark.eventLog.dir file:/var/log/spark/logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

Start Spark History

sudo -u spark $SPARK_HOME/sbin/start-history-server.sh

The instance’s IP address on port 18080 should open the Spark History Server. If not, check /var/log/spark for errors and messages.

Manipulating files from S3 with Apache Spark

Update 22/5/2019: Here is a post about how to use Spark, Scala, S3 and sbt in Intellij IDEA to create a JAR application that reads from S3.

This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare the properties file with AWS credentials, run spark-shell to read the properties, read a file from S3 and write from a DataFrame to S3.

This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket and in folder folder01 I have the test file called SearchLog.tsv.

The project’s home is /home/ubuntu/s3-test. Folder jars is created in the project’s home.

Step into jars folder. Download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

Create a properties file in the project’s home. The name, for example, is s3.properties. Add and adjust the following text in the file.

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]

Once the file is saved, we can test the access by starting spark-shell.

spark-shell --properties-file /home/ubuntu/s3-test/s3.properties

Once in Spark, the configuration properties can be checked by running

spark.conf.getAll

The result should, among other parameters, also show values for the keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value for a specific key can also be checked by running spark.conf.get(KEY_NAME), as the example below shows.

spark.conf.get("spark.hadoop.fs.s3a.impl")

The jars and credentials are now read by the Spark application. Let us read the test file into an RDD.

val fRDD = sc.textFile("s3a://markosbucket/folder01")

Output:

fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at <console>:24

Remember, this is an RDD, not a DataFrame or DataSet!

Print out first three lines from the file

fRDD.take(3)

Reading a CSV file directly into a DataFrame:

val fDF = spark.read.csv("s3a://markosbucket/folder01")

Writing to S3

Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a sample file into it

mkdir data
wget -P data https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

Now we can load the local file in a DataFrame

val airportDF = spark.read.csv("data/airports.dat")

Saving the DataFrame to S3 is done by first converting the Dataframe to RDD

airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")

Refreshing the S3 folder should show the new folder airport-output, which has two files: _SUCCESS and part-00000.

Some Error messages with fixes

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Fix:
The jar files were not added to Spark application.

com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:

sc.hadoopConfiguration.set("fs.s3a.access.key", "[ACCESS_KEY]")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "[SECRET_KEY]")

java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist

Fix:
In AWS Console, right click on file, choose properties and copy the value next to Link. Replace https with s3a and remove the domain name (“s3-…”).
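The manual rewrite of the link can be sketched as a small Python helper. The function name link_to_s3a is hypothetical, and the sketch assumes a path-style link, where the bucket name is the first part of the path; the region in the example URL is made up. URL-encoded characters in object names are not handled.

```python
from urllib.parse import urlparse


def link_to_s3a(link):
    """Turn a path-style S3 object link into an s3a:// URI."""
    parsed = urlparse(link)
    # In a path-style link the path is /<bucket>/<key>.
    bucket_and_key = parsed.path.lstrip("/")
    if not bucket_and_key:
        raise ValueError("link does not contain a bucket name")
    return "s3a://" + bucket_and_key


# The region in this URL is an example value.
uri = link_to_s3a("https://s3-eu-west-1.amazonaws.com/markosbucket/folder01/SearchLog.tsv")
print(uri)
```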

Creating a multinode Apache Spark cluster on AWS from command line

The idea is to create a Spark cluster on AWS according to the needs, maximize the use of it and terminate it after the processing is done.

All the instances run the Ubuntu 14.04 operating system. The instance type used in the following script is the free-tier t2.micro with 8 GB of storage and 1 GB of RAM.

To have this working from the command line, aws package has to be installed.

One instance, master in this case, is always in State “running” and this instance has the following installed:

  • aws package
  • Apache Spark

How to install Spark 2.0 is described here. There is also the link to the blog about how to install Spark History Server.

The Spark History Server can be reached on port 18080 – MASTER_PUBLIC_IP:18080 and Spark Master is on port 8080. If the pages do not load, start the services as spark user. The scripts for starting the services can be found in $SPARK_HOME/sbin.

The script for launching instances

The following script takes one parameter, the number of instances to launch, and attaches the new instances to the Spark cluster. These instances become workers in the Spark world.

A new script is created

vi create-spark-cluster.sh

And the following lines are written in it. Adjust accordingly (details below).

#!/bin/sh

#removes any old associations that have "worker" in it
sudo sed -i.bak '/worker/d' /etc/hosts

#removes known hosts from the file
sudo sed -i 'd' /home/ubuntu/.ssh/known_hosts

#run the loop to create as many slaves as defined in the input parameter
for i in $(seq -f %02g 1 $1)
do
  #name of the worker
  NAME="worker$i"

  #run the aws command to create an instance and run a script when the instance is created.
  #the command returns the private IP address which is used to update the local /etc/hosts file
  PRIV_IP_ADDR=$(aws ec2 run-instances --image-id ami-0d77397e --count 1 \
            --instance-type t2.micro --key-name MYKEYPAIR \
            --user-data file:///PATH_TO_SCRIPT/user-data.sh \
            --subnet-id SUBNET_ID --security-group-ids SECURITY_GROUP \
            --associate-public-ip-address \
            --output text | grep PRIVATEIPADDRESSES | awk '{print $4 "\t" $3}')

  #add the IP and hostname association to the /etc/hosts file
  echo "$PRIV_IP_ADDR" "$NAME" | sudo tee -a /etc/hosts

done

Line 19: option user-data allows you to define a file that is run on the new instance. In this file, the steps for Spark worker installation and setup are defined.
MYKEYPAIR: name of the private key you use in AWS console when launching new instances (this is not a path to the private key on the instance you are running the script from!).
SUBNET_ID: ID of the subnet you are using. It is something one creates when starting with EC2 in AWS.
SECURITY_GROUP: ID of the existing security group that this instance should use.
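The naming and the /etc/hosts line that the loop produces can be reproduced in a few lines of Python, which helps when checking what the script will write before launching any instances. The function names are hypothetical; the IP address and DNS name are sample values in the format AWS uses for private addresses.

```python
def worker_names(count):
    """Zero-padded names like the shell loop's `seq -f %02g 1 N`."""
    return [f"worker{i:02d}" for i in range(1, count + 1)]


def hosts_line(private_ip, private_dns, name):
    """Build one /etc/hosts entry: IP, a tab, then DNS name and alias."""
    return f"{private_ip}\t{private_dns} {name}"


names = worker_names(3)
line = hosts_line("10.0.0.104", "ip-10-0-0-104.eu-west-1.compute.internal", names[0])
print(names)
print(line)
```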

 

The script that runs at instance launch

The following commands in the script are run as soon as each instance is created.
At the top of the script, the operating system is updated, python is installed and JDK as well.

#!/bin/sh

sudo apt-get update -y  && \
sudo apt-get install python-minimal -y && \
sudo apt-get install default-jdk -y && \
cd /etc/ && \
sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.1-bin-hadoop2.7.tgz && \
sudo tar -xvzf spark-2.0.1-bin-hadoop2.7.tgz && \
sudo rm spark-2.0.1-bin-hadoop2.7.tgz && \
sudo useradd spark && \
export SPARK_HOME=/etc/spark-2.0.1-bin-hadoop2.7 && \
sudo chown -R spark:spark $SPARK_HOME && \
sudo -u spark mkdir $SPARK_HOME/logs && \
sudo -u spark $SPARK_HOME/sbin/start-slave.sh spark://ip-10-0-0-95.eu-west-1.compute.internal:7077

Lines 6-8: the Spark package is downloaded into /etc/ and unpacked.
Line 14: the slave is started and connected to the master node (the Spark Master’s URL is passed as the parameter to start-slave.sh).

Example of running the script

The following example creates a Spark cluster with 3 workers.

sh create-spark-cluster.sh 3

Example of the output:

10.0.0.104      ip-10-0-0-104.eu-west-1.compute.internal worker01
10.0.0.80       ip-10-0-0-80.eu-west-1.compute.internal worker02
10.0.0.47       ip-10-0-0-47.eu-west-1.compute.internal worker03

These three lines are also added to the /etc/hosts.

The Spark Master Web Interface (SPARK_MASTER_PUBLIC_IP:8080) gives more details about the Spark cluster.

The interface displays the recently added workers in state ALIVE. Two workers are in state DEAD: they have been terminated in AWS, but Spark Master has not updated the Workers statistics yet.

Memory in use is 3 GB: the instances used to create the cluster have 1 GB each.

Conclusion

This blog post shows a very simple way of creating a Spark cluster. The cluster creation script can be made a lot more dynamic, and the script that runs on the newly created instances could be extended (for example, to install the Python packages needed for the work).

Apache Spark 2.0 – Notes

Spark Session

In Spark 2.0, SparkSession has been introduced. It provides a single point of entry for interaction with Spark functionality. It gives the user access to the DataFrame and Dataset APIs.
In older Spark versions, the user had to create a Spark configuration (sparkConf), a SparkContext (sc) and then a sqlContext. In Spark 2.0, all of this is done with SparkSession (spark), which encapsulates the above-mentioned trio, HiveContext and StreamingContext.

In order to use DataFrame API in Spark 1.6, SQLContext needs to be used. When running Spark, new Spark application is started by creating SparkContext object (represents a connection to computing cluster). From SparkContext, SQLContext can be created (the main entry point for Spark DataFrame and SQL functionality). A SQLContext can be used to create DataFrames, which allows you to direct operations on your data.
It was confusing when to use SparkContext and when to use SQLContext in Spark 1.6. All this is hidden under a layer called SparkSession. See the following object types from PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SQLContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

The Spark session has to be created when using spark-submit command. An example from the documentation on how to do that:

>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

The spark handle is created and first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is automatically initialized. Example with sparkR:

sparkR

SQL

SQL in Spark 2.0 supports SQL:2003 (latest revision is SQL:2011). Spark SQL can run all 99 TPC-DS queries. Subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

Running the following command

>>> spark.catalog.listTables()

returns the tables available to SparkSession.

Driver program

In Spark, communication occurs between driver and executors. The driver has Spark jobs to run, it splits them into tasks and submits them to executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs are running either in a cluster or locally on the same machine. Pyspark is an example of a local driver program. Example:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode, the client that submitted the job is detached from the Spark application, and its further behavior does not influence the application. If you shut down the computer that submitted the application to the cluster, the job will continue to run. The driver runs on one of the nodes in the Spark cluster. The command spark-submit with the option --deploy-mode cluster does this.
If you run in client mode, the driver runs on the machine you submitted the application from. In Spark History Server, under the Executors tab, in the Executors table, you can see that the address of the driver matches the address of the computer the command was sent from.

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.

Listing attributes

Python’s dir() lists all attributes of the object passed to it.
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

Opens extensive help and examples.

RDD

SQLContext is created from lower level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs). RDD is Spark’s way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster, and they perform the same regardless of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or Python, Java, Scala, or R, the underlying code generated is identical because all execution planning goes through the same Catalyst optimizer.
DFs are made of partitions – converting a DF to an RDD to check number of partitions:

>>> tweetDF.rdd.getNumPartitions()

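Data is split into partitions.
How to optimize:
If there are 3 slots, the ideal number of partitions is a multiple of 3 (partitions = x*3).
Repartition the DataFrame (repartition returns a new DataFrame, so the result is assigned back):

>>> tweetDF = tweetDF.repartition(6)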
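The rule of thumb about slots can be written down as a tiny helper that rounds a partition count up to the next multiple of the number of slots. The function name round_up_to_slots is hypothetical, and the helper is plain Python arithmetic, not part of the Spark API.

```python
def round_up_to_slots(partitions, slots):
    """Smallest multiple of `slots` that is >= `partitions`."""
    if partitions < 1 or slots < 1:
        raise ValueError("partitions and slots must be positive")
    # Integer ceiling division, then scale back up to a multiple of slots.
    return (partitions + slots - 1) // slots * slots


print(round_up_to_slots(4, 3))  # → 6
print(round_up_to_slots(6, 3))  # → 6
```

The result could then be passed to repartition, so that every slot gets the same number of partitions and none sits idle during the last round of tasks.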

DataFrames

In Spark, a base DataFrame is created first, either by generating a Dataset using the spark.range method (for learning purposes) or by reading files or tables. Operations can then be applied to it. A DataFrame is immutable: once created, it cannot be changed. Each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

A DataFrame consists of a series of Row objects. Each of them has a set of named columns.

A DataFrame must have a schema, in which each column has a name and a type. Some data sources have schemas built into them, but it is also possible to define a schema and pass it as a parameter when creating a new DataFrame.

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance

Dataset

DataFrame and DataSet are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking SQL package in Spark 2.0

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

Spark DF can be converted to Pandas DF

>>> import pandas as pd

Then you can use .toPandas() at the end of the Spark DF to convert to Pandas DF.

Cache

Putting a DataFrame in memory: Spark uses the Tungsten binary format to store data in memory in a compressed, columnar form. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. The data in memory is equally divided per partition (for example, 600 MB in memory, 6 partitions -> 100 MB per partition).
The size of the data shrinks when cached. Example: a 1.6 GB file on disk is cached in memory with a size of 618.6 MB.

When an action is executed on a cached DataFrame, the stages that ran BEFORE the cache are skipped, because the data is already partitioned in memory.
The ideal partition size is between 100 MB and 200 MB, so the number of partitions should be adjusted to reach that size, not the other way around.
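
The sizing rule above can be turned into a small calculation. This is a minimal sketch in plain Python, not something Spark provides: the function name partitions_for and the 128 MB default target are my own assumptions, picked to sit inside the 100 MB to 200 MB range.

```python
import math

def partitions_for(data_size_mb, target_partition_mb=128):
    """Return how many partitions keep each partition close to the target size."""
    if data_size_mb <= 0 or target_partition_mb <= 0:
        raise ValueError("sizes must be positive")
    # Round up so that no partition ends up larger than the target
    return max(1, math.ceil(data_size_mb / target_partition_mb))

print(partitions_for(600, 100))  # 6, matching the 600 MB / 6 partitions example
print(partitions_for(618.6))     # 5 partitions of roughly 124 MB each
```

The result could then be passed to the DataFrame's repartition method before caching.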

Cache is not an action, which means the caching happens only when the next action is executed. However, if you cache a table (for example with the SQL statement CACHE TABLE), it WILL be cached right away.


Yarn application has already ended! It might have been killed or unable to launch application master.

If you are struggling with the error message in the title of this post, check whether you control the ports that Spark needs. In my experience, if the ports Spark is using cannot be reached, YARN terminates the application with that error message. So it is best to fix the Spark ports and open them so that the YARN application can go through. More on Spark and networking here.

Spark chooses random ports, and unless you have ALL ports open, you might run into the “endless”

INFO Client: Application report for application_1470560331181_0013 (state: ACCEPTED)

which eventually fails

INFO Client: Application report for application_1470560331181_0013 (state: FAILED)

and the error message returned would be

ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.

Adding something like this in spark-defaults.conf

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

could solve this issue.
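
Once the ports are fixed, it is easy to test from another node whether they can be reached while an application is running. The following is a minimal sketch using only the Python standard library; the function name unreachable_ports and the injectable connect parameter are my own choices for illustration, and the host name in the usage note is a placeholder.

```python
import socket

def unreachable_ports(host, ports, timeout=1.0, connect=socket.create_connection):
    """Return the ports on host that do not accept a TCP connection."""
    closed = []
    for port in ports:
        try:
            # create_connection raises OSError on refusal, timeout or DNS failure
            with connect((host, port), timeout=timeout):
                pass
        except OSError:
            closed.append(port)
    return closed
```

Calling unreachable_ports("client-server", range(38000, 38006)) from a cluster node returns the ports a firewall is still blocking, or the ports nothing is listening on. Keep in mind that the ports are only open while a Spark application is running.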

My notes on installing Spark 2.0 are here.

And how to install Spark 1.6 is described here.

Configuring Apache Spark History Server

Prior to configuring and running Spark History Server, Spark should be installed.

How to install Apache Spark 1.6.0 is described here.

How to install Apache Spark 2.0 is described here.

Spark History server

Check that $SPARK_HOME/conf/spark-defaults.conf has History Server properties set

spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080

spark.history.kerberos.keytab none
spark.history.kerberos.principal none
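
The History Server can only show applications if it reads from the same directory that the applications write their event logs to. A minimal sketch of a sanity check for that, in plain Python: the helper names parse_properties and history_dirs_match are hypothetical and not part of Spark.

```python
def parse_properties(text):
    """Parse 'key value' lines as used in spark-defaults.conf, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props

def history_dirs_match(props):
    """True when event logging is on and both directory properties point to the same place."""
    event_dir = props.get("spark.eventLog.dir")
    return (
        props.get("spark.eventLog.enabled") == "true"
        and event_dir is not None
        and event_dir == props.get("spark.history.fs.logDirectory")
    )

sample = """\
spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
"""
print(history_dirs_match(parse_properties(sample)))  # True
```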

Create spark-history directory in HDFS

sudo -u hdfs hadoop fs -mkdir /spark-history

Change the owner of the directory

sudo -u hdfs hadoop fs -chown spark:hdfs /spark-history

Change permission (be more restrictive if necessary)

sudo -u hdfs hadoop fs -chmod 777 /spark-history

Add user spark to group hdfs on the instance where Spark History Server is going to run

sudo usermod -a -G hdfs spark

To view Spark jobs from other users
When you open the History Server and cannot see the Spark jobs you expect, check the Spark out file in the Spark log directory. If the error message “Permission denied” is present, Spark History Server is trying to read the job log file but has no permission to do so.
The spark user should be added to the group of the Spark job owner.
For example, user marko belongs to the group employee. If marko starts a Spark job, the log file for this job will be owned by marko:employee. For spark to be able to read the log file, the spark user should be added to the employee group. This is done in the following way

sudo usermod -a -G employee spark

Checking spark’s groups

groups spark

should return group employee among spark’s groups.

Start Spark History server

sudo -u spark $SPARK_HOME/sbin/start-history-server.sh

Output:

starting org.apache.spark.deploy.history.HistoryServer, logging to /var/log/spark/spark-spark-org.apache.spark.deploy.history.HistoryServer-1-t-client01.out

The Spark History Server web UI can be reached at spark-server:18080. The following screen should load.

[Screenshot: Spark History Server web UI on port 18080]
A fresh Spark History Server installation has no applications to show (no applications in hdfs:/spark-history).

Spark History Server offers a great monitoring interface for Spark applications!

WARN ServletHandler: /api/v1/applications

If you start Spark History Server but see neither completed nor incomplete applications in the web UI, check the log files. If you get something like the following

WARN ServletHandler: /api/v1/applications
java.lang.NullPointerException
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
        at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
        at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
        at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
        at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
        at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
        at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.spark_project.jetty.servlets.gzip.GzipHandler.handle(GzipHandler.java:479)
        at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:215)
        at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
        at org.spark_project.jetty.server.Server.handle(Server.java:499)
        at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:311)
        at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
        at org.spark_project.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
        at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
        at org.spark_project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
        at java.lang.Thread.run(Thread.java:745)

Take the jersey-bundle-*.jar file out of the $SPARK_HOME/jars directory. Hortonworks doesn't need it, you don't need it 🙂

Apache Spark 2.0.0 – Installation and configuration

I am running an HDP 2.4 multinode cluster with Ubuntu Trusty 14.04 on all my nodes. The Spark in this post is installed on my client node. My cluster has HDFS and YARN, among other services, all installed from Ambari. This is not the case for Apache Spark 2.0, because Hortonworks does not offer Spark 2.0 on HDP 2.4.0.

The documentation on the latest Spark version can be found here.

My notes on Spark 2.0 can be found here (if anyone finds them useful).

My post on setting up Apache Spark 1.6.0.

Update 12 January 2018: there is a post on how to install Apache Spark 2.2.1, "Apache Spark 2.2.1 on Ubuntu 16.04 – Hadoop-less instance".

Prerequisites

Java

Update and upgrade the system and install Java

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

Add JAVA_HOME in the system variables file

sudo vi /etc/environment
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

Spark user

Create user spark and add it to group hdfs

sudo adduser spark
sudo usermod -a -G hdfs spark

HDFS home directory for user spark

Create spark’s user folder in HDFS

sudo -u hdfs hadoop fs -mkdir -p /user/spark
sudo -u hdfs hadoop fs -chown -R spark:hdfs /user/spark

Spark installation and configuration

Install Spark

Create the directory where the Spark directory is going to reside and step into it

sudo mkdir /usr/apache
cd /usr/apache

Download Spark 2.0.0 from https://spark.apache.org/downloads.html

sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz

Unpack the tar file

sudo tar -xvzf spark-2.0.0-bin-hadoop2.7.tgz

Remove the tar file after it has been unpacked

sudo rm spark-2.0.0-bin-hadoop2.7.tgz

Change the ownership of the folder and its elements

sudo chown -R spark:spark spark-2.0.0-bin-hadoop2.7

Update system variables

Step into the spark 2.0.0 directory and run pwd to get full path

cd spark-2.0.0-bin-hadoop2.7
pwd

Update the system environment file by adding SPARK_HOME and adding SPARK_HOME/bin to the PATH

sudo vi /etc/environment

export SPARK_HOME=/usr/apache/spark-2.0.0-bin-hadoop2.7

At the end of PATH add

${SPARK_HOME}/bin

Refresh the system environments

source /etc/environment

Log and pid directories

Create log and pid directories

sudo mkdir /var/log/spark
sudo chown spark:spark /var/log/spark
sudo -u spark mkdir $SPARK_HOME/run

Spark configuration files

Hive configuration

Create a Hive warehouse and give permissions to the users. If Hive service is set up, the path to the Hive warehouse could be /apps/hive/warehouse

sudo -u hive hadoop fs -mkdir /user/hive/warehouse
sudo -u hdfs hadoop fs -chmod -R 777 /user/hive/warehouse

Find the hive-site.xml file (HDP versions usually have it in /usr/hdp/current/hive-client/conf) and copy it to $SPARK_HOME/conf.
One property name in the file should be altered:
hive.metastore.warehouse.dir is replaced with spark.sql.warehouse.dir.

Spark environment file

Create a new file under $SPARK_HOME/conf

sudo -u spark vi $SPARK_HOME/conf/spark-env.sh

Add the following lines and adjust accordingly.

export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run
export HADOOP_HOME=${HADOOP_HOME:-/usr/hdp/current/hadoop-client}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/hdp/current/hadoop-client/conf}
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SPARK_SUBMIT_OPTIONS="--jars ${SPARK_HOME}/lib/spark-csv_2.11-1.4.0.jar"

The last line serves as an example of how to add external libraries to Spark. This particular package is quite common and installing it is advised. The package can be downloaded from this site.

Spark default file

Fetch HDP version

hdp-select status hadoop-client | awk '{print $3;}'

Example output for HDP 2.4:

2.4.0.0-169

Example output for HDP 2.5:

2.5.0.0-1245

Create spark-defaults.conf file in $SPARK_HOME/conf

sudo -u spark vi $SPARK_HOME/conf/spark-defaults.conf

Add the following and adjust accordingly (some properties belong to Spark History Server whose configuration is explained in the post in the link below)

spark.driver.extraJavaOptions -Dhdp.version=2.4.0.0-169
spark.yarn.am.extraJavaOptions -Dhdp.version=2.4.0.0-169
spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080

spark.history.kerberos.keytab none
spark.history.kerberos.principal none

spark.yarn.containerLauncherMaxThreads 25
spark.yarn.driver.memoryOverhead 384
spark.yarn.executor.memoryOverhead 384
spark.yarn.historyServer.address spark-server:18080
spark.yarn.max.executor.failures 3
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.submit.file.replication 3

spark.jars.packages com.databricks:spark-csv_2.11:1.4.0

spark.io.compression.codec lzf

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

The ports are defined in this configuration file. If they are not, then Spark assigns random ports. More on ports and assigning them in Spark can be found here.

If the ports are not under control, you risk the

Yarn application has already ended! It might have been killed or unable to launch application master.

error. More on that is written here.

JAVA OPTS

Create java-opts file in $SPARK_HOME/conf and add your HDP version

sudo -u spark vi $SPARK_HOME/conf/java-opts

Example:

-Dhdp.version=2.4.0.0-169
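
The HDP version ends up in three places above: two properties in spark-defaults.conf and the java-opts file. A minimal sketch of deriving all three from one hdp-select status line, so that they cannot drift apart. The function names are hypothetical, and the sample status line is an assumption about the output format, inferred from the awk command that prints the third field.

```python
def hdp_version(status_line):
    """Return the third whitespace-separated field, like awk '{print $3;}'."""
    fields = status_line.split()
    if len(fields) < 3:
        raise ValueError("unexpected hdp-select output: %r" % status_line)
    return fields[2]

def version_settings(version):
    """Build the three settings that carry the HDP version."""
    option = "-Dhdp.version=" + version
    return {
        "spark.driver.extraJavaOptions": option,
        "spark.yarn.am.extraJavaOptions": option,
        "java-opts": option,
    }

# Assumed shape of the hdp-select status output
line = "hadoop-client - 2.4.0.0-169"
for name, value in version_settings(hdp_version(line)).items():
    print(name, value)
```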

Fixing links in Ubuntu

Since the Hadoop distribution is Hortonworks and Spark is Apache’s, a workaround is needed: the default link is replaced with a new one.
First, the existing link is removed. Then the new link is created, pointing to $SPARK_HOME.

sudo rm /usr/hdp/current/spark-client
sudo ln -s /usr/apache/spark-2.0.0-bin-hadoop2.7 /usr/hdp/current/spark-client

Spark 2.0.0 is now almost ready.

Jersey problem

If you try to run a spark-submit command on YARN you can expect the following error message:

Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig

The jar file jersey-bundle-*.jar is not present in $SPARK_HOME/jars. Adding it fixes this problem:

sudo -u spark wget http://repo1.maven.org/maven2/com/sun/jersey/jersey-bundle/1.19.1/jersey-bundle-1.19.1.jar -P $SPARK_HOME/jars

January 2017 – Update on this issue:
If the above is done, Jersey 1 will be used when starting Spark History Server and the applications in Spark History Server will not be shown. The following error message will be generated in the Spark History Server output file:

WARN servlet.ServletHandler: /api/v1/applications
java.lang.NullPointerException
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)

This problem occurs only when one tries to run Spark on YARN, since YARN 2.7.3 uses Jersey 1 and Spark 2.0 uses Jersey 2.

One workaround is not to add the Jersey 1 jar described above, but to disable the YARN Timeline Service in spark-defaults.conf

spark.hadoop.yarn.timeline-service.enabled false

Spark History Server

How Spark History Server is configured and brought to life is explained here. Absolutely worth setting it up, not only because it is very useful and practical for monitoring Spark applications, but also because in Spark 2.0 the graphical interface is more user friendly and, well, more graphical.

Hive SerDe error when querying from spark-sql

If you plan to use spark-sql, it may be worth checking this post to avoid the jsonserde not found error message.

Notes about Spark 2.0

My Apache Spark 2.0 Notes

Manipulating files in S3 with Spark is mentioned here.

Networking in Spark: Configuring ports in Spark

For a Spark Context to run, some ports are used. Most of them are randomly chosen, which makes them difficult to control. This post describes how I control Spark’s ports.

In my clusters, some nodes are dedicated client nodes, which means the users can access them, store files under their respective home directories (defining home on an attached volume is described here), and run jobs on them.

The Spark jobs can be run in different ways, from different interfaces – Command Line Interface, Zeppelin, RStudio…

 

Links to Spark installation and configuration

Installing Apache Spark 1.6.0 on a multinode cluster

Building Apache Zeppelin 0.6.0 on Spark 1.5.2 in a cluster mode

Building Zeppelin-With-R on Spark and Zeppelin

What Spark Documentation says

Spark UI

The Spark User Interface, which shows the application’s dashboard, uses port 4040 by default. The property name is

spark.ui.port

When a new Spark Context is submitted, Spark first tries port 4040. If this port is taken, 4041 is tried; if that one is taken, 4042 is tried, and so on, until an available port is found (or the maximum number of attempts is reached).
When an attempt is unsuccessful, the log displays a WARN message and the next port is attempted. An example follows:

WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
INFO Utils: Successfully started service 'SparkUI' on port 4041.
INFO SparkUI: Started SparkUI at http://client-server:4041

According to the log, the Spark UI is now listening on port 4041.
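
The fallback behaviour described above can be sketched in a few lines of plain Python. This is a simplified model, not Spark's code: the function name pick_ui_port is my own, the set of taken ports stands in for real bind attempts, and the default of 16 retries is assumed to mirror the spark.port.maxRetries setting.

```python
def pick_ui_port(taken, start=4040, max_retries=16):
    """Try start, start + 1, ... and return the first port that is not taken."""
    for offset in range(max_retries + 1):
        candidate = start + offset
        if candidate not in taken:
            return candidate
    raise RuntimeError("no free port found after %d retries" % max_retries)

print(pick_ui_port({4040}))        # 4041, as in the log above
print(pick_ui_port({4040, 4041}))  # 4042
```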

Not much randomizing for this port. This is not the case for ports in the next chapter.

 

Networking

Looking at the documentation about Networking in Spark 1.6.x, this post focuses on the 6 properties whose default value is random in the following picture:

[Image: Networking properties table from the Spark 1.6.x documentation]

While the Spark Context is being created, these properties receive random values.

spark.blockManager.port
spark.broadcast.port
spark.driver.port
spark.executor.port
spark.fileserver.port
spark.replClassServer.port

These are the properties that should be controlled. They can be controlled in different ways, depending on how the job is run.

 

Scenarios and solutions

If you do not care about the values assigned to these properties, then no further steps are needed.

Configuring ports in spark-defaults.conf

If you are running one Spark application per node (for example: submitting python scripts by using spark-submit), you might want to define the properties in the $SPARK_HOME/conf/spark-defaults.conf. Below is an example of what should be added to the configuration file.

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

If a test is run, for example spark-submit test.py, the Spark UI uses port 4040 by default and the ports defined above are used.

Running the following command

sudo netstat -tulpn | grep 3800

Returns the following output:

tcp6       0      0 :::38000                :::*                    LISTEN      25300/java
tcp6       0      0 10.0.173.225:38002      :::*                    LISTEN      25300/java
tcp6       0      0 10.0.173.225:38003      :::*                    LISTEN      25300/java
tcp6       0      0 :::38004                :::*                    LISTEN      25300/java
tcp6       0      0 :::38005                :::*                    LISTEN      25300/java
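
If several applications or users each need their own block of six ports, the configuration lines can be generated from a base port instead of being typed by hand. A minimal sketch in plain Python; the helper names port_settings and render are hypothetical, and the property list is the one used in this post.

```python
PORT_PROPERTIES = [
    "spark.blockManager.port",
    "spark.broadcast.port",
    "spark.driver.port",
    "spark.executor.port",
    "spark.fileserver.port",
    "spark.replClassServer.port",
]

def port_settings(base_port):
    """Assign six consecutive ports, starting at base_port, to the six properties."""
    return {name: base_port + i for i, name in enumerate(PORT_PROPERTIES)}

def render(settings):
    """Format the settings as lines for spark-defaults.conf."""
    return "\n".join("%s %d" % (name, port) for name, port in settings.items())

print(render(port_settings(38000)))
```

With a base port of 38000 this prints the same six lines as the example above; a base port of 38020 gives a second, non-overlapping block.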

 

Configuring ports directly in a script

In my case, different users would like to use different ways to run Spark applications. Here is an example of how ports are configured through a python script.

"""Pi-estimation.py"""

from random import randint, random
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

def sample(p):
    # Draw a random point in the unit square and
    # return 1 if it falls inside the quarter circle
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Pi")

# Port of the Spark UI for this application
conf.set("spark.ui.port", "4042")

# Fixed ports instead of the random defaults
conf.set("spark.blockManager.port", "38020")
conf.set("spark.broadcast.port", "38021")
conf.set("spark.driver.port", "38022")
conf.set("spark.executor.port", "38023")
conf.set("spark.fileserver.port", "38024")
conf.set("spark.replClassServer.port", "38025")

conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext(conf=conf)

NUM_SAMPLES = randint(5000000, 100000000)
# range works on Python 3; on Python 2 use xrange to avoid building a large list
count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \
    .reduce(lambda a, b: a + b)
print("NUM_SAMPLES is %i" % NUM_SAMPLES)
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
(The above Pi estimation is a Spark example that comes with Spark installation)

The property values in the script override the properties in the spark-defaults.conf file. For the runtime of this script, port 4042 and ports 38020-38025 are used.
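
The precedence can be pictured as a simple dictionary merge in which the script's values win. A minimal sketch in plain Python, not Spark's own code; the function name effective_settings is hypothetical and the values are taken from the examples in this post.

```python
def effective_settings(defaults, script_settings):
    """Start from spark-defaults.conf values and let the script's values override them."""
    merged = dict(defaults)
    merged.update(script_settings)
    return merged

# Values as they would come from spark-defaults.conf
defaults = {
    "spark.blockManager.port": "38000",
    "spark.driver.port": "38002",
    "spark.io.compression.codec": "lzf",
}
# Values set on the SparkConf object in the script
script_settings = {
    "spark.ui.port": "4042",
    "spark.blockManager.port": "38020",
    "spark.driver.port": "38022",
}

for name, value in sorted(effective_settings(defaults, script_settings).items()):
    print(name, value)
```

Properties that the script does not set, such as the compression codec here, keep their value from the configuration file.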

If netstat command is run again for all ports that start with 380

sudo netstat -tulpn | grep 380

The following output is shown:

tcp6       0      0 :::38000                :::*                    LISTEN      25300/java
tcp6       0      0 10.0.173.225:38002      :::*                    LISTEN      25300/java
tcp6       0      0 10.0.173.225:38003      :::*                    LISTEN      25300/java
tcp6       0      0 :::38004                :::*                    LISTEN      25300/java
tcp6       0      0 :::38005                :::*                    LISTEN      25300/java
tcp6       0      0 :::38020                :::*                    LISTEN      27280/java
tcp6       0      0 10.0.173.225:38022      :::*                    LISTEN      27280/java
tcp6       0      0 10.0.173.225:38023      :::*                    LISTEN      27280/java
tcp6       0      0 :::38024                :::*                    LISTEN      27280/java

Two processes are running, each a separate Spark application listening on the ports that were defined beforehand.
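
With more than a couple of applications, reading the netstat output by eye gets tedious. A minimal sketch in plain Python that groups the listening ports by process ID; the function name ports_by_pid is hypothetical, and it assumes the seven-column netstat -tulpn layout shown above.

```python
from collections import defaultdict

def ports_by_pid(netstat_output):
    """Group listening TCP ports by the PID that owns them."""
    groups = defaultdict(list)
    for line in netstat_output.splitlines():
        fields = line.split()
        # Expect: proto, recv-q, send-q, local address, foreign address, state, pid/program
        if len(fields) != 7 or fields[5] != "LISTEN":
            continue
        pid = fields[6].split("/", 1)[0]
        if not pid.isdigit():
            continue  # netstat shows "-" when it runs without sufficient privileges
        port = int(fields[3].rsplit(":", 1)[1])
        groups[int(pid)].append(port)
    return dict(groups)

sample = """\
tcp6 0 0 :::38000 :::* LISTEN 25300/java
tcp6 0 0 10.0.173.225:38002 :::* LISTEN 25300/java
tcp6 0 0 :::38020 :::* LISTEN 27280/java
tcp6 0 0 10.0.173.225:38022 :::* LISTEN 27280/java
"""
print(ports_by_pid(sample))  # {25300: [38000, 38002], 27280: [38020, 38022]}
```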

 

Configuring ports in Zeppelin

Since my users use Apache Zeppelin, similar network management had to be done there. Zeppelin also sends jobs to the Spark Context through the spark-submit command, which means that the properties can be configured in the same way, this time through an interpreter in Zeppelin:

Open the Interpreter menu and choose the spark interpreter. Then add the new properties and their respective values. Do not forget to click on the plus when you are ready to add a new property.
At the very end, save everything and restart the spark interpreter.

Below is an example of how this is done:

[Screenshot: port properties added to the spark interpreter in Zeppelin]

Next time a Spark context is created in Zeppelin, the ports will be taken into account.

 

Conclusion

This can be useful if multiple users are running Spark applications on one machine and have separate Spark Contexts.

In case of Zeppelin, this comes in handy when one Zeppelin instance is deployed per user.