Manipulating files from S3 with Apache Spark

This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare a properties file with AWS credentials, start spark-shell with those properties, read a file from S3 and write a DataFrame back to S3.

This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket and in the folder folder01 there is a test file called SearchLog.tsv.

The project’s home is /home/ubuntu/s3-test. A folder called jars is created in the project’s home.

Step into the jars folder and download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars.

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

Create a properties file in the project’s home, for example s3.properties, and add the following lines to it, adjusting the values as needed.

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]

Once the file is saved, we can test the access by starting spark-shell.

spark-shell --properties-file /home/ubuntu/s3-test/s3.properties

Once in Spark, the configuration properties can be checked by running

spark.conf.getAll

The result should, among other parameters, also show the values for the keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value of a specific key can also be checked by running spark.conf.get(KEY_NAME), as the example below shows.

spark.conf.get("spark.hadoop.fs.s3a.impl")
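
As a convenience, the S3-related settings can be listed in one go. A small sketch that simply filters the output of spark.conf.getAll for keys containing "s3a":

// List only the s3a-related settings loaded from the properties file
spark.conf.getAll
  .filter { case (key, _) => key.contains("s3a") }
  .foreach { case (key, value) => println(s"$key = $value") }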

The Spark application now picks up the jars and the credentials. Let us read the test file into an RDD.

val fRDD = sc.textFile("s3a://markosbucket/folder01")

Output:

fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at <console>:24

Remember, this is an RDD, not a DataFrame or DataSet!

Print out the first three lines of the file:

fRDD.take(3)
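
Since SearchLog.tsv is tab-separated, each line can be split into fields right away. A minimal sketch (the number and meaning of the fields depend on the actual file):

// Split each tab-separated line into an array of fields and print the first three rows
val fields = fRDD.map(_.split("\t"))
fields.take(3).foreach(arr => println(arr.mkString(" | ")))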

Reading a CSV file directly into a DataFrame:

val fDF = spark.read.csv("s3a://markosbucket/folder01")
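
The CSV reader also accepts options. Assuming the test file is tab-separated and has no header row, the read can be made explicit:

// Read the folder as tab-separated values; column names default to _c0, _c1, ...
val fDF2 = spark.read
  .option("sep", "\t")
  .option("header", "false")
  .csv("s3a://markosbucket/folder01")
fDF2.printSchema()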

Writing to S3

Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a sample file:

mkdir data
wget https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

Now we can load the local file into a DataFrame:

val airportDF = spark.read.csv("data/airports.dat")
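
A quick look at what was loaded; since airports.dat has no header row, Spark assigns default column names such as _c0 and _c1:

// Inspect the inferred schema and the first few rows
airportDF.printSchema()
airportDF.show(5, truncate = false)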

One way of saving the DataFrame to S3 is to first convert it to an RDD and call saveAsTextFile:

airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")

Refreshing the S3 folder should show a new folder, airport-output, which contains two files: _SUCCESS and part-00000.
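
Converting to an RDD is not strictly necessary; the DataFrame writer API can write to S3 directly. A sketch, assuming CSV output is acceptable (the output path below is made up for this example, and coalesce(1) only keeps the output to a single part file):

// Write the DataFrame directly to S3 as CSV
airportDF.coalesce(1)
  .write
  .mode("overwrite")
  .csv("s3a://markosbucket/folder01/airport-output-csv")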

Some error messages with fixes

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Fix:
The jar files were not added to the Spark application (check spark.driver.extraClassPath in the properties file).

com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:

sc.hadoopConfiguration.set("fs.s3a.access.key", [ACCESS_KEY])
sc.hadoopConfiguration.set("fs.s3a.secret.key",[SECRET_KEY])
java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist

Fix:
In the AWS Console, right-click the file, choose Properties and copy the value next to Link. Replace https with s3a and remove the endpoint domain (“s3-…”), leaving only the bucket name and the path to the file.
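
For illustration, the rewrite described above turns the Link value into an s3a path (the HTTPS link below is only an example):

// Link in the console:  https://s3-eu-west-1.amazonaws.com/markosbucket/folder01/SearchLog.tsv
// Path to use in Spark: s3a://markosbucket/folder01/SearchLog.tsv
val testRDD = sc.textFile("s3a://markosbucket/folder01/SearchLog.tsv")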

Creating a multinode Apache Spark cluster on AWS from command line

The idea is to create a Spark cluster on AWS sized to the current needs, make maximal use of it, and terminate it after the processing is done.

All the instances run the Ubuntu 14.04 operating system; the instance type used in the following script is the free-tier t2.micro instance with 8 GB of storage and 1 GB of RAM.

To have this working from the command line, the aws package (AWS CLI) has to be installed.

One instance, the master in this case, is always in the “running” state, and this instance has the following installed:

  • aws package
  • Apache Spark

How to install Spark 2.0 is described here. There is also a link to the blog post about how to install the Spark History Server.

The Spark History Server can be reached on port 18080 (MASTER_PUBLIC_IP:18080) and the Spark Master on port 8080. If the pages do not load, start the services as the spark user. The scripts for starting the services can be found in $SPARK_HOME/sbin.

The script for launching instances

The following script takes one parameter: the number of instances to launch and attach to the Spark cluster. These instances become the workers in the Spark world.

A new script is created

vi create-spark-cluster.sh

And the following lines are written in it. Adjust accordingly (details below).

#!/bin/sh

#remove any old associations that contain "worker" from /etc/hosts
sudo sed -i.bak '/worker/d' /etc/hosts

#remove all known hosts from the file
sudo sed -i 'd' /home/ubuntu/.ssh/known_hosts

#run the loop to create as many slaves as defined in the input parameter
for i in $(seq -f %02g 1 $1)
do
  #name of the worker
  NAME="worker$i"

  #run the aws command to create an instance and run a script when the instance is created.
  #the command returns the private IP address which is used to update the local /etc/hosts file
  PRIV_IP_ADDR=$(aws ec2 run-instances --image-id ami-0d77397e --count 1 \
            --instance-type t2.micro --key-name MYKEYPAIR \
            --user-data file:///PATH_TO_SCRIPT/user-data.sh \
            --subnet-id SUBNET_ID --security-group-ids SECURITY_GROUP \
            --associate-public-ip-address \
            --output text | grep PRIVATEIPADDRESSES | awk '{print $4 "\t" $3}')

   #add the IP and hostname association to the /etc/hosts file
   echo "$PRIV_IP_ADDR" "$NAME" | sudo tee -a /etc/hosts

done

The user-data option allows you to define a file that is run on the new instance. In this file, the steps for the Spark worker installation and setup are defined.
MYKEYPAIR: name of the key pair you use in the AWS console when launching new instances (this is not a path to the private key on the instance you are running the script from!).
SUBNET_ID: ID of the subnet you are using. It is something one creates when starting with EC2 in AWS.
SECURITY_GROUP: name of the existing security group that this instance should use.

The script that runs at instance launch

The following commands in the script are run as soon as each instance is created.
At the top of the script, the operating system is updated, and Python and the JDK are installed.

#!/bin/sh

sudo apt-get update -y  && \
sudo apt-get install python-minimal -y && \
sudo apt-get install default-jdk -y && \
cd /etc/ && \
sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.1-bin-hadoop2.7.tgz && \
sudo tar -xvzf spark-2.0.1-bin-hadoop2.7.tgz && \
sudo rm spark-2.0.1-bin-hadoop2.7.tgz && \
sudo useradd spark && \
export SPARK_HOME=/etc/spark-2.0.1-bin-hadoop2.7 && \
sudo chown -R spark:spark $SPARK_HOME && \
sudo -u spark mkdir $SPARK_HOME/logs && \
sudo -u spark $SPARK_HOME/sbin/start-slave.sh spark://ip-10-0-0-95.eu-west-1.compute.internal:7077

The wget and tar commands download the Spark package into /etc/ and unpack it.
The last line starts the slave and connects it to the master node (the Spark Master’s URL is passed to start-slave.sh as the parameter).

Example of running the script

The following example creates a Spark cluster with 3 workers.

sh create-spark-cluster.sh 3

Example of the output:

10.0.0.104      ip-10-0-0-104.eu-west-1.compute.internal worker01
10.0.0.80       ip-10-0-0-80.eu-west-1.compute.internal worker02
10.0.0.47       ip-10-0-0-47.eu-west-1.compute.internal worker03

These three lines are also added to /etc/hosts.

The Spark Master Web Interface (SPARK_MASTER_PUBLIC_IP:8080) gives more details about the Spark cluster.

[Image: spark-master, the Spark Master web interface]

The interface displays the recently added workers with state ALIVE. Two workers are in state DEAD; they have been terminated in AWS, but the Spark Master has not updated the worker statistics yet.

Memory in use is 3 GB, since each of the instances used in the cluster creation has 1 GB.
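
To confirm that the new workers actually accept work, a quick job can be run from spark-shell started on the master with the --master option pointing at the Spark Master URL. A minimal sketch:

// A trivial job: with 6 partitions the tasks should spread across the three workers
val doubledCount = sc.parallelize(1L to 1000000L, 6).map(_ * 2).count()
println(s"counted $doubledCount records")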

Conclusion

The blog post shows a very simple way of creating a Spark cluster. The cluster creation script can be made a lot more dynamic, and the script that is run on the newly created instances could be extended (for example, installing the Python packages needed for the work).