Spark, Scala, sbt and S3

The idea behind this blog post is to write a Spark application in Scala, build the project with sbt and run the application which reads from a simple text file in S3.

A few Spark and Scala features that can be picked up from this post:

  • how to create a SparkSession
  • how to create a SparkContext
  • auxiliary constructors
  • submitting a Spark application with arguments
  • error handling in Spark
  • adding a column to a Spark DataFrame

My operating system is Windows 10 with Spark 2.4.0 installed on it. Spark has been installed using this tutorial. I have not installed Scala on my machine, but I do have Java, version 8. Keep in mind that Spark does not support Java 9. It is OK to have multiple Java versions installed; just make sure you switch to Java 8 in the IDE.

My IDE of choice is IntelliJ IDEA Community 2019.1. It is possible to run code tests, import libraries (using sbt), package the JAR file and run the JAR, all from the IDE.

I have a small single-node Spark cluster in AWS (one t2.micro instance) for testing the JAR file outside of my development environment. The instance is accessed, and the project tested, from MobaXterm.

Create application

Create new Scala project

create project

Type in the name of the project and change the JDK path to Java 8 if the default points to some other version. Spark 2.4.0 uses Scala 2.11.12, so make sure the Scala version matches. This can be changed later in the sbt file.

create project_2

IntelliJ IDEA creates the project, and its structure looks like the image below:

project structure

Spark libraries are imported using the sbt file. Copy and paste the following lines into the sbt file:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3"

In the lower right corner, a pop-up window will appear asking if you want to import the changes. Accept that.

import changes

Reading files in Spark

A rookie mistake is to create a file locally, run it in the development environment – which is your own computer, where it works as it should – and then scale up to a cluster where the file cannot be found. It is easy to forget that the file should be in the same location on all workers in the Spark cluster, not just on the instance that serves as the client!

To avoid this, external storage is introduced. In the case of this post, it is the object storage of AWS – S3.
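
Spark also needs credentials to read from S3. How they are picked up depends on the hadoop-aws version and the environment (on an EC2 instance an IAM role can do the job); one way, shown here only as a sketch and assuming the keys live in the usual AWS environment variables, is to set them on the Hadoop configuration of the SparkSession created later in this post:

//sketch only: pass the s3a keys through the Hadoop configuration
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))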

The test file I am using here is a simple one-line txt file:

a;b;c

If you plan to use a different file structure, make sure to change the schema definition in the code.

Creating Scala class

The environment, input file and sbt file are now ready. Let us write the example. Create a new Scala class.

file new project

Make sure to choose Object from the Kind dropdown menu.

new scala class

Open Test.scala and copy the following content into it:

import scala.util.Failure
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.lit

// primary constructor
class TextAnalysis(path: String, comment: String) {

  // auxiliary constructor, takes no inputs
  def this() {
    this("s3a://hdp-hive-s3/test.txt", "no params")
  }

  // auxiliary constructor, takes the file path as input
  def this(path: String) {
    this(path, "from s3a")
  }

  // create the Spark session
  private val spark = SparkSession
    .builder
    .master("local")
    .appName("kaggle")
    .getOrCreate()

  // create a SparkContext to initialize Spark
  private val sc = spark.sparkContext

  def ReadFile(): Unit = {

    val testSchema = StructType(Array(
      StructField("first", StringType, true),
      StructField("second", StringType, true),
      StructField("third", StringType, true)))

    try {
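      // read the semicolon-delimited file from the given path, using the schema defined above;
      // a missing file raises an AnalysisException which is handled in the catch block below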

      val df = spark.read
        .format("csv")
        .option("delimiter", ";")
        .schema(testSchema)
        .load(path)

      val df1 = df.withColumn("comment", lit(comment))
      df1.show()

    } catch {
      case e: AnalysisException => {
        println(s"FILE $path NOT FOUND... EXITING...")
      }
      case unknown: Exception => {
        println("UNKNOWN EXCEPTION... EXITING...")
        println(Failure(unknown))
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
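    // all program arguments are joined and re-split on ";" so that a single
    // argument of the form "path;comment" also ends up as two elements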
    val sep = ";"
    val argsArray = args.mkString(sep).split(sep)

    val ta = new TextAnalysis("s3a://hdp-hive-s3/test.txt")
    ta.ReadFile()

    val ta1 = new TextAnalysis(argsArray(0), argsArray(1))
    ta1.ReadFile()

    val ta2 = new TextAnalysis()
    ta2.ReadFile()
  }
}

In the object Test, three instances of the TextAnalysis class are created to demonstrate how to run the Spark code with input arguments and without them.

In order to make the second instance work, the arguments need to be defined. Click Run -> Edit Configurations…

In the Program arguments textbox, write the following: “s3a://hdp-hive-s3/test.txt;with args”. Click OK.

Right-click on the code and choose Run “Test” from the menu. This should generate a verbose log which also prints the three test tables. Here is an example of one:

table output example

The column “comment” is added in the Spark code.

Package application in a JAR

Create artifact configuration

Open the Project Structure window and choose Artifacts from the left menu: File -> Project Structure -> Artifacts.

Click on the +, choose JAR and From modules with dependencies.

Enter Module and Main Class manually or use the menus on the right. Click OK when done and click OK again to exit the window.

create jar from modules

Build JAR

Open the menu Build -> Build Artifacts. Choose the JAR you wish to build and choose action Build.

build artifact - build

This will create a new folder called “out” where the JAR file resides.

out folder

The test.jar is over 130 MB, even though only about 70 lines of code were written. All dependencies were packaged into this JAR file, which can sometimes be acceptable, but in the majority of cases the dependencies are already on the server. The same goes for this case: the JARs used to run this application are already present on a standard Spark cluster under $SPARK_HOME/jars. For working with S3 files, the hadoop-aws JAR should be added to that jars folder. The JAR can be found here.
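
If copying the JAR into $SPARK_HOME/jars is not convenient, spark-submit can also pull the dependency in at submit time with its --packages option. This is an alternative I am adding here, not what the steps above do, and the version should match the cluster's Hadoop libraries:

spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.3 --class Test test.jar "s3a://hdp-hive-s3/test.txt;with packages"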

Removing the dependent JARs is done in the following way from Build -> Build Artifacts.

build artifact - edit

On the right side of the window, remove all the JAR files under test.jar – mark them and click the “minus” icon.

project structure - removed dependencies

After saving the changes, rebuild the artifact using Build -> Build Artifacts and the Rebuild option from the menu. This should reduce the file size to 4 KB.

Testing the package on Spark single-node

If you have a working Spark cluster, you can copy the JAR file to it and test it. I have a single-node Spark cluster in AWS for this purpose and use MobaXterm to connect to it. From the folder where the JAR file has been copied, run the following command:

sh $SPARK_HOME/bin/spark-submit --class Test test.jar "s3a://hdp-hive-s3/test.txt;test on Spark"

Three tables should be seen in the output among the log messages; one of them is the following:

table output on Spark cluster

This table shows how calling a Scala class with arguments from the command line works.

This wraps up the example of how Spark, Scala, sbt and S3 work together.

Service configuration tools and files

The previous post mentions Consul and git2consul, which store the parameters and fetch data from GitHub into Consul. It is only fair to gain more in-depth knowledge about them.

Consul is a product of a company called HashiCorp and, together with Terraform (and other tools I have not mentioned yet), forms a group of tools called the HashiStack. Consul is a tool for configuring the services we build with our scripts. The scripts are the general description of the to-be state, while the configuration in Consul personalizes the infrastructure we plan to build (provision).

The git2consul service “mirrors the contents of a git repository into Consul KVs”. In other words, the service reads a git repository and creates/updates key-value pairs in Consul.

git2consul_Consul_cooperation

High level presentation of git2consul and Consul cooperation – git2consul periodically reads from a given GitHub repository and updates the Consul server

The previous post describes how a local Consul server is started when the Docker container is run. A local Consul server is acceptable for testing purposes; it is possible, and advised, to build a distributed Consul service which offers high availability (avoiding a single point of failure).

Configuration in YAML

A dedicated GitHub repository with the configuration parameters for my IaC projects can be found here – one YAML file per project. For example, the configuration for the VPC architecture defines the services built in the VPC that serve as the foundation for the clusters built on top of it, such as the configurations (plural, since there can be more than one) for a Spark cluster.

The git2consul configuration file inside the Docker container holds parameters, among them the URL of the GitHub repository that serves as the configuration repository. The file I am using for my git2consul service is here. It is copied into the container when the image is created.

The following graphic shows the same configuration parameters in three ways. The first image is the YAML file as seen in GitHub (I use Atom for development), the second shows the same parameters read with the Consul API from the command line inside the Docker container, and the third is a screenshot of the same parameters in the Consul web server.

configuration_example_aws

Three views of same key-value pairs – GitHub, command line and Consul web server
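
For reference – and as an assumption about how one might inspect the values, not necessarily the exact command used for the screenshot – the Consul CLI can list the stored pairs recursively:

consul kv get -recurse aws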

Another example, this one about Machine Learning in Spark, shows how two different machine learning projects are configured in spark.yml. The prerequisites for running either of them are the VPC infrastructure and the input files (key spark_job_args). This example shows that the scripts used to build the Spark cluster stay untouched, while the configuration in Consul personalizes the use case. If a new Spark job should be run, it is best to copy an existing block of key-value pairs and change it to fit the needs.
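
Purely as an illustration of the idea, not the contents of the actual spark.yml (only the spark_job_args key below is taken from the text; everything else is made up):

spark_job_example:                                   # hypothetical job name
  spark_job_args: "s3a://my-bucket/input/data.csv"   # input files, as mentioned above
  instance_type: "t2.micro"                          # hypothetical key
  worker_count: 3                                    # hypothetical key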

A more complex example is the hdp.yml file which holds key-value pairs for five different Hadoop clusters. All can be provisioned using the same Terraform and Ansible scripts.

Writing to Consul at runtime

As mentioned a couple of times, the VPC in AWS is a prerequisite, and the established VPC is where all the following solutions are built. This requires saving some values of the VPC so that they can be picked up when provisioning the next solution. These values are saved in Consul and are NOT pushed to GitHub – git2consul works one way only.

Once the VPC is provisioned, Terraform writes to Consul in a path defined by the user. In my example, everything starting with generated under aws is coming from Terraform, not from GitHub.

consul_generated

Key-value pairs generated when the VPC is provisioned. Observe the last line – it specifies the name under which all the generated key-value pairs are gathered.
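
A minimal sketch of how Terraform can write such values, assuming the Consul provider is configured; the path and the resource reference are illustrative, not taken from my repository:

resource "consul_keys" "generated" {
  key {
    path  = "aws/generated/vpc_id"   # illustrative path
    value = aws_vpc.main.id          # illustrative reference to the provisioned VPC
  }
}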

These values are then picked up in other Terraform scripts so that the infrastructure being built knows where to fit in. I mentioned Spark and Hadoop earlier – the instances launched in AWS need the generated key-value pairs for a successful launch.

Example of DevOps environment

This post builds on the theory from Introduction to Automation in the Cloud. It explains how the DevOps environment is built and used.

Cloud for testing

Creating a user account in the cloud of your choice is the best start. My choice was AWS, and all the infrastructure here is built on AWS. When doing a Proof of Concept (PoC) in the cloud on your own, you adopt the logic of companies entering the cloud era – you wish to minimize the costs. That means two things:

  • build services in the cloud when needed and destroy them once you are done using them
  • create a work/development environment on your own machine – a Docker container is my choice.

AWS offers instance types (EC2 services in the AWS world) called “t2.micro”, which are perfect for testing infrastructure scripts. They will not get you further than installing and starting a few services in your infrastructure, but they are helpful in letting you know whether your installation and configuration work as they should. That is where dynamic configuration comes in handy: once ready to run on a bigger scale, just change the input configuration file (more on this later).

Work environment

Now that we know we are provisioning on AWS and have access to the cloud, all we need is the work environment.

The tools needed are PowerShell, Docker and GitHub Desktop.

work-environment

Interaction between the tools used to prepare the work environment. Once the container is created, the user accesses it from PowerShell, except that now it is not Command Prompt anymore, but the operating system defined in the DockerFile.

GitHub Desktop connects you to the GitHub repositories you wish to clone or work on. This tool is used to push and pull changes to and from your repository on GitHub.

PowerShell is a Command Prompt on steroids; it is used here to work with Docker images and containers. You will most likely try to maximize the experience and use PowerShell ISE. That will not work, since it is not compatible with Docker for Windows.

With Docker, you can create an environment on your operating system but independent of the system. In the worst-case scenario, you can delete the container and build it again. The DockerFile is the definition of the image you wish to use to create a container. An example of a DockerFile with the necessary files can be found here. This repository creates the Docker container with the tools needed for IaC work.

The Docker image needs to be built from this folder, since the build picks up the configuration from the DockerFile. I use PowerShell to build Docker containers, which then serve me as an entry point to infrastructure-as-code development. Details about how to get started are in the README.md file. My flavour of Linux in the container is CentOS.
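
The exact commands are in the README.md; as a rough sketch of the shape they take (the image name is a placeholder, and 8500 is Consul's default HTTP port inside the container):

docker build -t iac-env .
docker run -it -p 8501:8500 iac-env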

Inside the container

The container consists of Ansible, Terraform, Consul and some other installations used to support the work (git2consul, awscli…). It also starts a local Consul server, which can be reached at localhost:8501 (depending on the port you expose when running the container) from the browser on the client computer. The Consul server is populated from a GitHub repository which is a dedicated configuration repository – configuration in Consul. This means that configuration changes are pushed to GitHub using GitHub Desktop, and a process inside the Docker container called git2consul updates the Consul server.

Before being able to provision anything on AWS from the container, the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should be set as environment variables.
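
Inside the container this means exporting the two variables (values are placeholders) before running Terraform:

export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>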

At this point the DevOps environment should be in place: Terraform, Consul and Ansible are installed, git2consul is set up, and the local Consul server is running and ready to serve configuration settings.

tools_in_DevOps_env

Simple representation of the DevOps (work) environment with its main services.

Next post covers the configuration services (git2consul and Consul) and the key-value configuration files in GitHub.

Introduction to Automation in the Cloud

An attempt to explain how open source tools for automation are used for minimizing costs and maximizing control over infrastructure in the cloud.

Introduction

Automation, or Infrastructure-as-Code (IaC), is the idea that all the infrastructure is written in scripts and the scripts are executed when needed. In the “old days” (and for some vital parts of organizations’ solutions still) the infrastructure meant physical servers in the basement, with software installed and maintained by in-house engineers with the help of the vendor’s consultants. With Infrastructure-as-Code, the only thing maintained are the scripts, while the basement is housing the table-tennis table. The scripts are maintained by data engineers (so-called DevOps engineers), and a broader audience can now build, maintain and destroy the infrastructure. It does help that the cloud vendors have simplified services that were once the domain of, for example, network engineers.

The tendency in the areas of data storage and data processing (and everywhere else in IT) is to move to a cloud: a private cloud, a public cloud or a hybrid. Those are the options. Moving everything to a public cloud (the big three: AWS, Google Cloud Platform or Azure) will make you a smart consumer of those services money-wise. Your goal is to pay as you go, meaning you run your applications when needed, on the infrastructure you need, and destroy the infrastructure once the results are saved.

“Pay-as-you-go in cloud”

To succeed with the pay-as-you-go concept, two things have emerged on the market:

  • cheap object storage (S3 on AWS, Blob Storage on Azure and Cloud Storage on Google Cloud Platform).
  • tools for Infrastructure-as-Code (IaC)

Cheap object storage is exactly that: low-cost storage of files of all forms, shapes and types. It allows storing data cheaply and building the infrastructure for processing only when needed. This follows the idea of dividing storage and processing.

“Division of storage and processing resources”

The days of having Hadoop just to have Hadoop are over, and a company needs a good reason to justify having and maintaining a Hadoop cluster. The division of storage and processing works if the infrastructure is dynamic – or rather, if the infrastructure-as-code can fulfil the users’ needs. The responsibility falls on the DevOps engineers and the tools.

There is no doubt that the tools are there – plenty to choose from in the open source community already. Since I follow the philosophy that companies should pay less for licences and more for knowledge, I focus on open source technologies in the cloud.

“Organizations will pay less for licenses and more for knowledge”

Infrastructure-as-code should offer a robust and general solution where the infrastructure is configured through input parameters. In other words, users define the input parameters, run the code and get a customized solution. This is what I attempt to demonstrate in a few of my GitHub repositories. I will come back to this in later posts.

Choosing the tools to do the job is not simple, just as it is not simple to pick the most suitable cloud provider. Here in Norway, Azure is the most popular cloud solution – in my opinion, not because of quality but because of Microsoft’s market position and good salespeople.

Myself, I have experience mostly with AWS (a reader might observe that I write Amazon Web Services as AWS while Google Cloud Platform is not GCP) and some with OpenStack and VMware. Choosing a cloud vendor is not as problematic as choosing the architecture in your cloud. Using services provided by the cloud vendor carries the risk of being locked in to one technology or vendor, and migration to another, similar solution might be costly. Migration should always remain an option when working with new technologies, where it is uncertain whether the proposed architecture will deliver.

“Locking yourself to one distributor can be risky”

The technology stack I use in my examples is the following:

Cloud vendor: AWS

Cloud Vendor’s services: S3 (object storage), VPC (virtual private cloud – mandatory for launching instances in AWS), EC2 (instances in the cloud – Linux servers)

aws

Object storage S3 for storing data is separated from the processing resources (made up of EC2 instances), which sit in the mandatory VPC. Any other storage can be used if it has connectors, and S3 can also be accessed externally.

Infrastructure as Code tools: Terraform (automation of services in the cloud), Consul (configuration of infrastructure to be created) and Ansible (software installation and administration of instances built in the cloud)

IaC tools

Symbiosis between the IaC tools: user stores configuration of desired infrastructure to Consul, Terraform reads the configuration at provisioning, saves new parameters back to Consul and at the same time executes the Ansible scripts which install and setup the software for the desired solution.

Work environment: Docker for Windows (container with Linux environment on local machine), PowerShell (for Docker creation and development and test of scripts)

Version control: GitHub and GitHub Desktop (for pulling and pushing to the repositories)

work-environment

Repository with files for Docker container creation is cloned from GitHub using GitHub Desktop. PowerShell is used to create the Docker image and start the Docker container. This Docker container represents the entry point to the Infrastructure-as-Code development and testing.

IDEs for coding: Atom (for Terraform, Ansible and Consul configuration), PyCharm and Jupyter (for Python scripts) and Intellij IDEA (for Scala scripts)

Next post goes in depth on the DevOps environment.

Adding service to HDP using REST API

One way of adding a new service to HDP is by using the graphical interface, Ambari. In this post, it is explained how the same is done using Ambari’s REST API. The service added in this post is Pig. Pig is not a “classical” service, rather a package, but from the REST API’s point of view it is a service.

The documentation on how to do this is dated April 21 2014. I have followed it and eventually made it work. The documentation can be found here.

The cluster

I am using AWS EC2 instances; the operating system is CentOS 7.

I have an Ambari server, version 2.6.2 and an HDP cluster version 2.6.5. This should work on other versions as well.

My cluster has one NameNode, on which Ambari is installed as well, and one DataNode. The services installed are the bare minimum – HDFS, YARN, MapReduce2, Zookeeper and Hive.

The goal

Install Pig client on the NameNode and on the DataNode.

Adding Pig to the cluster

Variables

export AMBARI_SERVER=PUBLIC_IP
export MASTER_DNS=NAMENODE_PRIVATE_DNS
export SLAVE_DNS=DATANODE_PRIVATE_DNS
export CLUSTER_NAME=mincluster2

Create service on the Cluster

curl -u admin:admin -H "X-Requested-By:ambari" -i -X POST -d '{"ServiceInfo":{"service_name":"PIG"}}' 'http://'$AMBARI_SERVER':8080/api/v1/clusters/'$CLUSTER_NAME'/services'

Pig service is added to the list of services in Ambari.

Pig added - Configs
Pig added - Summary

Check for service on the cluster

curl -k -u admin:admin -H "X-Requested-By:ambari" -i -X GET 'http://'$AMBARI_SERVER':8080/api/v1/clusters/'$CLUSTER_NAME'/services/PIG'

curl - service info
The service is registered on the cluster.

Add components to the service

curl -k -u admin:admin -H "X-Requested-By:ambari" -i -X POST -d '{"RequestInfo":{"context":"Install PIG"}, "Body":{"HostRoles":{"state":"INSTALLED"}}}' 'http://'$AMBARI_SERVER':8080/api/v1/clusters/'$CLUSTER_NAME'/services/PIG/components/PIG'

Running the curl command from previous step to check if the component is added returns the following:

curl - service info after component
The component has been added according to the “components” element in the JSON output. The state of the service is still “UNKNOWN”.

Creating configuration is on the next page.

Apache Hadoop 3 as a Service on AWS

An Apache Hadoop 3.1 cluster built from the CLI. The link to the GitHub repository is below.

The general idea is to have a solution that builds an Apache Hadoop 3 cluster from the command line. This can be useful for learning purposes, for testing, or for spinning up a Hadoop cluster for a certain job and then terminating it, hence minimizing costs.

Motivation

A couple of years ago I listened to a Spark Summit conference where one company introduced the following architectural solution: the data sat in S3; when there was a need for analysis, a Hadoop cluster was created, the data was pushed to HDFS and the analyses were done. After the results were collected, the Hadoop cluster was terminated.

About

The code has no exception handling, and it uses AWS’s t2.micro instances to prove the point. There is a lot of potential in building a friendly user interface to parametrize the solution. There is only one input parameter – the number of datanodes. When using AWS’s free-tier instances, make sure you do not have more than 20 of them running.

There are four files:

  • HaaS.sh
  • script_namenode.sh
  • script_datanode.sh
  • terminate_cluster.sh

The HaaS.sh file launches the instances for the namenode and datanode(s) (the namenode instance is dedicated to namenode-related services – no datanode services are installed there). It is advised to start at least one datanode. Example of how to launch a cluster with 5 datanodes: . HaaS.sh 5

When the EC2 instance for the namenode is ready, script_namenode.sh is executed on that instance. When the EC2 instance(s) for the datanode(s) are ready, script_datanode.sh is executed on the instance(s).

Prerequisites

I have defined one instance as the “initial” instance. This is where the scripts are located; this instance creates and terminates the cluster but is not itself part of the Hadoop cluster. I am using Ubuntu 16.04 for all my instances. Make sure you have the awscli package installed and aws configured on this initial instance.

Prerequisites on AWS

  • key pair
  • security group
    • open all traffic for all instances in the same subnet and security group
    • open port 9870 for Namenode Web Interface
    • open port 8088 for Resource Manager (YARN)
    • open port 19888 for MapReduce JobHistory server
  • subnet

Times

Launching a Hadoop cluster with 10 datanodes took less than 10 minutes. When testing, I also got it down to 8 minutes. I am using the sleep command in the HaaS.sh script in order to wait for the instances to start running and for Hadoop to download and install (unpack). There is room for optimization here as well.
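
One possible optimization – an idea, not something the current scripts do – is to replace the fixed sleep with the AWS CLI's built-in waiters, for example:

aws ec2 wait instance-running --instance-ids $INSTANCE_ID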

Order of execution

The HaaS.sh script does the following actions:

  • launch namenode instance and read output text into a variable
  • parse the variable to collect instance id and private ip
  • create instances.list and add namenode instance id to it
  • append private ip and instance name to /etc/hosts
  • enable passwordless ssh to namenode
  • launch datanode(s)
  • update local /etc/hosts
  • create workers file
  • enable passwordless ssh to datanode(s)
  • start services on datanode(s)
  • copy /etc/hosts from initial instance to all Hadoop instances
  • copy workers file to namenode’s $HADOOP_HOME/etc/hadoop
  • start services on datanode(s)
  • remove temporary files

Link to the scripts can be found here.

Passwordless ssh between two AWS instances

Hadoop clusters require passwordless ssh between nodes for proper communication.

This is all done on the instance you wish to connect FROM!

The recipe for how I made passwordless ssh work between two instances is the following:

  • create EC2 instances – they should be in the same subnet and have the same security group
  • Open ports between them – make sure the instances can communicate with each other. Use the default security group, which has one rule relevant for this case:
    • Type: All Traffic
    • Source: Custom – id of the security group
  • Log in to the instance you want to connect from to the other instance
  • Run:
    ssh-keygen -t rsa -N "" -f /home/ubuntu/.ssh/id_rsa
    

    to generate a new rsa key.

  • Copy your private AWS key as ~/.ssh/my.key (or whatever name you want to use)
  • Make sure you change the permissions to 600
chmod 600 ~/.ssh/my.key
  • Copy the public key to the instance you wish to connect to passwordless
cat ~/.ssh/id_rsa.pub | ssh -i ~/.ssh/my.key ubuntu@10.0.0.X "cat >> ~/.ssh/authorized_keys"

If you test the passwordless ssh to the other machine, it should work.

ssh 10.0.0.X