The idea behind this blog post is to write a Spark application in Scala, build the project with sbt and run the application, which reads a simple text file from S3.
If you are interested in how to access S3 files from Apache Spark with Ansible, check out this post.
A couple of Spark and Scala functionalities that can be picked up from this post:
- how to create a SparkSession
- how to create a SparkContext
- auxiliary constructors
- submitting a Spark application with arguments
- error handling in Spark
- adding a column to a Spark DataFrame
My operating system is Windows 10 with Spark 2.4.0 installed on it. Spark has been installed using this tutorial. I have not installed Scala on my machine, but I do have Java, version 8. Keep in mind that Spark does not support Java 9. It is fine to have multiple Java versions installed; just make sure to switch to Java 8 in the IDE.
My IDE of choice is IntelliJ IDEA Community 2019.1. It is possible to run and test the code, import libraries (using sbt), package the JAR file and run the JAR from the IDE.
I have a small single-node Spark cluster in AWS (one t2.micro instance) for testing the JAR file outside of my development environment. The instance is accessed, and the project tested, from MobaXterm.
Create application
Create new Scala project
Type in the name of the project and change the JDK path to Java 8 if the default points to some other version. Spark 2.4.0 uses Scala 2.11.12, so make sure the Scala version matches. This can be changed later in the sbt file.
IntelliJ IDEA creates the project, and its structure is shown in the image below:
Spark libraries are imported using the sbt file. Copy and paste the following lines into the sbt file:
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3"

In the lower right corner a pop-up window will appear, asking whether you want to import the changes. Accept that.
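For orientation, the rest of the sbt file is just the scaffolding IntelliJ IDEA generates when the project is created; a rough sketch of how the top of build.sbt might look, assuming the project is named test (the name and version are placeholders, the Scala version matches Spark 2.4.0 as mentioned above):

name := "test"
version := "0.1"
scalaVersion := "2.11.12"

followed by the libraryDependencies lines above.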
Reading files in Spark
A rookie mistake is to create a file locally, run it in the development environment, which is your own computer, where it works as it should, and then scale up to a cluster where the file cannot be found. It is easy to forget that the file should be in the same location on all workers in the Spark cluster, not just on the instance that serves as the client!
To avoid this, external storage is introduced. In the case of this post, it is the AWS object storage service, S3.
The test file I am using here is a simple one-line txt file:
a;b;c

If you plan to use a different file structure, make sure to change the schema definition in the code.
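For instance, if the file had a fourth semicolon-delimited column, the StructType defined further down in the code would need one more field; a small sketch (the column names are placeholders):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// hypothetical schema for a four-column, semicolon-delimited file
val fourColumnSchema = StructType(Array(
  StructField("first", StringType, true),
  StructField("second", StringType, true),
  StructField("third", StringType, true),
  StructField("fourth", StringType, true)))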
Creating Scala class
The environment, input file and sbt file are now ready. Let us write the example. Create a new Scala class.
Make sure to choose Object from the Kind dropdown menu.
Open Test.scala and copy the following content into it:
import scala.util.Failure
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.lit

// primary constructor
class TextAnalysis(path : String, comment : String) {

  // auxiliary constructor that takes no inputs
  def this() {
    this("s3a://hdp-hive-s3/test.txt", "no params")
  }

  // auxiliary constructor that takes the file path as input
  def this(path : String) {
    this(path, "from s3a")
  }

  // create the Spark session
  private val spark = SparkSession
    .builder
    .master("local")
    .appName("kaggle")
    .getOrCreate()

  // create a SparkContext to initialize Spark
  private val sc = spark.sparkContext

  def ReadFile(): Unit = {

    val testSchema = StructType(Array(
      StructField("first", StringType, true),
      StructField("second", StringType, true),
      StructField("third", StringType, true)))

    try {
      val df = spark.read
        .format("csv")
        .option("delimiter", ";")
        .schema(testSchema)
        .load(path)

      val df1 = df.withColumn("comment", lit(comment))
      df1.show()
    } catch {
      case e: AnalysisException => {
        println(s"FILE $path NOT FOUND... EXITING...")
      }
      case unknown: Exception => {
        println("UNKNOWN EXCEPTION... EXITING...")
        println(Failure(unknown))
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {

    val sep = ";"
    val argsArray = args.mkString(sep).split(sep)

    val ta = new TextAnalysis("s3a://hdp-hive-s3/test.txt")
    ta.ReadFile()

    val ta1 = new TextAnalysis(argsArray(0), argsArray(1))
    ta1.ReadFile()

    val ta2 = new TextAnalysis()
    ta2.ReadFile()
  }
}

In the object Test, three instances of the TextAnalysis class are created to demonstrate how to run Spark code with input arguments and without them.
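The code above assumes the environment can already reach the bucket (the post linked at the top shows one way to set that up with Ansible). If that is not the case, the standard hadoop-aws S3A credential settings can also be supplied from code through the Hadoop configuration; a minimal, standalone sketch, assuming the credentials are available as environment variables:

import org.apache.spark.sql.SparkSession

// Sketch only: setting the S3A credential keys programmatically.
// The environment variable names are placeholders for wherever the credentials live.
object S3AccessSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("s3-access-sketch").getOrCreate()
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

    // after this, s3a:// paths can be read the same way as in TextAnalysis.ReadFile()
    spark.stop()
  }
}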
In order to make the second instance work, the arguments need to be defined. Click Run -> Edit Configurations…
In the Program arguments textbox, write the following: “s3a://hdp-hive-s3/test.txt;with args”. Click OK.
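Because the whole string is passed as one quoted program argument, the main method joins args with “;” and splits the result again, which yields the path and the comment separately; a quick illustration of what happens with the value entered above:

// illustration of the argument handling in Test.main
val args = Array("s3a://hdp-hive-s3/test.txt;with args") // one quoted program argument
val sep = ";"
val argsArray = args.mkString(sep).split(sep)
// argsArray(0) == "s3a://hdp-hive-s3/test.txt"
// argsArray(1) == "with args"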
Right-click on the code and choose Run “Test” from the menu. This should generate a verbose log, which should also include three printouts of the test table. Here is an example of one:
The column “comment” is added to the DataFrame in the Spark code.
Package application in a JAR
Create artifact configuration
Open the Project Structure windows and choose Artifacts from the left menu: File -> Project Structure -> Artifacts.
Click on the +, choose JAR and From modules with dependencies.
Enter Module and Main Class manually or use the menus on the right. Click OK when done and click OK again to exit the window.
Build JAR
Open the menu Build -> Build Artifacts. Choose the JAR you wish to build and choose action Build.
This will create a new folder called “out” where the JAR file resides.
The test.jar is over 130 MB, even though only about 70 lines of code were written. All dependencies were packaged into this JAR file, which can sometimes be acceptable, but in the majority of cases the dependencies are already on the server. That is the case here as well: the JARs used to run this application are already part of the standard Spark installation under $SPARK_HOME/jars. For working with S3 files, the hadoop-aws JAR should be added to that jars folder. The JAR can be found here.
Removing the dependent JARs is done in the following way from Build -> Build Artifacts.
In the window, on the right side, remove all the JAR files under test.jar: mark them and click the “minus” icon.
After saving the changes, rebuild the artifact using Build -> Build Artifacts and the Rebuild option from the menu. This should reduce the file size to 4 KB.
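As an alternative to trimming the artifact by hand, the Spark dependencies can be marked as provided in build.sbt when packaging with sbt, so they are never bundled in the first place; a sketch assuming the same versions as above:

// build.sbt sketch: the cluster supplies these at runtime, so exclude them from the package
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"
// hadoop-aws is not shipped with Spark by default, so either keep it packaged
// or copy it to $SPARK_HOME/jars as described above
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3" % "provided"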
Testing the package on a single-node Spark cluster
If you have a working Spark cluster, you can copy the JAR file to it and test it. I have a single-node Spark cluster in AWS for this purpose and MobaXterm to connect to the cluster. From the folder where the JAR file has been copied, run the following command:
sh $SPARK_HOME/bin/spark-submit --class Test test.jar "s3a://hdp-hive-s3/test.txt;test on Spark"

Three tables should be seen in the output, among the log lines; one of them is the following:
This table shows how calling a Scala class with arguments from the command line works.
This wraps up the example of how Spark, Scala, sbt and S3 work together.