Automating access from Apache Spark to S3 with Ansible

According to the Apache Spark documentation, Spark jobs must authenticate with S3 to be able to read or write data in the object storage. There are different ways of achieving that:

  • When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
  • spark-submit reads the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN environment variables and sets the associated authentication options for the s3n and s3a connectors to Amazon S3.
  • In a Hadoop cluster, settings may be set in the core-site.xml file.
  • Authentication details may be manually added to the Spark configuration in spark-defaults.conf.
  • Alternatively, they can be programmatically set in the SparkConf instance used to configure the application’s SparkContext.

Honestly, I wouldn’t know much about the first option. It might have something to do with running Databricks on AWS.

The second option requires setting environment variables on all servers of the Spark cluster. With Ansible this is possible, but only at the level of a task or role. This means that on a long-lived Spark cluster the variables are no longer available once you start using the cluster.

The fourth option is the one this post focuses on. spark-defaults.conf is Spark's default configuration file, and the settings in it tune your Spark cluster.

There are five configuration tuples needed to manipulate S3 data with Apache Spark. They are explained below.

Getting environment variables into Docker

The following approach is suitable for a proof of concept or for testing. An enterprise solution should use a service such as HashiCorp Vault, Ansible Vault, AWS IAM or similar.

I am using Docker on Windows 10. The folder where the Dockerfile resides also has a file called aws_cred.env. Make sure this file is added to the .gitignore file so that it is not checked into the source code repository! The env file holds the AWS access key and secret key needed to authenticate with S3. The file structure is like this:

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

When running the Docker container with the --env-file option, the environment variables in the file are exported to the Docker container.

In the Ansible code, they can both be looked up in the following way:
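Before starting the container, it can help to confirm that the env file really holds both values. The following is a minimal sketch in Python, assuming the plain KEY=VALUE format shown above; the function names parse_env_file and missing_credentials are made up for this illustration and are not part of Docker or Ansible.

```python
REQUIRED_KEYS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def parse_env_file(text):
    """Parse KEY=VALUE lines; blank lines and lines starting with # are skipped."""
    values = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_credentials(values):
    """Return the required keys that are absent or have an empty value."""
    return [key for key in REQUIRED_KEYS if not values.get(key)]


if __name__ == "__main__":
    # The unfilled file structure from above: both keys present, both values empty.
    sample = "AWS_ACCESS_KEY_ID=\nAWS_SECRET_ACCESS_KEY=\n"
    print(missing_credentials(parse_env_file(sample)))
```

An empty list from missing_credentials means both values are filled in.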

{{ lookup('env', 'AWS_ACCESS_KEY_ID') }}
{{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}

These can be used in the Jinja2 template file spark-defaults.conf.j2 to generate a Spark configuration file. The configuration tuples relevant in this case are these two:

spark.hadoop.fs.s3a.access.key {{ lookup('env', 'AWS_ACCESS_KEY_ID') }}
spark.hadoop.fs.s3a.secret.key {{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}

This gives you access to the S3 buckets that these credentials are allowed to use, regardless of whether the buckets are public or private.
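To see what the two template lines produce without running Ansible, the same substitution can be imitated in Python. This is only a stand-in sketch: it uses string.Template placeholders instead of Jinja2 syntax, and render_s3a_credentials is a hypothetical helper name.

```python
import os
from string import Template

# Python stand-in for the two template lines; $NAME placeholders replace the Jinja2 lookups.
S3A_TEMPLATE = Template(
    "spark.hadoop.fs.s3a.access.key $AWS_ACCESS_KEY_ID\n"
    "spark.hadoop.fs.s3a.secret.key $AWS_SECRET_ACCESS_KEY\n"
)


def render_s3a_credentials(env=None):
    """Fill the template from an environment mapping (os.environ by default).

    A missing variable raises KeyError, so an unset credential fails loudly
    instead of producing a configuration line without a value.
    """
    env = os.environ if env is None else env
    return S3A_TEMPLATE.substitute(env)


if __name__ == "__main__":
    # Example values only; real keys come from the container environment.
    print(render_s3a_credentials({
        "AWS_ACCESS_KEY_ID": "AKIAEXAMPLE",
        "AWS_SECRET_ACCESS_KEY": "example-secret",
    }))
```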

The JAR files

First, the following tuple is mandatory for the Spark configuration:

spark.hadoop.fs.s3a.impl      org.apache.hadoop.fs.s3a.S3AFileSystem

This tells Spark which file system implementation to use for s3a:// paths. The class itself is provided by the JAR files listed below.

Two libraries must be added to the instances of the Spark cluster:

  • aws-java-sdk-1.7.4
  • hadoop-aws-2.7.3

The above mentioned Jinja2 file also holds two configuration tuples relevant for these JAR files:

spark.driver.extraClassPath   /usr/spark-s3-jars/aws-java-sdk-1.7.4.jar:/usr/spark-s3-jars/hadoop-aws-2.7.3.jar
spark.executor.extraClassPath /usr/spark-s3-jars/aws-java-sdk-1.7.4.jar:/usr/spark-s3-jars/hadoop-aws-2.7.3.jar

Be careful with the versions because they must match the Hadoop version your Spark package was built for. The above combination has proven to work on Spark installation packages that support Hadoop 2.7. The last two tasks in this main.yml do the job for the Spark cluster.

Once the files are downloaded (for example, I download them to /usr/spark-s3-jars) Apache Spark can start reading and writing to the S3 object storage.
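Because the two classpath tuples must name exactly the files that were downloaded, it may be convenient to derive them from the directory and the version numbers. A small sketch, assuming the JAR names and versions listed above; s3a_classpath_lines is a hypothetical helper.

```python
import posixpath


def s3a_classpath_lines(jar_dir, sdk_version="1.7.4", hadoop_aws_version="2.7.3"):
    """Return the driver and executor extraClassPath lines for the two S3 JARs."""
    jars = [
        posixpath.join(jar_dir, "aws-java-sdk-%s.jar" % sdk_version),
        posixpath.join(jar_dir, "hadoop-aws-%s.jar" % hadoop_aws_version),
    ]
    classpath = ":".join(jars)  # ':' separates classpath entries on Linux
    return [
        "spark.driver.extraClassPath   " + classpath,
        "spark.executor.extraClassPath " + classpath,
    ]


if __name__ == "__main__":
    for line in s3a_classpath_lines("/usr/spark-s3-jars"):
        print(line)
```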

Provision Apache Spark in AWS with Hashistack and Ansible

Automation is the key word when it comes to using cloud services. Pay-as-you-go is the philosophy behind it.

In this post, I explain how I provision an Apache Spark cluster on Amazon. The configuration of the cluster is done prior to the provisioning using Jinja2 file templates. Once provisioning is completed, the cluster is therefore ready to use immediately.

One of the points of automation is to make data scientists more independent of data engineers: the data engineer builds the solution and the data scientist uses it without needing engineering experience.
In this case, the data scientist has to configure the cluster using the YAML file and prepare a GitHub repository.

There are two ways of using this solution:

  • A long-lived Spark cluster
    Spark cluster serves as a solution for running various jobs. The cluster is always available.
  • One-time job execution
    Spark cluster is provisioned for a specific job which is executed and then the cluster is destroyed. Data Scientist is responsible for data input and data storage in the code (example).

Technologies and services

  • AWS EC2 (CentOS 7)
  • Terraform for provisioning the infrastructure in AWS
  • Consul for the cluster’s configuration settings
  • Ansible for software installation on the cluster
  • GitHub for version control
  • Docker as test and development environment
  • PowerShell for running Docker and provisioning
  • Visual Studio Code for software development, running PowerShell

In order to use a service like EC2 in AWS, a Virtual Private Cloud (VPC) must be established. This is something I have automated using Terraform and Consul and described here. This provision is a “long-lived” provision since a VPC has practically no cost.

Prerequisites

I will not go into details of how to install all the technologies and services from the list. However, this GitHub repository does build a Docker container with Consul and latest Terraform. Consul in the Docker is an agent which connects to a global Consul server in Amazon. Documenting the global Consul is on my TO-DO list.

I suggest investing some time and creating a Consul with connection to your own GitHub repository that stores the configuration.

Repository on GitHub

The repository can be found at this address.

Repository Structure

There are two modules used in this project: instance and provision-spark. The module instance is pure Terraform code and does the provisioning of the instances (Spark’s master and workers) in the AWS. The output (DNS and IP addresses) of this module is the input for the module provision-spark which is more complex. It is written in Terraform, Ansible and Jinja2.

Ansible Roles

Below is the structure of the Ansible part of the module.

Roles prereq and spark are applied to all instances. The prereq role takes care of the prerequisites (java, anaconda) and the spark role downloads and installs Spark, and creates Linux objects needed for Spark to work. The start_spark_master applies only to the master instance and start_spark_workers to the worker instances. The role execute_on_spark automatically executes a job on Spark cluster (more on that later).

The path to the YAML file that executes the roles is available here.

Cluster Configuration

The cluster is configured in YAML format and the configuration is sent to the global Consul server. One configuration block serves one cluster. An example for the cluster lr_iris can be found here.

Running the code

Provisioning starts in module provision-spark where the line

terraform apply -auto-approve

starts provisioning the cluster. Configuration is taken from Consul to populate the variables in Terraform. The Ansible inventory file is created by Terraform after the EC2 instances are launched and started. After the inventory file is created, Terraform executes the spark.yml file and the rest is in the hands of Ansible. If everything goes well, the output is similar to the following:

This is a Terraform output as defined in the output.tf file.

The Spark cluster is now ready.
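The inventory step described above can be pictured with a short sketch. In the project the file is written by Terraform; the Python below only illustrates the general shape of an INI-style Ansible inventory. The group names spark_master and spark_workers and the host names are assumptions made for this example, not the names used in the repository.

```python
def render_inventory(master_host, worker_hosts):
    """Render an INI-style inventory with one master group and one worker group."""
    lines = ["[spark_master]", master_host, "", "[spark_workers]"]
    lines.extend(worker_hosts)
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Hypothetical addresses standing in for the Terraform outputs.
    print(render_inventory("master.example.internal",
                           ["worker1.example.internal", "worker2.example.internal"]))
```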

View in AWS Console

The instances in the Spark cluster look like this in AWS console:

Spark as a Service

Spark services running on master and workers are handled as services using systemctl. The services are created and started using Ansible: Spark workers start a service called spark-worker whose Ansible code can be seen here.
Spark Master has two services: spark-master and sparkhs (Spark History Server). Ansible code for both services is here.

Spark Master

Checking if Spark Master is available by using the public IP address and port 8080 should return an interface similar to this one:

Five workers were set up in the configuration file. This means we have a cluster with six instances: one is the Spark Master, the other five are the workers.

Spark History Server

Spark History Server, just like Spark Master, becomes significant if a long-lived cluster is used. It helps with monitoring and debugging the jobs (applications in Spark terminology).
Spark History Server can be reached at port 18080 on Spark Master.

Above is an example of an application that was executed on the Spark cluster. Note the Event log directory: it points to a local directory which will be removed once the cluster is destroyed. This is not an issue if we are running a long-lived cluster, but if we want to keep logs for one-time clusters, it is advisable to store the logs externally. In this case, since Amazon is used, storing to S3 would be the best option.

Automatic Code Execution

The Spark cluster is now ready to use. Full automation is achieved when the Spark code is automatically executed from the Terraform code once the cluster is available. In the repository, one of the Ansible roles is execute_on_spark, which executes either Python or Scala code on the provisioned Spark cluster.

Which Spark code will be executed depends on the configuration in the YAML file. A path to a GitHub repository is part of the configuration and that repository is cloned to the Spark Master and executed.

An example mentioned above can be found here. The example is one of Hello Worlds in data science – Logistic Regression on Iris dataset. In this case, the Data Scientist is responsible for the input data and storing the results outside of the Spark cluster.

    input_file = "s3a://hdp-hive-s3/test/iris.csv"
    output_dir = "s3a://hdp-hive-s3/test/git_iris_out"

When the cluster is ready, the repository is cloned and the code is executed. Inside the code, the Data Scientist defines input and output. In this case, object storage S3 is used to do a one-time job, save the results and the Spark cluster is of no use anymore.

Installing Apache Spark 2.2.1

I have installed older Apache Spark versions and now the time is right to install Spark 2.2.1.

Update 27.09.2019: Automated Apache Spark install with Docker, Terraform and Ansible? Check out this post.

I’m using an AWS t2.micro instance with Ubuntu 16.04 on it. MobaXterm is my choice of interface to SSH to the instance.

System update

sudo apt-get update -y
sudo apt-get upgrade -y

Change instance name

Go into the hostname file and change the name to spark

sudo vi /etc/hostname

Change localhost with instance name in hosts file

After the 127.0.0.1 write the name of the instance

sudo vi /etc/hosts

Reboot the instance

sudo reboot

Install and set up Java

You will need Java at the latest when running the History Server. Java 8 is installed in the following way

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

Add JAVA_HOME to the environment file

sudo vi /etc/environment

And add the following line to the top of the file

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

Double-check that this is the correct Java home.

Python

Spark 2.2.x supports Python 2.7+/3.4+. When running PySpark, Spark looks for an executable named python (normally /usr/bin/python). Ubuntu 16.04 on AWS comes only with Python 3.5, which is installed as python3. When running PySpark without a python executable, the following error message is printed

/usr/apache/spark-2.2.1-bin-hadoop2.7/bin/pyspark: line 45: python: command not found
env: ‘python’: No such file or directory

There are two options: install Python 2.7+ or create a link to Python 3. The first alternative makes sense only if Python packages that exist for Python 2 but not for Python 3 are going to be used. Otherwise, the latter alternative is the way to go. It is done in the following way

sudo ln -s /usr/bin/python3 /usr/bin/python

Running PySpark now starts PySpark CLI with Python 3.5.2

And yes, running python or python3 will both execute the same action – start python 3.5.

Create user spark

sudo adduser spark

Define password, for the sake of testing, let’s go with spark

Prepare directory for Spark home

sudo mkdir /usr/apache

Step into the directory

cd /usr/apache

Download and unpack Apache Spark 2.2.1

sudo wget http://apache.uib.no/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
sudo tar -xvzf spark-2.2.1-bin-hadoop2.7.tgz

Clean up – delete the spark’s tgz file

sudo rm spark-2.2.1-bin-hadoop2.7.tgz

Change the owner of the spark directory

sudo chown -R spark:spark /usr/apache/spark-2.2.1-bin-hadoop2.7

Create Spark home

cd spark-2.2.1-bin-hadoop2.7
pwd

Output of the pwd command is the value for SPARK_HOME in the environment file. Open the file

sudo vi /etc/environment

And add the below SPARK_HOME line before the PATH line

export SPARK_HOME=/usr/apache/spark-2.2.1-bin-hadoop2.7

At the end of PATH add

:${SPARK_HOME}/bin

(You need the colon to separate from previous values)
Refresh the environment file

source /etc/environment

Create log and pid directories

sudo mkdir -p /var/log/spark/logs
sudo chown spark:spark -R /var/log/spark
sudo -u spark mkdir $SPARK_HOME/run
sudo -u spark chmod 777 /var/log/spark/logs

The last line allows every user to read from, write to and list the directory. It is probably best to adjust access according to your needs.

If different users are going to run Spark applications and those applications should be visible in the History Server, user spark has to be added to those users’ groups.
For example, if user ubuntu is running Spark applications, user ubuntu writes log files to the Spark History log directory (check below for the property spark.history.fs.logDirectory) and Spark opens them through the History Server. To be able to do the latter, spark has to be a member of the ubuntu group. This is done in the following way

sudo usermod -a -G ubuntu spark

Prepare spark-env.sh file

Open the file

sudo -u spark vi $SPARK_HOME/conf/spark-env.sh

Add the following values

SPARK_LOG_DIR=/var/log/spark
SPARK_PID_DIR=${SPARK_HOME}/run

Prepare spark-defaults.conf file

Open the file

sudo -u spark vi $SPARK_HOME/conf/spark-defaults.conf

Add the following values

spark.history.fs.logDirectory file:/var/log/spark/logs
spark.eventLog.enabled true
spark.eventLog.dir file:/var/log/spark/logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

Start Spark History

sudo -u spark $SPARK_HOME/sbin/start-history-server.sh

The instance’s IP address on port 18080 should open the Spark History Server. If not, check /var/log/spark for errors and messages.
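If the page does not load, a quick TCP probe helps to tell a closed port (or a security group that blocks it) from a server that is running but misbehaving. A minimal sketch using only the Python standard library; port_is_open is a helper name chosen for this example.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names.
        return False


if __name__ == "__main__":
    # Replace localhost with the public IP address of the instance.
    print(port_is_open("localhost", 18080))
```

A result of False only says that nothing accepted the connection; the files under /var/log/spark remain the place to look for the reason.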

Manipulating files from S3 with Apache Spark

Update 22/5/2019: Here is a post about how to use Spark, Scala, S3 and sbt in Intellij IDEA to create a JAR application that reads from S3.

This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare the properties file with AWS credentials, run spark-shell to read the properties, read a file from S3 and write a DataFrame to S3.

This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket and in folder folder01 I have the test file called SearchLog.tsv.

The project’s home is /home/ubuntu/s3-test. Folder jars is created in the project’s home.

Step into jars folder. Download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

Create a properties file in the project’s home. The name, for example, is s3.properties. Add and adjust the following text in the file.

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]

Once the file is saved, we can test the access by starting spark-shell.

spark-shell --properties-file /home/ubuntu/s3-test/s3.properties

Once in Spark, the configuration properties can be checked by running

spark.conf.getAll

The String in the result should, among other parameters, also show values for keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value for specific key can also be checked by running spark.conf.get(KEY_NAME), as example below shows.

spark.conf.get("spark.hadoop.fs.s3a.impl")
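Since the configuration now contains the secret key in clear text, it is worth masking it before pasting any configuration output into a ticket or a chat. The sketch below parses a properties file of the whitespace-separated form shown above and masks the two credential values; parse_spark_properties and redact are hypothetical helper names, and the parser handles only this simple form.

```python
SENSITIVE_SUFFIXES = (".access.key", ".secret.key")


def parse_spark_properties(text):
    """Parse 'key <whitespace> value' lines; blank lines and # comments are skipped."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) == 2 else ""
    return props


def redact(props):
    """Return a copy in which the values of credential keys are masked."""
    return {
        key: ("***" if key.endswith(SENSITIVE_SUFFIXES) else value)
        for key, value in props.items()
    }


if __name__ == "__main__":
    sample = (
        "spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem\n"
        "spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]\n"
        "spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]\n"
    )
    for key, value in redact(parse_spark_properties(sample)).items():
        print(key, value)
```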

Spark has now picked up the JARs and the credentials. Let us read the test file into an RDD.

val fRDD = sc.textFile("s3a://markosbucket/folder01")

Output:

fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at :24

Remember, this is an RDD, not a DataFrame or DataSet!

Print out first three lines from the file

fRDD.take(3)

Reading a CSV file directly into a DataFrame:

val fDF = spark.read.csv("s3a://markosbucket/folder01")

Writing to S3

Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a random file

mkdir data
wget -P data https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

Now we can load the local file in a DataFrame

val airportDF = spark.read.csv("data/airports.dat")

Saving the DataFrame to S3 is done by first converting the DataFrame to an RDD

airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")

Refreshing the S3 folder should show the new folder – airport-output which has 2 files _SUCCESS and part-00000.

Some Error messages with fixes

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Fix:
The jar files were not added to Spark application.

com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:

sc.hadoopConfiguration.set("fs.s3a.access.key", "[ACCESS_KEY]")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "[SECRET_KEY]")

java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist

Fix:
In AWS Console, right click on file, choose properties and copy the value next to Link. Replace https with s3a and remove the domain name (“s3-…”).
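The manual rewrite of the link can also be done in a few lines of Python. This sketch assumes a path-style link of the form https://s3-REGION.amazonaws.com/BUCKET/KEY, as described above; https_link_to_s3a is a hypothetical helper and the region in the example is only an illustration.

```python
from urllib.parse import urlparse


def https_link_to_s3a(link):
    """Convert a path-style S3 link (https://HOST/BUCKET/KEY) to s3a://BUCKET/KEY."""
    parsed = urlparse(link)
    path = parsed.path.lstrip("/")
    if not path:
        raise ValueError("link has no bucket in its path: %r" % link)
    # Drop the scheme and the domain name, keep bucket and key.
    return "s3a://" + path


if __name__ == "__main__":
    print(https_link_to_s3a(
        "https://s3-eu-west-1.amazonaws.com/markosbucket/folder01/SearchLog.tsv"))
```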

Creating a multinode Apache Spark cluster on AWS from command line

The idea is to create a Spark cluster on AWS according to the needs, maximize the use of it and terminate it after the processing is done.

All the instances run the Ubuntu 14.04 operating system. The instance type used in the following script is the free-tier t2.micro with 8 GB storage and 1 GB RAM.

To have this working from the command line, aws package has to be installed.

One instance, master in this case, is always in State “running” and this instance has the following installed:

  • aws package
  • Apache Spark

How to install Spark 2.0 is described here. There is also the link to the blog about how to install Spark History Server.

The Spark History Server can be reached on port 18080 – MASTER_PUBLIC_IP:18080 and Spark Master is on port 8080. If the pages do not load, start the services as spark user. The scripts for starting the services can be found in $SPARK_HOME/sbin.

The script for launching instances

The following script takes one parameter, the number of instances to launch, and attaches the new instances to the Spark cluster. These instances become workers in the Spark world.

A new script is created

vi create-spark-cluster.sh

And the following lines are written in it. Adjust accordingly (details below).

#!/bin/sh

#removes any old associations that have "worker" in it
sudo sed -i.bak '/worker/d' /etc/hosts

#removes known hosts from the file
sudo sed -i 'd' /home/ubuntu/.ssh/known_hosts

#run the loop to create as many slaves as defined in the input parameter
for i in $(seq -f %02g 1 $1)
do
  #name of the worker
  NAME="worker$i"

  #run the aws command to create an instance and run a script when the instance is created.
  #the command returns the private IP address which is used to update the local /etc/hosts file
  PRIV_IP_ADDR=$(aws ec2 run-instances --image-id ami-0d77397e --count 1 \
            --instance-type t2.micro --key-name MYKEYPAIR \
            --user-data file:///PATH_TO_SCRIPT/user-data.sh \
            --subnet-id SUBNET_ID --security-group-ids SECURITY_GROUP \
            --associate-public-ip-address \
            --output text | grep PRIVATEIPADDRESSES | awk '{print $4 "\t" $3}')

   #add the IP and hostname association to the /etc/hosts file
   echo "$PRIV_IP_ADDR" "$NAME" | sudo tee -a /etc/hosts

done

--user-data: this option allows you to define a file that is run on the new instance. In this file, the steps for Spark worker installation and setup are defined.
MYKEYPAIR: name of the key pair you use in the AWS console when launching new instances (this is not a path to the private key on the instance you are running the script from!).
SUBNET_ID: ID of the subnet you are using. It is something one creates when starting with EC2 in AWS.
SECURITY_GROUP: ID of the existing security group that this instance should use.

The script that runs at instance launch

The following commands in the script are run as soon as each instance is created.
At the top of the script, the operating system is updated, python is installed and JDK as well.

#!/bin/sh

sudo apt-get update -y  && \
sudo apt-get install python-minimal -y && \
sudo apt-get install default-jdk -y && \
cd /etc/ && \
sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.1-bin-hadoop2.7.tgz && \
sudo tar -xvzf spark-2.0.1-bin-hadoop2.7.tgz && \
sudo rm spark-2.0.1-bin-hadoop2.7.tgz && \
sudo useradd spark && \
export SPARK_HOME=/etc/spark-2.0.1-bin-hadoop2.7 && \
sudo chown -R spark:spark $SPARK_HOME && \
sudo -u spark mkdir $SPARK_HOME/logs && \
sudo -u spark $SPARK_HOME/sbin/start-slave.sh spark://ip-10-0-0-95.eu-west-1.compute.internal:7077

wget and tar: the Spark package is downloaded into /etc/ and unpacked.
start-slave.sh: the worker is started and connected to the master node (the Spark Master’s URL is passed as the script’s argument).

Example of running the script

The following example creates a Spark cluster with 3 workers.

sh create-spark-cluster.sh 3

Example of the output:

10.0.0.104      ip-10-0-0-104.eu-west-1.compute.internal worker01
10.0.0.80       ip-10-0-0-80.eu-west-1.compute.internal worker02
10.0.0.47       ip-10-0-0-47.eu-west-1.compute.internal worker03

These three lines are also added to the /etc/hosts.
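The naming logic of the loop is easy to check in isolation. The sketch below reproduces in Python how the launch script numbers the workers and formats the /etc/hosts lines; worker_hosts_lines is a hypothetical helper, and the (IP, DNS) pairs are the ones from the example output above.

```python
def worker_hosts_lines(instances):
    """Build /etc/hosts lines from (private_ip, private_dns) pairs in launch order."""
    lines = []
    for index, (private_ip, private_dns) in enumerate(instances, start=1):
        name = "worker%02d" % index  # same zero padding as seq -f %02g
        # awk prints IP, a tab and the DNS name; echo appends the worker name after a space.
        lines.append("%s\t%s %s" % (private_ip, private_dns, name))
    return lines


if __name__ == "__main__":
    launched = [
        ("10.0.0.104", "ip-10-0-0-104.eu-west-1.compute.internal"),
        ("10.0.0.80", "ip-10-0-0-80.eu-west-1.compute.internal"),
        ("10.0.0.47", "ip-10-0-0-47.eu-west-1.compute.internal"),
    ]
    print("\n".join(worker_hosts_lines(launched)))
```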

The Spark Master Web Interface (SPARK_MASTER_PUBLIC_IP:8080) gives more details about the Spark cluster.

The interface displays the recently added workers with State being ALIVE. Two workers are in state DEAD: they have been terminated from AWS, but Spark Master has not updated the Workers statistic yet.

Memory in use is 3 GB: the instances used in the cluster creation have 1 GB each.

Conclusion

The blog post shows a very simple way of creating a Spark cluster. The cluster creation script can be made a lot more dynamic and the script that is run on the newly created instances could be extended (for example, by installing Python packages needed for work).

Apache Spark 2.0 – Notes

Spark Session

In Spark 2.0, SparkSession was introduced. It provides a single point of entry for interacting with Spark functionality and gives the user access to the DataFrame and Dataset APIs.
In older Spark versions, the user had to create a Spark configuration (SparkConf), a SparkContext (sc) and then a SQLContext. In Spark 2.0 all of this is done with SparkSession (spark), which encapsulates the above-mentioned trio and HiveContext. A StreamingContext for DStream-based streaming is still created separately from the SparkContext.

In order to use DataFrame API in Spark 1.6, SQLContext needs to be used. When running Spark, new Spark application is started by creating SparkContext object (represents a connection to computing cluster). From SparkContext, SQLContext can be created (the main entry point for Spark DataFrame and SQL functionality). A SQLContext can be used to create DataFrames, which allows you to direct operations on your data.
It was confusing when to use SparkContext and when to use SQLContext in Spark 1.6. All this is hidden under a layer called SparkSession. See the following object types from PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SQLContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

The Spark session has to be created explicitly when using the spark-submit command. An example from the documentation on how to do that:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

The spark handle is created and first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is automatically initialized. Example with sparkR:

sparkR

SQL

SQL in Spark 2.0 supports SQL:2003 (the latest revision at the time of writing was SQL:2011). Spark SQL can run all 99 TPC-DS queries. Subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

Running the following command

>>> spark.catalog.listTables()

returns the tables available to SparkSession.

Driver program

In Spark, communication occurs between driver and executors. The driver has Spark jobs to run, it splits them into tasks and submits them to executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs are running either in a cluster or locally on the same machine. PySpark is an example of a local driver program, which is why starting the shell in cluster deploy mode fails:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode, the client that submitted the job is detached from the Spark application and its further behavior does not influence the application. If you shut down the computer that submitted the application to the cluster, the job will continue to run. The driver runs on one of the nodes in the Spark cluster. The spark-submit command with the option --deploy-mode cluster does this.
If you run in client mode, the driver runs on the machine the application is submitted from. In the Spark History Server, under the Executors tab, in the Executors table, you can see that the address of the driver matches the address of the computer the command was sent from.

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.
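When jobs are submitted from a script, the choice between the two deploy modes ends up as an argument to spark-submit. A small sketch that builds the argument list in Python; spark_submit_command is a hypothetical helper and job.py stands for any application file.

```python
def spark_submit_command(app_path, master, deploy_mode="client"):
    """Build the spark-submit argument list for the given deploy mode."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    return ["spark-submit", "--master", master, "--deploy-mode", deploy_mode, app_path]


if __name__ == "__main__":
    # Printed here; the list could be passed to subprocess.run(cmd, check=True).
    print(" ".join(spark_submit_command("job.py", "yarn", "cluster")))
```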

Listing attributes

Python’s dir() lists all attributes of the object passed to it.
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

Opens extensive help and examples.

RDD

SQLContext is created from lower level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs). RDD is Spark’s way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster and perform the same regardless of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or Python, Java, Scala, or R, the underlying code generated is identical because all execution planning goes through the same Catalyst optimizer.
DFs are made of partitions – converting a DF to an RDD to check number of partitions:

>>> tweetDF.rdd.getNumPartitions()

Data is split into partitions.
How to optimize:
If there are 3 slots, the ideal number of partitions is a multiple of 3 (partitions = x*3).
Repartition DF (repartition returns a new DataFrame):

>>> tweetDF.repartition(6)

DataFrames

In Spark, a base DataFrame is created first, either by generating a Dataset with the spark.range method (for learning purposes) or by reading files or tables, which returns a DataFrame. Operations can then be applied to it. A DataFrame is immutable: once created, it cannot be changed. Each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

A DataFrame consists of a series of Row objects. Each of them has a set of named columns.

A DataFrame must have a schema: each column has a name and a type. Some data sources have schemas built into them; otherwise it is possible to define a schema and pass it as a parameter when creating a new DataFrame.

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance

Dataset

DataFrame and DataSet are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking SQL package in Spark 2.0

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

Spark DF can be converted to Pandas DF

>>> import pandas as pd

Then you can use .toPandas() at the end of the Spark DF to convert to Pandas DF.

Cache

Putting a DataFrame in memory: Spark uses the Tungsten binary format to store the data in memory in a compressed, columnar form. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. The data in memory is equally divided per partition (for example, 600 MB in memory, 6 partitions -> 100 MB per partition).
The size of data shrinks when cached. Example: a 1.6 GB file on disk is cached in memory with a size of 618.6 MB.

When an action is executed, and if the DataFrame is cached, the stages BEFORE the cache was executed are skipped, because the data is already partitioned in memory.
Ideal partition size is between 100 MB and 200 MB, so the number of partitions should be adjusted to that, not the other way around.
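The two rules of thumb from these notes (a partition size in the 100 MB to 200 MB range, and a partition count that is a multiple of the number of slots) can be combined into a small calculation. This is only a sketch of that arithmetic; suggest_partitions and the default target of 128 MB are assumptions made for the example.

```python
import math


def suggest_partitions(data_size_mb, slots, target_partition_mb=128):
    """Suggest a partition count for cached data.

    First divide the data into chunks of roughly target_partition_mb,
    then round up to the next multiple of the slot count.
    """
    if data_size_mb <= 0 or slots <= 0 or target_partition_mb <= 0:
        raise ValueError("all arguments must be positive")
    by_size = math.ceil(data_size_mb / target_partition_mb)
    return math.ceil(by_size / slots) * slots


if __name__ == "__main__":
    # 618.6 MB cached, 3 slots: 5 chunks by size, rounded up to 6 partitions.
    print(suggest_partitions(618.6, 3))
```

The result can then be passed to repartition on the DataFrame.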

Cache is not an action: it is lazy and only takes effect when the next action is executed. However, if you cache a table with the SQL statement CACHE TABLE, it is cached right away.

Sources

How to use SparkSession in Apache Spark 2.0
Using Apache Spark 2.0 to Analyze the City of San Francisco’s Open Data
Modern Spark DataFrame and Dataset (Intermediate Tutorial)
Spark SQL, DataFrames and Datasets Guide

Yarn application has already ended! It might have been killed or unable to launch application master.

If you are struggling with the error message in the title of this post, check whether you control the ports that Spark needs. In my experience, if the ports Spark is using cannot be reached, YARN terminates the application with the error message in the title. So it is best to pin the Spark ports and open them so that the YARN application goes through. More on Spark and networking here.

Spark chooses random ports and unless you have ALL ports open, you might run into the “endless”

INFO Client: Application report for application_1470560331181_0013 (state: ACCEPTED)

which eventually fails

INFO Client: Application report for application_1470560331181_0013 (state: FAILED)

and the error message returned would be

ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.

Adding something like this in spark-defaults.conf

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

could solve this issue.
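If the block has to start at a different port, for instance because 38000 is already taken, the lines can be generated instead of typed. A short sketch that uses the property names from this post in the same order; fixed_port_lines is a hypothetical helper.

```python
PORT_PROPERTIES = (
    "spark.blockManager.port",
    "spark.broadcast.port",
    "spark.driver.port",
    "spark.executor.port",
    "spark.fileserver.port",
    "spark.replClassServer.port",
)


def fixed_port_lines(base_port=38000):
    """Assign consecutive ports, starting at base_port, to the properties above."""
    return ["%s %d" % (name, base_port + offset)
            for offset, name in enumerate(PORT_PROPERTIES)]


if __name__ == "__main__":
    print("\n".join(fixed_port_lines()))
```

The same port range then has to be opened in the firewall or security group between the nodes.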

My notes on installing Spark 2.0 are here.

And how to install Spark 1.6 is described here.