Update 22/5/2019: Here is a post about how to use Spark, Scala, S3 and sbt in IntelliJ IDEA to create a JAR application that reads from S3.
This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare a properties file with AWS credentials, start spark-shell with those properties, read a file from S3 and write a DataFrame to S3.
This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket, and in the folder folder01 I have a test file called SearchLog.tsv.
The project’s home is /home/ubuntu/s3-test. A folder named jars is created in the project’s home.
Step into the jars folder and download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars.
wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
Create a properties file in the project’s home. The name, for example, is s3.properties. Add and adjust the following text in the file.
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key [SECRET_KEY]
Once the file is saved, we can test the access by starting spark-shell.
spark-shell --properties-file /home/ubuntu/s3-test/s3.properties
Once in Spark, the configuration properties can be checked by running
spark.conf.getAll
The resulting Map should, among other parameters, contain values for the keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value for a specific key can also be checked by running spark.conf.get(KEY_NAME), as the example below shows.
spark.conf.get("spark.hadoop.fs.s3a.impl")
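Since spark.conf.getAll also prints the secret key in plain text, it can be handy to mask it before showing the configuration to anyone. The following is only a sketch: maskS3aSecrets is a hypothetical helper, not part of Spark, and it assumes the keys use the spark.hadoop.fs.s3a. prefix as in the properties file above.

```scala
// Hypothetical helper (not part of Spark): keep only the S3A-related
// entries and replace the value of the secret key with a placeholder.
def maskS3aSecrets(conf: Map[String, String]): Map[String, String] =
  conf
    .filter { case (key, _) => key.startsWith("spark.hadoop.fs.s3a.") }
    .map {
      case (key, _) if key.endsWith("secret.key") => key -> "****"
      case other                                  => other
    }

// In spark-shell, where `spark` is predefined:
// maskS3aSecrets(spark.conf.getAll).foreach(println)
```

The helper takes a plain Map, so it can be tried out without a running Spark session.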
The jars and credentials are now available to the Spark application. Let us read the test file into an RDD.
val fRDD = sc.textFile("s3a://markosbucket/folder01")
Output:
fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at <console>:24
Remember, this is an RDD, not a DataFrame or Dataset!
Print out the first three lines from the file:
fRDD.take(3)
Reading a CSV file directly into a DataFrame:
val fDF = spark.read.csv("s3a://markosbucket/folder01")
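The test file in this post is a .tsv file, and the CSV reader splits on commas by default, so a tab separator probably has to be set explicitly. Below is a minimal sketch of how that could look. readTsv is a hypothetical helper name, and whether SearchLog.tsv has a header row is an assumption, so the flag defaults to false.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: read tab-separated files into a DataFrame.
// "sep" and "header" are options of Spark's built-in CSV reader.
def readTsv(spark: SparkSession, path: String, hasHeader: Boolean = false): DataFrame =
  spark.read
    .option("sep", "\t")                  // split columns on tabs instead of commas
    .option("header", hasHeader.toString) // treat the first line as column names if true
    .csv(path)

// In spark-shell, where `spark` is predefined:
// val searchDF = readTsv(spark, "s3a://markosbucket/folder01")
// searchDF.show(3)
```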
Writing to S3
Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a sample file into it.
mkdir data
wget -P data https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat
Now we can load the local file into a DataFrame.
val airportDF = spark.read.csv("data/airports.dat")
Here the DataFrame is saved to S3 by first converting it to an RDD.
airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")
After refreshing the S3 folder, the new folder airport-output should appear, containing two files: _SUCCESS and part-00000.
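One thing to be aware of: saving through the RDD writes each Row with its toString representation, so every line in the output is wrapped in square brackets. If plain CSV output is preferred, the DataFrame can be written directly with the DataFrameWriter. This is a sketch only: writeCsv is a hypothetical helper, and the output folder airport-output-csv in the usage comment is an assumed name, not one from the steps above.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: write a DataFrame as CSV files under the given path.
// ErrorIfExists (Spark's default mode) refuses to touch an existing folder.
def writeCsv(df: DataFrame, path: String): Unit =
  df.write
    .mode(SaveMode.ErrorIfExists)
    .csv(path)

// In spark-shell, with airportDF loaded as above:
// writeCsv(airportDF, "s3a://markosbucket/folder01/airport-output-csv")
```

ErrorIfExists is chosen here so that an existing S3 folder is never overwritten by accident; SaveMode.Overwrite would replace it.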
Some error messages with fixes
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
Fix:
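The jar files were not added to the Spark application. Check that spark.driver.extraClassPath in the properties file points to both downloaded jars and that spark-shell was started with that properties file.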
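A quick way to check from inside spark-shell whether the jars were picked up is to try loading the class by name. The snippet below is a minimal sketch, and classAvailable is a hypothetical helper name.

```scala
import scala.util.Try

// Hypothetical helper: true if the class can be loaded from the
// current classpath, false if loading it throws ClassNotFoundException.
def classAvailable(className: String): Boolean =
  Try(Class.forName(className)).isSuccess

// In spark-shell:
// classAvailable("org.apache.hadoop.fs.s3a.S3AFileSystem")
```

If the call returns false for S3AFileSystem, the hadoop-aws jar is most likely missing from the driver class path.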
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:
sc.hadoopConfiguration.set("fs.s3a.access.key", "[ACCESS_KEY]")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "[SECRET_KEY]")
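To avoid typing the keys into the shell, they could also be taken from environment variables. The sketch below assumes the keys are exported as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, the names commonly used by AWS tools. setS3aCredentials is a hypothetical helper, not a Spark or Hadoop function.

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical helper: copy AWS credentials from an environment map into
// a Hadoop configuration. Returns false (and changes nothing) if either
// variable is missing.
def setS3aCredentials(conf: Configuration, env: Map[String, String]): Boolean =
  (env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_ACCESS_KEY")) match {
    case (Some(accessKey), Some(secretKey)) =>
      conf.set("fs.s3a.access.key", accessKey)
      conf.set("fs.s3a.secret.key", secretKey)
      true
    case _ =>
      false
  }

// In spark-shell, where `sc` is predefined:
// setS3aCredentials(sc.hadoopConfiguration, sys.env)
```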
java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist
Fix:
In the AWS Console, right-click the file, choose Properties and copy the value next to Link. Replace https with s3a and remove the domain name (“s3-…”), so that the path starts with the bucket name.
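The conversion described in this fix can be sketched as a small function. It assumes the copied link is a path-style URL, where the bucket name is the first segment of the path. toS3aPath is a hypothetical helper, and the host name in the usage comment is only an illustrative example.

```scala
import java.net.URI

// Hypothetical helper: turn a path-style S3 https link into an s3a path
// by dropping the scheme and domain and keeping "/bucket/key".
def toS3aPath(link: String): String =
  "s3a:/" + new URI(link).getPath

// Example with an assumed link:
// toS3aPath("https://s3-eu-west-1.amazonaws.com/markosbucket/folder01/SearchLog.tsv")
// returns "s3a://markosbucket/folder01/SearchLog.tsv"
```

Links in which the bucket name is part of the host name would need different handling.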