Spark, Scala, sbt and S3

The idea behind this blog post is to write a Spark application in Scala, build the project with sbt and run the application, which reads a simple text file from S3.

A couple of Spark and Scala functionalities that can be picked up in this post:

  • how to create SparkSession
  • how to create SparkContext
  • auxiliary constructors
  • submitting Spark application with arguments
  • error handling in Spark
  • adding column to Spark DataFrame

My operating system is Windows 10 with Spark 2.4.0 installed on it. Spark has been installed using this tutorial. I have not installed Scala on my machine, but I do have Java, version 8. Keep in mind that Spark does not support Java 9. It is OK to have multiple Java versions installed; just make sure you switch to Java 8 in the IDE.

My IDE of choice is IntelliJ IDEA Community 2019.1. From the IDE it is possible to run and test the code, import libraries (using sbt), package the JAR file and run the JAR.

I have a small single-node Spark cluster in AWS (one t2.micro instance) for testing the JAR file outside of my development environment. The instance is accessed, and the project tested, from MobaXterm.

Create application

Create new Scala project

create project

Type in the name of the project and change the JDK path to Java 8 if the default points to some other version. Spark 2.4.0 uses Scala 2.11.12, so make sure the Scala version matches. This can be changed later in the sbt file.

create project_2

IntelliJ IDEA creates the project, and its structure is shown in the image below:

project structure

Spark libraries are imported using the sbt file. Copy and paste the following lines into the sbt file:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3"

In the lower right corner a pop-up window will appear, asking if you want to import the changes. Accept it.

import changes

Reading files in Spark

A rookie mistake is to create a file locally, run the application in the development environment (your own computer), where it works as it should, and then scale up to a cluster where the file cannot be found. It is easy to forget that the file has to be in the same location on all workers in the Spark cluster, not just on the instance that serves as the client!

To avoid this, an external storage is introduced. In the case of this post, it is the object storage of AWS – S3.

The test file I am using here is a simple one-line txt file:

a;b;c

If you plan to use a different file structure, make sure to change the schema definition in the code, as sketched below.
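Purely as an illustration – this snippet is not part of the project code – a hypothetical two-column file "x;y" would need a schema with only two fields instead of the three defined later in the post:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val twoColumnSchema = StructType(Array(
  StructField("first", StringType, true),
  StructField("second", StringType, true)))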

Creating Scala class

The environment, the input file and the sbt file are now ready. Let us write the example code. Create a new Scala class.

file new project.jpg

Make sure to choose Object from the Kind dropdown menu.

new scala class

Open Test.scala and copy the following content in it:

import scala.util.Failure
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.lit

// primary constructor
class TextAnalysis(path : String, comment : String) {

  //aux cons takes no inputs
  def this() {
    this("s3a://hdp-hive-s3/test.txt", "no params")
  }

  //aux cons takes file path as input
  def this(path : String) {
    this(path, "from s3a")
  }

  //create spark session
  private val spark = SparkSession
    .builder
    .master("local")
    .appName("kaggle")
    .getOrCreate()

  //Create a SparkContext to initialize Spark
  private val sc = spark.sparkContext

  def ReadFile(): Unit = {

    val testSchema = StructType(Array(
      StructField("first", StringType, true),
      StructField("second", StringType, true),
      StructField("third", StringType, true)))

    try {

      val df = spark.read
        .format("csv")
        .option("delimiter", ";")
        .schema(testSchema)
        .load(path)

      val df1 = df.withColumn("comment", lit(comment))
      df1.show()

    } catch {
      case e: AnalysisException => {
        println(s"FILE $path NOT FOUND... EXITING...")
      }
      case unknown: Exception => {
        println("UNKNOWN EXCEPTION... EXITING...")
        println(Failure(unknown))
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
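    // The path and the comment are passed as one quoted argument ("path;comment"),
    // so the args are joined and re-split on ";" to recover both values.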
    val sep = ";"
    val argsArray = args.mkString(sep).split(sep)

    val ta = new TextAnalysis("s3a://hdp-hive-s3/test.txt")
    ta.ReadFile()

    val ta1 = new TextAnalysis(argsArray(0), argsArray(1))
    ta1.ReadFile()

    val ta2 = new TextAnalysis()
    ta2.ReadFile()
  }
}

In the object Test, three instances of the TextAnalysis class are created to demonstrate how to run Spark code with input arguments and without them.

To make the second instance work, the arguments need to be defined. Click Run -> Edit Configurations…

In the textbox Program arguments write the following: "s3a://hdp-hive-s3/test.txt;with args". Click OK.

Right-click on the code and choose Run "Test" from the menu. This generates a verbose log which also prints the three test tables. Here is an example of one:

table output example

The column "comment" is added to the DataFrame in the Spark code.
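As a side note, the same error handling can be written with scala.util.Try instead of try/catch. The snippet below is only a sketch, assuming the same spark, path, comment and testSchema values as in the class above:

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.functions.lit

def readFileWithTry(): Unit = {
  // Try wraps the read and the show; any exception ends up in Failure
  Try {
    val df = spark.read
      .format("csv")
      .option("delimiter", ";")
      .schema(testSchema)
      .load(path)
    df.withColumn("comment", lit(comment)).show()
  } match {
    case Success(_) => println(s"FILE $path PROCESSED")
    case Failure(e) => println(s"READING FILE $path FAILED: ${e.getMessage}")
  }
}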

Package application in a JAR

Create artifact configuration

Open the Project Structure window and choose Artifacts from the left menu: File -> Project Structure -> Artifacts.

Click on the +, choose JAR and From modules with dependencies.

Enter Module and Main Class manually or use the menus on the right. Click OK when done and click OK again to exit the window.

create jar from modules

Build JAR

Open the menu Build -> Build Artifacts. Choose the JAR you wish to build and choose action Build.

build artifact - build.JPG

This will create a new folder called “out” where the JAR file resides.

out folder

The test.jar is over 130 MB, even though only about 70 lines of code were written. All dependencies were packaged into this JAR file, which can sometimes be acceptable, but in the majority of cases the dependencies are already on the server. The same goes for this case: the JARs used to run this application are already on a standard Spark cluster under $SPARK_HOME/jars. For working with S3 files, the hadoop-aws JAR should be added to that jars folder. The JAR can be found here.

Removing the dependent JARs is done in the following way from Build -> Build Artifacts.

build artifact - edit.JPG

On the right side of the window, remove all the JAR files listed under test.jar – mark them and click the "minus" icon.

project structure - removed dependencies

After saving the changes, rebuild the artifact using Build -> Build Artifacts and the Rebuild option from the menu. This should reduce the file size to 4 KB.
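An alternative to stripping the JARs from the artifact by hand – shown here only as a sketch, not something done in this post – is to mark the dependencies as provided in the sbt file, so they are never packaged in the first place:

// "provided" keeps these libraries out of the packaged JAR; they are already
// on the cluster under $SPARK_HOME/jars (hadoop-aws after it has been added
// there, as described above).
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3" % "provided"

Keep in mind that running directly from the IDE may then require adding the provided dependencies back to the runtime classpath.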

Testing the package on Spark single-node

If you have a working Spark cluster, you can copy the JAR file to it and test it. I have a single-node Spark cluster in AWS for this purpose and use MobaXterm to connect to it. From the folder where the JAR file has been copied, run the following command:

sh $SPARK_HOME/bin/spark-submit --class Test test.jar "s3a://hdp-hive-s3/test.txt;test on Spark"

Three tables should appear in the output among the log messages. One of the tables is the following:

table output on Spark cluster

This table shows how calling a Scala class with arguments from the command line works.

This wraps up the example of how Spark, Scala, sbt and S3 work together.


Example of DevOps environment

This post builds on the theory from Introduction to Automation in the Cloud. It explains how the DevOps environment is built and used.

Cloud for testing

Creating a user account in the cloud of your choice is the best start. My choice was AWS, and all infrastructure is built on AWS. When doing a Proof of Concept (PoC) in the cloud on your own, you adopt the logic of companies entering the cloud era – you wish to minimize the costs. That means two things:

  • build services in the cloud when needed and destroy them once you are done using them
  • create a work/development environment on your own machine – a Docker container is my choice.

AWS offers an instance type (EC2 in AWS terms) called "t2.micro", which is perfect for testing infrastructure scripts. It will not get you much further than installing and starting a few services in your infrastructure, but it will let you know whether your installation and configuration work as they should. That is where dynamic configuration comes in handy: once ready to run on a bigger scale, just change the input configuration file (more on this later).

Work environment

Now that we know we are provisioning on AWS and have access to the cloud, all we need is the work environment.

The tools needed are PowerShell, Docker and GitHub Desktop.

work-environment

Interaction between the tools used to prepare the work environment. Once the container is created, the user accesses it from PowerShell, except that it is no longer a Command Prompt but the operating system defined in the DockerFile.

GitHub Desktop connects you to the GitHub repositories you wish to clone or work on. This tool is used to push and pull changes to and from your repository on GitHub.

PowerShell is a Command Prompt on steroids; it is used to work with Docker images and containers. You will most likely be tempted to maximize the experience and use PowerShell ISE. It will not work, since it is not compatible with Docker for Windows.

With Docker, you can create an environment on your operating system that is independent of the system. In the worst-case scenario, you can delete the container and build it again. The DockerFile is the definition of the image you wish to use to create a container. An example of a DockerFile with the necessary files can be found here. This repository creates the Docker container with the tools needed for IaC work.

The Docker image needs to be built from this folder since it picks up the configuration from the DockerFile. I use PowerShell to build Docker containers, which then serve as my entry point to infrastructure-as-code development. Details about how to get started are in the README.md file. My flavour of Linux in the container is CentOS.

Inside the container

The container contains Ansible, Terraform, Consul and some other installations used to support the work (git2consul, awscli…). It also starts a local Consul server, which can be reached at localhost:8501 (depending on the port you expose when running the container) from the browser on the client computer. The Consul server is populated from a dedicated configuration repository on GitHub – configuration in Consul. This means that configuration changes are pushed to GitHub using GitHub Desktop, and a process inside the Docker container called git2consul updates the Consul server.

Before being able to provision anything on AWS from the container, the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should be set as environment variables.

At this point the DevOps environment should be in place: Terraform, Consul and Ansible are installed, git2consul is set up, and the local Consul server is running and ready to serve configuration settings.

tools_in_DevOps_env

Simple representation of the DevOps (work) environment with its main services.

Next post covers the configuration services (git2consul and Consul) and the key-value configuration files in GitHub.

Introduction to Automation in the Cloud

An attempt to explain how open source tools for automation are used for minimizing costs and maximizing control over infrastructure in the cloud.

Introduction

Automation, or Infrastructure-as-Code (IaC), is the idea that all the infrastructure is written as scripts and the scripts are executed when needed. In the "old days" (and for some vital parts of organizations' solutions still today) the infrastructure consisted of physical servers in the basement, with software installed and maintained by in-house engineers with the help of the vendor's consultants. With Infrastructure-as-Code, the only thing maintained are the scripts, while the basement houses the table-tennis table. The scripts are maintained by data engineers (so-called DevOps engineers), and a broader audience can now build, maintain and destroy the infrastructure. It helps that the cloud vendors have simplified services that were once the domain of, for example, network engineers.

The tendency in data storage and data processing (and everywhere else in IT) is to move to a cloud: a private cloud, a public cloud or a hybrid. Those are the options. Moving everything to a public cloud (the big three: AWS, Google Cloud Platform or Azure) should make you a smart consumer of those services moneywise. Your goal is pay-as-you-go: run your applications, when needed, on the infrastructure you need and destroy the infrastructure once the results are saved.

“Pay-as-you-go in cloud”

For succeeding in pay-as-you-go concept, two things have emerged on the market:

  • cheap object storage (S3 on AWS, Blob Storage on Azure and Cloud Storage on Google Cloud Platform).
  • tools for Infrastructure-as-Code (IaC)

Cheap object storage is exactly that: low-cost storage of files of all forms, shapes and types. It makes it possible to store data cheaply and build the processing infrastructure only when needed. This follows the idea of separating storage and processing.

“Division of storage and processing resources”

The days of having Hadoop just to have Hadoop are over, and a company needs a good reason to justify owning and maintaining a Hadoop cluster. The division of storage and processing works if the infrastructure is dynamic, or rather, if the infrastructure-as-code can fulfil the users' needs. The responsibility falls on the DevOps engineers and the tools.

There is no doubt that the tools are there; the open source community already offers plenty to choose from. Since I follow the philosophy that companies should pay less for licences and more for knowledge, I focus on open source technologies in the cloud.

“Organizations will pay less for licenses and more for knowledge”

Infrastructure-as-Code should offer a robust and general solution where the infrastructure is configured through input parameters. In other words, users define the input parameters, run the code and get a customized solution. This is what I attempt to demonstrate in a few of my GitHub repositories. I will come back to this in later posts.

Choosing the tools to do the job is not simple, just as it is not simple to pick the most suitable cloud provider. Here in Norway, Azure is the most popular cloud solution, in my opinion not because of quality but because of Microsoft's market position and good salespeople.

I have experience mostly with AWS (a reader might observe that I write Amazon Web Services as AWS while I spell out Google Cloud Platform instead of GCP) and some with OpenStack and VMware. Choosing a cloud vendor is not as problematic as choosing the architecture within your cloud. Using services provided by the cloud vendor carries the risk of being locked to one technology or vendor, and migration to another, similar solution might be costly. Migration should always remain an option when working with new technologies, where it is uncertain whether the proposed architecture will deliver.

“Locking yourself to one distributor can be risky”

The technology stack I use in my examples is the following:

Cloud vendor: AWS

Cloud Vendor’s services: S3 (object storage), VPC (virtual private cloud – mandatory for launching instances in AWS), EC2 (instances in the cloud – Linux servers)

aws

Object storage S3 for storing data is separated from the processing resources (made up of EC2 instances), which are in the mandatory VPC. Any other storage can be used if it has connectors, and S3 can likewise be accessed externally.

Infrastructure as Code tools: Terraform (automation of services in the cloud), Consul (configuration of infrastructure to be created) and Ansible (software installation and administration of instances built in the cloud)

IaC tools.JPG

Symbiosis between the IaC tools: the user stores the configuration of the desired infrastructure in Consul, Terraform reads the configuration at provisioning time, saves new parameters back to Consul and at the same time executes the Ansible scripts which install and set up the software for the desired solution.

Work environment: Docker for Windows (container with Linux environment on local machine), PowerShell (for Docker creation and development and test of scripts)

Version control: GitHub and GitHub Desktop (for pulling and pushing to the repositories)

work-environment

Repository with files for Docker container creation is cloned from GitHub using GitHub Desktop. PowerShell is used to create the Docker image and start the Docker container. This Docker container represents the entry point to the Infrastructure-as-Code development and testing.

IDEs for coding: Atom (for Terraform, Ansible and Consul configuration), PyCharm and Jupyter (for Python scripts) and Intellij IDEA (for Scala scripts)

Next post goes in depth on the DevOps environment.

Adding service to HDP using REST API

One way of adding a new service to HDP is by using the graphical interface Ambari. This post explains how the same is done using Ambari's REST API. The service added in this post is Pig. Pig is not a "classical" service, rather a package, but from the REST API's point of view it is a service.

The documentation on how to do this is dated April 21 2014. I have followed it and eventually made it work. The documentation can be found here.

The cluster

I am using AWS EC2 instances; the operating system is CentOS 7.

I have an Ambari server, version 2.6.2 and an HDP cluster version 2.6.5. This should work on other versions as well.

My cluster has one NameNode, on which Ambari is installed as well, and one DataNode. The services installed are the bare minimum – HDFS, YARN, MapReduce2, Zookeeper and Hive.

The goal

Install Pig client on the NameNode and on the DataNode.

Adding Pig to the cluster

Variables

export AMBARI_SERVER=PUBLIC_IP
export MASTER_DNS=NAMENODE_PRIVATE_DNS
export SLAVE_DNS=DATANODE_PRIVATE_DNS
export CLUSTER_NAME=mincluster2

Create service on the Cluster

curl -u admin:admin -H "X-Requested-By:ambari" -i -X POST -d '{"ServiceInfo":{"service_name":"PIG"}}' 'http://'$AMBARI_SERVER':8080/api/v1/clusters/'$CLUSTER_NAME'/services'

Pig service is added to the list of services in Ambari.

Pig added - Configs
Pig added - Summary

Check for service on the cluster

curl -k -u admin:admin -H "X-Requested-By:ambari" -i -X GET 'http://'$AMBARI_SERVER':8080/api/v1/clusters/'$CLUSTER_NAME'/services/PIG'

curl - service info
The service is registered on the cluster.

Add components to the service

curl -k -u admin:admin -H "X-Requested-By:ambari" -i -X POST -d '{"RequestInfo":{"context":"Install PIG"}, "Body":{"HostRoles":{"state":"INSTALLED"}}}' 'http://'$AMBARI_SERVER':8080/api/v1/clusters/'$CLUSTER_NAME'/services/PIG/components/PIG'

Running the curl command from previous step to check if the component is added returns the following:

curl - service info after component.png
The component has been added according to the “components” element in the JSON output. The state of the service is still “UNKNOWN”.

Creating configuration is on the next page.

Apache Hadoop 3 as a Service on AWS

Apache Hadoop 3.1 cluster built from the CLI. The link to the GitHub repository is below.

The general idea is to have a solution that builds an Apache Hadoop 3 cluster from the command line. This can be useful for learning purposes, for testing, or for spinning up a Hadoop cluster for a certain job and then terminating it, hence minimizing costs.

Motivation

A couple of years ago I listened to a Spark Summit conference where one company introduced the following architectural solution: data sat in S3; when there was a need for analysis, a Hadoop cluster was created, the data was pushed to HDFS and the analyses were done. After the results were collected, the Hadoop cluster was terminated.

About

The code has no exception handling and it uses AWS's t2.micro instances to prove the point. There is a lot of potential in building a friendly user interface to parametrize the solution. There is only one input parameter – the number of datanodes. When using AWS's free-tier instances, make sure you do not have more than 20 of them running.

There are four files:

  • HaaS.sh
  • script_namenode.sh
  • script_datanode.sh
  • terminate_cluster.sh

The HaaS.sh file launches the instances for the namenode and the datanode(s) (the namenode instance is dedicated to namenode-related services – no datanode services are installed there). It is advised to start at least one datanode. Example of how to launch a cluster with 5 datanodes: . HaaS.sh 5

When EC2 instance for namenode is ready, script_namenode.sh is executed on that instance. When EC2 instance(s) for datanode(s) are ready, script_datanode.sh is executed on the instance(s).

Prerequisites

I have defined one instance as the "initial" instance. This is where the scripts are located, and this instance creates and terminates the cluster; it is not part of the Hadoop cluster itself. I am using Ubuntu 16.04 for all my instances. Make sure the awscli package is installed and aws is configured on this initial instance.

Prerequisites on AWS

  • key pair
  • security group
    • open all traffic for all instances in the same subnet and security group
    • open port 9870 for Namenode Web Interface
    • open port 8088 for Resource Manager (YARN)
    • open port 19888 for MapReduce JobHistory server
  • subnet

Times

Launching a Hadoop cluster with 10 datanodes took less than 10 minutes; during testing I also got it down to 8 minutes. I am using the sleep command in the HaaS.sh script to wait for the instances to start running or for Hadoop to download and unpack. There is room for optimization here as well.

Order of execution

The HaaS.sh script does the following actions:

  • launch namenode instance and read output text into a variable
  • parse the variable to collect instance id and private ip
  • create instances.list and add namenode instance id to it
  • append private ip and instance name to /etc/hosts
  • enable passwordless ssh to namenode
  • launch datanode(s)
  • update local /etc/hosts
  • create workers file
  • enable passwordless ssh to datanode(s)
  • start services on datanode(s)
  • copy /etc/hosts from initial instance to all Hadoop instances
  • copy workers file to namenode’s $HADOOP_HOME/etc/hadoop
  • start services on datanode(s)
  • remove temporary files

Link to the scripts can be found here.

Installing Apache Spark 2.2.1

I have installed older Apache Spark versions and now the time is right to install Spark 2.2.1.

I'm using an AWS t2.micro instance with Ubuntu 16.04 on it. MobaXterm is my choice of interface to SSH into the instance.

System update

sudo apt-get update -y
sudo apt-get upgrade -y

Change instance name

Go into the hostname file and change the name to spark

sudo vi /etc/hostname

Change localhost to the instance name in the hosts file

After 127.0.0.1, write the name of the instance

sudo vi /etc/hosts

Reboot the instance

sudo reboot

Install and set up Java

You will need Java, if not sooner, then for running the History Server. Java 8 is installed in the following way:

sudo add-apt-repository ppa:openjdk-r/ppa
sudo apt-get update
sudo apt-get install openjdk-8-jdk -y

Add JAVA_HOME to the environment file

sudo vi /etc/environment

And add the following line to the top of the file

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

Double-check that this is the correct Java home.

Python

Spark 2.2.x supports Python 2.7+/3.4+. When running PySpark, Spark looks for Python in the /usr/bin directory. Ubuntu 16.04 on AWS comes only with Python 3.5 and has no python executable in /usr/bin, so running PySpark outputs the following error message:

/usr/apache/spark-2.2.1-bin-hadoop2.7/bin/pyspark: line 45: python: command not found
env: ‘python’: No such file or directory

There are two options: install Python 2.7+ or create a link to Python 3. The first alternative makes sense only if Python packages that exist for Python 2 but not for Python 3 are going to be used. Otherwise, the latter alternative is the way to go, and it is done in the following way:

sudo ln -s /usr/bin/python3 /usr/bin/python

Running PySpark now starts the PySpark CLI with Python 3.5.2.

And yes, running python and python3 now both execute the same action – they start Python 3.5.

Create user spark

sudo adduser spark

Define a password; for the sake of testing, let's go with spark.

Prepare directory for Spark home

sudo mkdir /usr/apache

Step into the directory

cd /usr/apache

Download and unpack Apache Spark 2.2.1

sudo wget http://apache.uib.no/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
sudo tar -xvzf spark-2.2.1-bin-hadoop2.7.tgz

Clean up – delete the spark’s tgz file

sudo rm spark-2.2.1-bin-hadoop2.7.tgz

Change the owner of the spark directory

sudo chown spark:spark /usr/apache/spark-2.2.1-bin-hadoop2.7

Create Spark home

cd spark-2.2.1-bin-hadoop2.7
pwd

The output of the pwd command is the value for SPARK_HOME in the environment file. Open the file:

sudo vi /etc/environment

And add the below SPARK_HOME line before the PATH line

export SPARK_HOME=/usr/apache/spark-2.2.1-bin-hadoop2.7

At the end of PATH add

:${SPARK_HOME}/bin

(You need the colon to separate it from the previous values.)

Refresh the environment file:

source /etc/environment

Create log and pid directories

sudo mkdir -p /var/log/spark/logs
sudo chown spark:spark -R /var/log/spark
sudo -u spark mkdir $SPARK_HOME/run
sudo -u spark chmod 777 /var/log/spark/logs

The last line allows every user to read and write to the directory. It is probably best to adjust the access rights according to your needs.

If other users are going to run Spark applications and those applications should be visible in the History Server, the user spark has to be added to those users' groups.
For example, if user ubuntu runs Spark applications, then user ubuntu writes log files to the Spark History log directory (see the property spark.history.fs.logDirectory below) and Spark opens them through the History Server. To be able to do the latter, spark has to be a member of the ubuntu group. This is done in the following way:

sudo usermod -a -G ubuntu spark

Prepare spark-env.sh file

Open the file

sudo -u spark vi $SPARK_HOME/conf/spark-env.sh

Add the following values

SPARK_LOG_DIR=/var/log/spark
SPARK_PID_DIR=${SPARK_HOME}/run

Prepare spark-defaults.conf file

Open the file

sudo -u spark vi $SPARK_HOME/conf/spark-defaults.conf

Add the following values

spark.history.fs.logDirectory file:/var/log/spark/logs
spark.eventLog.enabled true
spark.eventLog.dir file:/var/log/spark/logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

Start Spark History

sudo -u spark $SPARK_HOME/sbin/start-history-server.sh

The instance's IP address on port 18080 should open the Spark History Server. If not, check /var/log/spark for errors and messages.

Manipulating files from S3 with Apache Spark

Update 22/5/2019: Here is a post about how to use Spark, Scala, S3 and sbt in Intellij IDEA to create a JAR application that reads from S3.

This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare a properties file with AWS credentials, run spark-shell with those properties, read a file from S3 and write a DataFrame to S3.

This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket and in folder folder01 I have a test file called SearchLog.tsv.

The project's home is /home/ubuntu/s3-test. The folder jars is created in the project's home.

Step into the jars folder and download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars:

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

Create a properties file in the project’s home. The name, for example, is s3.properties. Add and adjust the following text in the file.

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]

Once the file is saved, we can test the access by starting spark-shell.

spark-shell --properties-file /home/ubuntu/s3-test/s3.properties

Once in Spark, the configuration properties can be checked by running

spark.conf.getAll

The result should, among other parameters, also show values for the keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value for a specific key can also be checked by running spark.conf.get(KEY_NAME), as the example below shows.

spark.conf.get("spark.hadoop.fs.s3a.impl")

The jars and the credentials are now read by the Spark application. Let us read the test file into an RDD.

val fRDD = sc.textFile("s3a://markosbucket/folder01")

Output:

fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at :24

Remember, this is an RDD, not a DataFrame or DataSet!

Print out first three lines from the file

fRDD.take(3)

Reading a CSV file directly into a DataFrame:

val fDF = spark.read.csv("s3a://markosbucket/folder01")
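If the file had a header row or a different delimiter, the reader options could be adjusted. This is an illustrative sketch with assumed options, not something the example above requires:

val fDF2 = spark.read
  .option("header", "true")      // first line holds column names
  .option("delimiter", "\t")     // tab-separated values
  .csv("s3a://markosbucket/folder01")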

Writing to S3

Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a random file

mkdir data
wget https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

Now we can load the local file in a DataFrame

val airportDF = spark.read.csv("data/airports.dat")

Saving the DataFrame to S3 is done here by first converting the DataFrame to an RDD:

airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")

Refreshing the S3 folder should show the new folder airport-output, which contains two files: _SUCCESS and part-00000.
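As a side note – and only as a sketch, not part of the original example – the DataFrame could also be written to S3 directly through the DataFrameWriter, without converting it to an RDD (the output path is illustrative):

// Writes the DataFrame as CSV straight to S3; Spark again creates a folder
// with part files and a _SUCCESS marker.
airportDF.write.csv("s3a://markosbucket/folder01/airport-output-csv")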

Some Error messages with fixes

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Fix:
The jar files were not added to Spark application.

com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:

sc.hadoopConfiguration.set("fs.s3a.access.key", [ACCESS_KEY])
sc.hadoopConfiguration.set("fs.s3a.secret.key", [SECRET_KEY])

java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist

Fix:
In the AWS Console, right-click on the file, choose Properties and copy the value next to Link. Replace https with s3a and remove the domain name ("s3-…").