Manipulating files from S3 with Apache Spark

This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare a properties file with AWS credentials, start spark-shell with those properties, read a file from S3 and write a DataFrame to S3.

This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket, and in folder folder01 I have a test file called SearchLog.tsv.

The project’s home is /home/ubuntu/s3-test. A folder called jars is created in the project’s home.

Step into the jars folder and download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars.

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

Create a properties file in the project’s home, for example s3.properties. Add the following text to the file and adjust the paths and keys.

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]
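Before starting spark-shell, the properties file can be sanity-checked. The sketch below is a minimal check in plain Scala, assuming the file uses the whitespace-separated key/value layout shown above, which java.util.Properties can parse. The object name S3PropertiesCheck and its methods are hypothetical and not part of Spark.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper, not part of Spark: parses the text of a properties
// file and reports which of the S3-related keys are missing.
object S3PropertiesCheck {

  // The four keys this post puts into s3.properties
  val requiredKeys: Seq[String] = Seq(
    "spark.hadoop.fs.s3a.impl",
    "spark.driver.extraClassPath",
    "spark.hadoop.fs.s3a.access.key",
    "spark.hadoop.fs.s3a.secret.key"
  )

  // java.util.Properties accepts whitespace as the key/value separator
  def parse(text: String): Map[String, String] = {
    val props = new Properties()
    props.load(new StringReader(text))
    var result = Map.empty[String, String]
    val names = props.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      result += (key -> props.getProperty(key).trim)
    }
    result
  }

  // Required keys that are absent from the parsed file
  def missingKeys(conf: Map[String, String]): Seq[String] =
    requiredKeys.filterNot(conf.contains)
}
```

Calling S3PropertiesCheck.missingKeys on the parsed content of /home/ubuntu/s3-test/s3.properties should return an empty list when all four keys are present.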

Once the file is saved, we can test the access by starting spark-shell.

spark-shell --properties-file /home/ubuntu/s3-test/s3.properties

Once in Spark, the configuration properties can be checked by running

spark.conf.getAll

The result should, among other parameters, show values for the keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value for a specific key can also be checked by running spark.conf.get(KEY_NAME), as the example below shows.

spark.conf.get("spark.hadoop.fs.s3a.impl")

The Spark application has now picked up the jars and credentials. Let us read the test file into an RDD.

val fRDD = sc.textFile("s3a://markosbucket/folder01")

Output:

fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at :24

Remember, this is an RDD, not a DataFrame or Dataset!

Print out the first three lines from the file:

fRDD.take(3)
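Each element of the RDD is one raw line of text. Since the test file is tab-separated, the lines still have to be split into fields before the values can be used. A minimal sketch in plain Scala, where the object name TsvLine is made up for illustration:

```scala
// Hypothetical helper for splitting one tab-separated line into its fields.
object TsvLine {
  // The -1 limit keeps trailing empty fields, which split() would otherwise drop
  def fields(line: String): Vector[String] =
    line.split("\t", -1).toVector
}
```

In spark-shell this could be applied as fRDD.map(line => TsvLine.fields(line)).take(3).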

Reading a CSV file directly into a DataFrame:

val fDF = spark.read.csv("s3a://markosbucket/folder01")

Writing to S3

Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a random file into it

mkdir data
wget -P data https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

Now we can load the local file into a DataFrame

val airportDF = spark.read.csv("data/airports.dat")

One way to save the DataFrame to S3 is to convert the DataFrame to an RDD first

airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")
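Note that saveAsTextFile writes each Row in its toString form, so every output line is wrapped in square brackets. If plain comma-separated lines are wanted, the rows can be formatted first. The sketch below is one possible formatter in plain Scala. The name CsvLine is hypothetical, and the quoting rules are the common CSV convention (double quotes around fields that contain commas, quotes or line breaks), not something prescribed by Spark.

```scala
// Hypothetical helper: formats the values of one row as a CSV line.
object CsvLine {
  def format(fields: Seq[Any]): String =
    fields.map { field =>
      // null values become empty fields
      val text = if (field == null) "" else field.toString
      val needsQuotes =
        text.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r')
      // Quote the field and double any embedded quotes when needed
      if (needsQuotes) "\"" + text.replace("\"", "\"\"") + "\"" else text
    }.mkString(",")
}
```

In spark-shell, airportDF.rdd.map(row => CsvLine.format(row.toSeq)) could then be saved with saveAsTextFile. Spark 2.x also offers airportDF.write.csv(path) as a built-in way to write CSV output.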

Refreshing the S3 folder should show a new folder, airport-output, which contains two files: _SUCCESS and part-00000.

Some error messages with fixes

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Fix:
The jar files were not added to the Spark application. Check that spark.driver.extraClassPath in the properties file points to both downloaded jars.
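A quick way to narrow this down is to check whether the classpath entries exist on disk and whether the class can be loaded at all. The following is a minimal sketch in plain Scala; the object ClasspathCheck and its methods are hypothetical names, not part of Spark or Hadoop.

```scala
import java.io.File
import scala.util.Try

// Hypothetical helpers for diagnosing the ClassNotFoundException above.
object ClasspathCheck {

  // Entries of a classpath string that do not point to an existing file
  def missingEntries(classpath: String): Seq[String] =
    classpath
      .split(File.pathSeparator)
      .toSeq
      .filter(_.nonEmpty)
      .filterNot(path => new File(path).isFile)

  // True if the named class can be loaded by the current class loader
  def classAvailable(className: String): Boolean =
    Try(Class.forName(className)).isSuccess
}
```

In spark-shell, ClasspathCheck.missingEntries(spark.conf.get("spark.driver.extraClassPath")) lists jar paths that do not exist, and ClasspathCheck.classAvailable("org.apache.hadoop.fs.s3a.S3AFileSystem") tells whether the S3A class is visible to the driver.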

com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:

sc.hadoopConfiguration.set("fs.s3a.access.key", "[ACCESS_KEY]")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "[SECRET_KEY]")
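To avoid typing the keys into the shell, they can be read from the environment first. The sketch below assumes the keys are exported as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, the variable names used by the AWS command line tools. The object name S3Credentials is hypothetical.

```scala
// Hypothetical helper: looks up the AWS keys in a map of environment variables.
object S3Credentials {
  // Returns (accessKey, secretKey) only if both variables are set and non-empty
  def fromEnv(env: Map[String, String]): Option[(String, String)] =
    for {
      access <- env.get("AWS_ACCESS_KEY_ID").filter(_.nonEmpty)
      secret <- env.get("AWS_SECRET_ACCESS_KEY").filter(_.nonEmpty)
    } yield (access, secret)
}
```

In spark-shell, the result of S3Credentials.fromEnv(sys.env) can be matched and its two values passed to the same sc.hadoopConfiguration.set calls.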

java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist

Fix:
In the AWS Console, right-click on the file, choose Properties and copy the value next to Link. Replace https with s3a and remove the domain name (“s3-…”).
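The rewrite of the link can be sketched in plain Scala. This assumes the copied link is path-style, meaning the bucket name is the first path segment after the domain. The object name S3aPath is hypothetical.

```scala
import java.net.URI

// Hypothetical helper: turns a path-style S3 console link into an s3a URI.
object S3aPath {
  // Assumes a link of the form https://<s3 endpoint>/<bucket>/<key>
  def fromLink(link: String): String = {
    val path = new URI(link).getPath // "/<bucket>/<key>"
    "s3a:/" + path
  }
}
```

The result keeps only the bucket and key, which is the form sc.textFile and spark.read expect.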

Creating first Scala project with sbt and submitting it to Apache Spark

The environment for the following project build was: Ubuntu 14.04 on an AWS EC2 instance, sbt version 0.13.13 (how to install it) and Apache Spark 2.0.1 in local mode (although the same procedure has also worked on a Hortonworks Hadoop cluster with Spark 2.0).

The Scala example file creates a SparkSession, reads a CSV file into a DataFrame and prints the DataFrame to the command line. If you are using an Apache Spark version older than 2.0, check how to create the contexts needed to run the example, or upgrade to Spark 2.0!

Create a new project folder and step into it

mkdir scala-ne
cd scala-ne

Create a data folder, step into it and create a test data file

mkdir data
cd data
vi n-europe.csv
1,Oslo,Norway
2,Stockholm,Sweden
3,Helsinki,Finland
4,Copenhagen,Denmark
5,Reykjavik,Iceland

Save the data file and exit vi.

Step back into the project folder and create the Scala file

cd ..
vi spark-ne.scala

Copy the following code into the Scala file (make sure the path to the CSV file is valid). If you are doing this on a node that is part of an HDFS cluster, be sure to add file:// at the beginning of the file path string.

import org.apache.spark.sql.SparkSession

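object ne {

        def main(args: Array[String]): Unit = {
                // Path to the test data file; adjust it to your project folder
                val fil = "/SPARK-NE_PROJECT/data/n-europe.csv"

                // Create (or reuse) the SparkSession
                val spark = SparkSession
                        .builder
                        .appName("Scala-Northern-E")
                        .getOrCreate()

                // Read the CSV file into a DataFrame and print it
                val neDF = spark.read.csv(fil)
                neDF.show()

                // Release the resources held by the session
                spark.stop()
        }
}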
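On a cluster node a bare path is typically resolved against the default file system, which is usually HDFS, hence the file:// advice above. A minimal sketch of building such a path in plain Scala, where the object name LocalPath is hypothetical:

```scala
import java.io.File

// Hypothetical helper: makes it explicit that a path refers to the local
// file system rather than to HDFS.
object LocalPath {
  def withFileScheme(path: String): String =
    "file://" + new File(path).getAbsolutePath
}
```

The value of fil in the example could then be wrapped as LocalPath.withFileScheme(fil) before it is passed to spark.read.csv.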

Create the build file build.sbt

vi build.sbt

Then type the following lines (make sure to leave an empty line between the settings). Adjust the Scala and spark-sql versions accordingly!

name := "Spark-ne"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"

Run the following command to build the project

sbt package

The last three lines of the output

[info] Packaging /user/marko/scala-ne/target/scala-2.11/spark-ne_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 69 s, completed Jan 7, 2017 8:22:38 PM
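The jar file name in the output is derived from the settings in the build file: the project name, the Scala binary version and the project version. The sketch below approximates that naming in plain Scala. The object JarName is hypothetical, and the normalization rule is an assumption about sbt's behaviour rather than its actual code.

```scala
// Hypothetical helper that approximates how sbt names the packaged jar.
object JarName {
  // Lower-case the project name and replace runs of non-word characters with "-"
  def normalize(name: String): String =
    name.toLowerCase(java.util.Locale.ENGLISH).replaceAll("\\W+", "-")

  // For Scala 2.x releases the binary version is the first two components
  def binaryVersion(scalaVersion: String): String =
    scalaVersion.split("\\.").take(2).mkString(".")

  def of(name: String, scalaVersion: String, version: String): String =
    s"${normalize(name)}_${binaryVersion(scalaVersion)}-$version.jar"
}
```

With the values from the build file, JarName.of("Spark-ne", "2.11.8", "1.0") gives the file name that is later passed to spark-submit.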

Running the sbt package command for the first time takes longer because the jar files are being downloaded. Maven users should feel at home here.
If you make changes to the Scala file and run the command again, it takes less time. Below is an example of the next build call

[info] Packaging /user/marko/scala-ne/target/scala-2.11/spark-ne_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed Jan 7, 2017 8:29:12 PM

The code can now be submitted to Spark. Adjust the path to the JAR file accordingly.

$SPARK_HOME/bin/spark-submit --class ne /user/marko/scala-ne/target/scala-2.11/spark-ne_2.11-1.0.jar

If everything went as it should, the table of Northern European cities and countries appears in the output.

Installing sbt on Ubuntu for building Scala projects

Using Apache Spark for big data processing also offers the possibility of using Scala. Despite Python being more popular than Scala, Scala is still THE language in the Apache Spark world. It is time to start writing code in it.
The interactive build tool sbt helps you build Scala and Java projects. It is similar to Java’s Maven or Ant. It offers native support for compiling Scala and, among other things, supports mixed Scala/Java projects.

Run the following to install sbt.

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update -y
sudo apt-get install sbt -y

Test sbt by running

sbt version

It should return something like this

[info] Set current project to ubuntu (in build file:/home/ubuntu/)
[info] 0.1-SNAPSHOT

The tool is now installed and ready to use.

You can run sbt by simply typing

sbt

Creating an example Scala project that works with Apache Spark is described in this post.