Apache Spark 2.0 – Notes

Spark Session

Spark 2.0 introduces SparkSession, a single point of entry for interacting with Spark functionality. It gives the user access to the DataFrame and Dataset APIs.
In older Spark versions, the user had to create a Spark configuration (sparkConf), a SparkContext (sc) and then a sqlContext. In Spark 2.0 all of this is done through SparkSession (spark), which encapsulates the above-mentioned trio and HiveContext. A StreamingContext is still created separately from the SparkContext.

In order to use the DataFrame API in Spark 1.6, SQLContext needs to be used. A new Spark application is started by creating a SparkContext object, which represents a connection to the computing cluster. From the SparkContext, a SQLContext can be created; it is the main entry point for Spark DataFrame and SQL functionality. A SQLContext can be used to create DataFrames, which let you run operations on your data.
In Spark 1.6 it was confusing when to use SparkContext and when to use SQLContext. In Spark 2.0 all this is hidden under a layer called SparkSession. See the following object types from the PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SQLContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

A Spark session has to be created explicitly when using the spark-submit command. An example from the documentation on how to do that:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

Once the spark handle exists, the first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is initialized automatically. Example with sparkR, started from the shell:

sparkR

[Screenshot: SparkR initialization output]

SQL

SQL in Spark 2.0 supports SQL:2003 (the latest revision at the time of writing was SQL:2011). Spark SQL can run all 99 TPC-DS queries, and subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

The following command returns the tables available to the SparkSession:

>>> spark.catalog.listTables()

Driver program

In Spark, communication occurs between the driver and the executors. The driver has Spark jobs to run; it splits them into tasks and submits them to the executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs run either in a cluster or locally on the same machine. Pyspark is an example of a local driver program, which is why trying to start it in cluster deploy mode fails:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode, the client that submitted the job is detached from the Spark application and its further behavior does not influence the application. If you shut down the computer that submitted the application to the cluster, the job continues to run. The driver runs on one of the nodes in the Spark cluster. The spark-submit command with the option --deploy-mode cluster does this.
If you run in client mode, the driver runs on the client you are submitting the application from. In the Spark HistoryServer, under the Executors tab, in the Executors table, you can see that the address of the driver matches the address of the computer the command was sent from.
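
As a rough illustration of the two deploy modes, the sketch below only builds the spark-submit argument list in Python and prints it; it does not start Spark. The script name my_app.py and the helper build_submit_command are hypothetical. The options --master yarn and --deploy-mode are the ones mentioned above.

```python
import shlex


def build_submit_command(app, deploy_mode):
    """Return the spark-submit argument list for a YARN deployment.

    `app` is a hypothetical application script. `deploy_mode` is
    "cluster" (driver runs on a cluster node) or "client" (driver
    runs on the machine that submits the application).
    """
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    return ["spark-submit", "--master", "yarn",
            "--deploy-mode", deploy_mode, app]


cluster_cmd = build_submit_command("my_app.py", "cluster")
client_cmd = build_submit_command("my_app.py", "client")

# Print the commands as they would be typed in a shell.
print(" ".join(shlex.quote(part) for part in cluster_cmd))
print(" ".join(shlex.quote(part) for part in client_cmd))
```

The only difference between the two commands is the value of --deploy-mode, which decides where the driver lives.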

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.

Listing attributes

Python's dir() lists all attributes of the object passed to it.
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']
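
Most of that list is private or special attributes. A minimal sketch of how the public ones could be filtered out, using a stand-in class instead of a real SparkSession (the class Demo and the helper public_attributes are made up for this example):

```python
def public_attributes(obj):
    """Return the names from dir(obj) that do not start with an underscore."""
    return [name for name in dir(obj) if not name.startswith("_")]


class Demo:
    """Stand-in object; in a PySpark shell you would pass `spark` instead."""
    version = "2.0.0"

    def sql(self, query):
        return query

    def _internal(self):
        return None


names = public_attributes(Demo())
print(names)
```

In a PySpark shell, public_attributes(spark) would leave only entries such as catalog, read, sql and version from the list above.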

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

This opens extensive help with examples.

RDD

SQLContext is created from the lower-level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs). An RDD is Spark’s way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster and perform the same regardless of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or in Python, Java, Scala, or R, the underlying code generated is identical because all execution planning goes through the same Catalyst optimizer.
DataFrames are made of partitions. To check the number of partitions, access the RDD underlying the DataFrame:

>>> tweetDF.rdd.getNumPartitions()

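Data is split into partitions.
How to optimize:
If there are 3 slots (cores available to run tasks), the ideal number of partitions is a multiple of 3, so that every slot has work in each round of tasks.
Repartitioning a DataFrame returns a new DataFrame, so assign the result:

>>> tweetDF = tweetDF.repartition(6)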
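
The "multiple of the number of slots" rule is simple arithmetic. A small sketch in plain Python (the helper name round_up_to_slots is my own, not a Spark function):

```python
def round_up_to_slots(partitions, slots):
    """Round a partition count up to the next multiple of the slot count."""
    if partitions < 1 or slots < 1:
        raise ValueError("partitions and slots must be positive")
    # Integer ceiling division, then scale back up to a multiple of slots.
    return (partitions + slots - 1) // slots * slots


print(round_up_to_slots(4, 3))  # 6
print(round_up_to_slots(6, 3))  # 6
print(round_up_to_slots(7, 3))  # 9
```

The result could then be passed to repartition() as the target number of partitions.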

DataFrames

In Spark, a base DataFrame is created first, either by generating a Dataset with the spark.range method (for learning purposes) or by reading files or tables, which returns a DataFrame. Operations can then be applied to it. A DataFrame is immutable: once created, it cannot be changed. Each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

A DataFrame consists of a series of Row objects. Each of them has a set of named columns.

A DataFrame must have a schema, in which each column has a name and a type. Some data sources have schemas built into them; otherwise it is possible to define a schema and pass it as a parameter when creating a new DataFrame.

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance

Dataset

DataFrame and Dataset are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking SQL package in Spark 2.0

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

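A Spark DF can be converted to a Pandas DF. Pandas has to be installed on the driver:

>>> import pandas as pd

Then you can call .toPandas() on the Spark DF to convert it to a Pandas DF. All the data is collected to the driver, so this only suits DataFrames that fit in the driver's memory.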

Cache

Putting a DataFrame in memory: Spark uses the Tungsten binary format to store the data in memory in a compressed, columnar form. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. With evenly sized partitions, the data in memory is divided equally among them (for example, 600 MB in memory and 6 partitions give 100 MB per partition).
The size of the data shrinks when cached. Example: a 1.6 GB file on disk is cached in memory with a size of 618.6 MB.

When an action is executed on a cached DataFrame, the stages that ran BEFORE the cache are skipped, because the data is already partitioned in memory.
The ideal partition size is between 100 MB and 200 MB, so the number of partitions should be adjusted to that, not the other way around.
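
To make the sizing rule concrete, here is a small sketch that derives a partition count from the cached size. The target of 150 MB is my own assumption (the middle of the 100 MB to 200 MB range), and partitions_for_size is a made-up helper, not part of Spark:

```python
import math


def partitions_for_size(data_mb, target_mb=150):
    """Number of partitions so that each one holds at most target_mb MB."""
    if data_mb < 0 or target_mb <= 0:
        raise ValueError("data_mb must be >= 0 and target_mb > 0")
    return max(1, math.ceil(data_mb / target_mb))


cached_mb = 618.6  # size of the cached example file mentioned above
count = partitions_for_size(cached_mb)
print(count, "partitions of about", round(cached_mb / count, 1), "MB each")
```

For the 618.6 MB example this gives 5 partitions of roughly 124 MB, which stays inside the recommended range.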

Cache is not an action, which means the data is only cached when the next action is executed. However, if you cache a table (for example with the SQL statement CACHE TABLE), it WILL be cached right away.

Sources

How to use SparkSession in Apache Spark 2.0
Using Apache Spark 2.0 to Analyze the City of San Francisco’s Open Data
Modern Spark DataFrame and Dataset (Intermediate Tutorial)
Spark SQL, DataFrames and Datasets Guide


SparkR and R – DataFrame and data.frame

I currently work as a Big Data Engineer at the University of St. Gallen. Many researchers work here and are using R to make their research easier. They are familiar with R’s limitations and workarounds.

One of my tasks is introducing SparkR to the researchers. This post gives a short introduction to SparkR and R and clears up any doubt about data frames.

R is a popular tool for statistics and data analysis. It uses dataframes (data.frame), has rich visualization capabilities and many libraries the R community is developing.

The challenge with R is how to make it work on big data: how to use R on huge datasets and on big data clusters.

Spark is a fast and general engine for data processing. It is growing rapidly and has been adopted by many organizations for running faster calculations on big datasets.

SparkR is an R package that provides an interface to use Spark from R. It enables R users to run jobs on big data clusters with Spark. SparkR API 1.6.0 is available here.

data.frame in R is a list of vectors with equal length.

DataFrame in Spark is a distributed collection of data organized into named columns.

When working with SparkR and R, it is very important to understand that there are two different data frames in question – R data.frame and Spark DataFrame. Proper combination of both is what gets the job done on big data with R.

In practice, the first step is to process the big data using SparkR and its DataFrames. As much as possible is done in this stage (cleaning, filtering, aggregation, various statistical operations).

When the dataset has been processed by SparkR, it can be collected into an R data.frame. This is done by calling collect(), which “transforms” a SparkR DataFrame into an R data.frame. When collect() is called, the elements of the SparkR DataFrame from all workers are collected and pushed into an R data.frame on the client, where R functionality can be used.

Figure 1: SparkR and R flow

A common question many R users ask is “Can I run collect on all my big data and then do R analysis?” The answer is no. If you do that, you are back where you were before looking into SparkR: doing all your processing, cleaning, wrangling and data science on your client.

Useful installation posts

How to manually install Spark 1.6.0 on a multinode Hadoop cluster is described here: Installing Apache Spark 1.6.0 on a multinode cluster

How to install SparkR on a multinode Hadoop cluster is described here: Installing R on Hadoop cluster to run sparkR

I am testing SparkR and Pyspark in Zeppelin and the Zeppelin installation process is here: Building Zeppelin-With-R on Spark and Zeppelin

Practical examples

Let us see how this works in practice:

I have a file in Hadoop (HDFS). The file size is 1.9 GB; it is a CSV file with a little over 20 million rows. In Figure 1, this file is in the blue box.

I run a read.df() command to load the data from the data source into a DataFrame (orange box in Figure 1).

crsp <- read.df(sqlContext, "/tmp/crsp/AllCRSP.csv", source = "com.databricks.spark.csv", inferSchema = "true", header = "true")

Object crsp is a SparkR object, not an R object, which means I can run SparkR commands on it.

If I run the str(crsp) command, which is an R command, I get the following:

Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0x3b44be0>
..@ sdf:Class ‘jobj’ <environment: 0x2aaf5f8>

This does not look familiar to an R user. Data frame crsp is a SparkR DataFrame.

SparkR DataFrame to R data.frame

Since the DataFrame crsp has over 20 million rows, I am going to take a small sample to create a new DataFrame for this example:

df_sample <- sample(crsp, withReplacement = FALSE, fraction = 0.0001)

I have created a new DataFrame called df_sample. It has a bit over 2000 rows and SparkR functionality can be used to manipulate it.

I am going to create an R data.frame out of the df_sample DataFrame:

rdf_sample <- collect(df_sample)

I now have two data frames: SparkR DataFrame called df_sample and R data.frame called rdf_sample.

Running str() on both objects gives me the following outputs:

str(df_sample)

Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0xdb19d40>
..@ sdf:Class ‘jobj’ <environment: 0xd9b4500>

str(rdf_sample)

'data.frame': 2086 obs. of 16 variables:
$ PERMNO : int 15580 59248 90562 15553 11499 61241 14219 14868 14761 11157 ...
$ Date : chr "20151120" "20111208" "20061213" "20110318" ...
$ SICCD : chr "6320" "2082" "2082" "6719" ...
$ TICKER : chr "AAME" "TAP" "TAP" "AGL" ...
$ NAICS : int 524113 312120 312120 551112 333411 334413 541511 452910 211111 334511 ...
$ PERMCO : int 5 33 33 116 176 211 283 332 352 376 ...
$ HSICMG : int 63 20 20 49 35 36 73 54 29 38 ...
$ BIDLO : num 4.51 40.74 74 38.75 4.03 ...
$ ASKHI : num 4.89 41.26 75 39.4 4.11 ...
$ PRC : num 4.69 41.01 -74.5 38.89 4.09 ...
$ VOL : int 1572 1228200 0 449100 20942 10046911 64258 100 119798 19900 ...
$ SHROUT : int 20547 175544 3073 78000 14289 778060 24925 2020 24165 3728 ...
$ CFACPR : num 1 1 2 1 1 1 0.2 1 1 1 ...
$ CFACSHR: num 1 1 2 1 1 1 0.2 1 1 1 ...
$ OPENPRC: num 4.88 41.23 NA 39.07 4.09 ...
$ date : chr "20151120" "20111208" "20061213" "20110318" ...

Datatype conversion example

Running the following SparkR command on the DataFrame converts the data type of the Date column from integer to string:

df_sample$Date <- cast(df_sample$Date, "string")

Running an R command on it will not work:

df_sample$Date <- as.character(df_sample$Date)

Output:

Error in as.character.default(df_sample$Date) :
no method for coercing this S4 class to a vector

Since I have converted the DataFrame df_sample to data.frame rdf_sample, I can now put my R hat on and use R functionalities:

rdf_sample$Date <- as.character(rdf_sample$Date)

R data.frame to SparkR DataFrame

In some cases, you have to go the other way and convert an R data.frame to a SparkR DataFrame. This is done with the createDataFrame() function:

new_df_sample <- createDataFrame(sqlContext, rdf_sample)

If I run str(new_df_sample) I get the following output:

Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0xdd9ddb0>
..@ sdf:Class ‘jobj’ <environment: 0xdd99f40>

Conclusion

Knowing which data frame you are about to manipulate saves you a great deal of trouble. It is important to understand what SparkR can do for you so that you can take the best from both worlds.

SparkR is a newer addition to the Spark family. The community is still small so patience is the key ingredient.

Building Zeppelin-With-R on Spark and Zeppelin

For my employer, I am setting up different environments in which researchers can do their statistical analyses using distributed systems.
In this post, I am going to describe how Zeppelin with R was installed using this github project.

Ubuntu 14.04 Trusty is my Linux flavour. Spark 1.6.0 is manually installed on the cluster (link) on Openstack, the Hadoop (2.7.1) distribution is Hortonworks and sparkR has been installed earlier (link).

One of the nodes in the cluster is a dedicated client node. On this node Zeppelin with R is installed.

Prerequisites

  • Spark should be installed (1.6.0 in this case; my earlier version 1.5.2 also worked well).
  • Java: my version is java version "1.7.0_95"
  • Maven and git (how to install them)
  • The user running the Zeppelin service has to have a folder in HDFS under /user. If the user has, for example, run Spark earlier, then this folder was created already; otherwise Spark services could not have been run.
    Example on how to create an HDFS folder under /user and change owner:

    sudo -u hdfs hadoop fs -mkdir /user/user1
    sudo -u hdfs hadoop fs -chown user1:user1 /user/user1
    
  • Create zeppelin user
    sudo adduser zeppelin

R installation process

From the shell as root

In order to have ZeppelinR running properly some R packages have to be installed. Installing the R packages has proven to be problematic if some packages are not installed as root user first.

  1. Install Node.js package manager
    sudo apt-get install npm -y
  2. The following packages need to be installed for the R package devtools installation to go through properly.
    sudo apt-get install libcurl4-openssl-dev -y
    sudo apt-get install libxml2-dev -y
  3. Later on, when the R package dplyr is being installed, some warnings pop up. Just to be on the safe side, these two packages should be installed.
    sudo apt-get install r-cran-rmysql -y
    sudo apt-get install libpq-dev -y
  4. To install the Cairo package in R successfully, the following packages should be installed.
    sudo apt-get install libcairo2-dev -y
    sudo apt-get install libxt-dev libxaw7-dev -y
  5. To install IRkernel/repr later the following package needs to be installed.
    sudo apt-get install libzmq3-dev -y

From R as root

  1. Run R as root.
    sudo R
  2. Install the following packages in R:
    install.packages("evaluate", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("base64enc", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("devtools", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("Cairo", dependencies = TRUE, repos='http://cran.us.r-project.org')

    (The reason why I am running one package at a time is to control what is going on when package is being installed)

  3. Load devtools for github command
    library(devtools)
  4. Install IRkernel/repr package
    install_github('IRkernel/repr')
  5. Install these packages
    install.packages("dplyr", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("caret", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("repr", dependencies = TRUE, repos='http://irkernel.github.io/')
  6. Install R interface to Google Charts API
    install.packages('googleVis', dependencies = TRUE, repos='http://cran.us.r-project.org')
  7. Exit R
    q()

Zeppelin installation process

Hortonworks installs its Hadoop under /usr/hdp. I decided to follow the pattern and install Apache services under /usr/apache.

  1. Create log folder for Zeppelin log files
    sudo mkdir /var/log/zeppelin
    sudo chown zeppelin:zeppelin /var/log/zeppelin
  2. Go to /usr/apache (or wherever your home to ZeppelinR is going to be) and clone the github project.
    sudo git clone https://github.com/elbamos/Zeppelin-With-R

    Zeppelin-With-R folder is now created. This is going to be ZeppelinR’s home. In my case this would be /usr/apache/Zeppelin-With-R.

  3. Change the ownership of the folder
    sudo chown -R zeppelin:zeppelin Zeppelin-With-R
  4. Adding global variable ZEPPELIN_HOME
    Open the environment file

    sudo vi /etc/environment

    And add the variable

    export ZEPPELIN_HOME=/usr/apache/Zeppelin-With-R

    Save and exit the file and do not forget to reload it.

    source /etc/environment
  5. Change user to zeppelin (or whoever is going to build the Zeppelin-With-R)
    su zeppelin
  6. Make sure you are in $ZEPPELIN_HOME and build Zeppelin-With-R
    mvn clean package -Pspark-1.6 -Dspark.version=1.6.0 -Dhadoop.version=2.7.1 -Phadoop-2.6 -Pyarn -Ppyspark -DskipTests
  7. Initial information before build starts
    [Screenshot: Zeppelin-With-R build start]
    R interpreter is on the list.
  8. Successful build
    [Screenshot: Zeppelin-With-R build end]
    ZeppelinR is now installed. The next step is configuration.

Configuring ZeppelinR

  1. Copying and modifying hive-site.xml (as root). From Hive’s conf folder, copy the hive-site.xml file to $ZEPPELIN_HOME/conf.
    sudo cp /etc/hive/conf/hive-site.xml $ZEPPELIN_HOME/conf/
  2. Change the owner of the file to zeppelin:zeppelin.
    sudo chown zeppelin:zeppelin $ZEPPELIN_HOME/conf/hive-site.xml
  3. Log in as zeppelin and modify the hive-site.xml file.
    vi $ZEPPELIN_HOME/conf/hive-site.xml

    Remove the trailing “s” from the values of the properties hive.metastore.client.connect.retry.delay and
    hive.metastore.client.socket.timeout to avoid a number format exception.

  4. Create folder for Zeppelin pid.
    mkdir $ZEPPELIN_HOME/run
  5. Create zeppelin-site.xml and zeppelin-env.sh from respective templates.
    cp $ZEPPELIN_HOME/conf/zeppelin-site.xml.template $ZEPPELIN_HOME/conf/zeppelin-site.xml
    cp $ZEPPELIN_HOME/conf/zeppelin-env.sh.template $ZEPPELIN_HOME/conf/zeppelin-env.sh
  6. Open $ZEPPELIN_HOME/conf/zeppelin-env.sh and add:
    export SPARK_HOME=/usr/apache/spark-1.6.0-bin-hadoop2.6
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export ZEPPELIN_JAVA_OPTS="-Dhdp.version=2.3.4.0-3485"
    export ZEPPELIN_LOG_DIR=/var/log/zeppelin
    export ZEPPELIN_PID_DIR=${ZEPPELIN_HOME}/run

    The parameter -Dhdp.version should match your Hortonworks distribution version.
    Save and exit the file.
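
The hive-site.xml change from step 3 can also be scripted. Below is a minimal sketch in Python that works on an in-memory sample instead of the real file. The sample values 5s and 1800s are assumptions for illustration (check your own hive-site.xml), and strip_seconds_suffix is a made-up helper name:

```python
import xml.etree.ElementTree as ET

# Sample content; the values "5s" and "1800s" are assumed for illustration.
SAMPLE = """<configuration>
  <property>
    <name>hive.metastore.client.connect.retry.delay</name>
    <value>5s</value>
  </property>
  <property>
    <name>hive.metastore.client.socket.timeout</name>
    <value>1800s</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hive-server:9083</value>
  </property>
</configuration>"""

TIME_PROPERTIES = {
    "hive.metastore.client.connect.retry.delay",
    "hive.metastore.client.socket.timeout",
}


def strip_seconds_suffix(xml_text):
    """Drop a trailing "s" from the two time properties, leave the rest alone."""
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        value = prop.find("value")
        if prop.findtext("name") in TIME_PROPERTIES and value is not None:
            text = (value.text or "").strip()
            if text.endswith("s"):
                value.text = text[:-1]
    return root


fixed = strip_seconds_suffix(SAMPLE)
values = {p.findtext("name"): p.findtext("value") for p in fixed.findall("property")}
print(values)
```

To apply this to the real file, the same loop could run on ET.parse() of $ZEPPELIN_HOME/conf/hive-site.xml, followed by a write back to disk. Keep a backup copy first.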

Zeppelin-With-R is now ready for use. Start it by running

$ZEPPELIN_HOME/bin/zeppelin-daemon.sh start

Note!
My experience shows that if you run ZeppelinR as zeppelin user, you will not be able to use spark.r functionalities. The error message I am getting is unable to start device X11cairo. The reason is lack of permissions on certain files within R installation. This is something I still have to figure out. For now running as root does the trick.

http://zeppelin-server:8080 takes you to the Zeppelin Web UI. How to configure the Spark and Hive interpreters is described in this post.

When the interpreters are configured, the notebook RInterpreter is available for testing.

Installing R on Hadoop cluster to run sparkR

The environment

Ubuntu 14.04 Trusty is the operating system. Hortonworks Hadoop distribution is used to install the multinode cluster.

The following process of installation has been successful for the following versions:

  • Spark 1.4.1
  • Spark 1.5.2
  • Spark 1.6.0

Prerequisites

This post assumes Spark is already installed on the system. How this can be done is explained in one of my posts here.

Setting up sparkR

If command sparkR ($SPARK_HOME/bin/sparkR) is run right after Spark installation is complete, the following error is returned:

env: R: No such file or directory

R packages have to be installed on all nodes in order for sparkR to work properly. Spark 1.6 Technical Preview on Hortonworks website gives us the link on how to set up R on Linux (Installing R on Linux). I experienced the process to be different.

Again, this process should be done on all nodes.

  1. The Ubuntu archives on CRAN are signed with the key of “Michael Rutter marutter@gmail.com” with key ID E084DAB9 (link)
    sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E084DAB9
  2. Now add the key to apt.
    gpg -a --export E084DAB9 | sudo apt-key add -
  3. Fetch Linux codename
    LVERSION=`lsb_release -c | cut -c 11-`

    In this case, it is trusty.

  4. From Cran – Mirrors or CRAN Mirrors US, find a suitable CRAN mirror and use it in the next step. In this case, a CRAN mirror from Austria is used – cran.wu.ac.at.
  5. Append the repository to the system’s sources.list file
    echo deb https://cran.wu.ac.at/bin/linux/ubuntu $LVERSION/ | sudo tee -a /etc/apt/sources.list
    

    The line added to the sources.list should look something like this:

    deb https://cran.wu.ac.at/bin/linux/ubuntu trusty/

  6. Update the system
    sudo apt-get update
  7. Install r-base package
    sudo apt-get install r-base -y
  8. Install r-base-dev package
    sudo apt-get install r-base-dev -y
  9. In order to avoid the following warning when a Spark service is started:

    WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

    Add the following export to the system environment file:

    export LD_LIBRARY_PATH=/usr/hdp/current/hadoop-client/lib/native
  10. Test the installation by starting R
    R

    Something like this should show up and R should start.

    R version 3.2.3 (2015-12-10) — “Wooden Christmas-Tree”

  11. Exit R.
    q()

R is now installed on the first node. Steps 1 to 7 should be repeated on all nodes in the cluster.

Testing sparkR

If R is installed on all nodes (I haven’t mentioned that yet in this post), we can test it from the node where Spark client is installed:

cd $SPARK_HOME
./bin/sparkR

Hello message from Spark invites the user to the sparkR environment.

Output when starting SparkR

R version 3.2.3 (2015-12-10) — “Wooden Christmas-Tree”
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type ‘license()’ or ‘licence()’ for distribution details.

Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type ‘contributors()’ for more information and
‘citation()’ on how to cite R or R packages in publications.

Type ‘demo()’ for some demos, ‘help()’ for on-line help, or
‘help.start()’ for an HTML browser interface to help.
Type ‘q()’ to quit R.

Launching java with spark-submit command /usr/hdp/2.3.4.0-3485/spark/bin/spark-submit “sparkr-shell” /tmp/RtmpN1gWPL/backend_port18f039dadb86
16/02/22 22:12:40 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:41 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:41 INFO SparkContext: Running Spark version 1.5.2
16/02/22 22:12:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
16/02/22 22:12:41 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:41 INFO SecurityManager: Changing view acls to: ubuntu
16/02/22 22:12:41 INFO SecurityManager: Changing modify acls to: ubuntu
16/02/22 22:12:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); users with modify permissions: Set(ubuntu)
16/02/22 22:12:42 INFO Slf4jLogger: Slf4jLogger started
16/02/22 22:12:42 INFO Remoting: Starting remoting
16/02/22 22:12:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.x.xxx.xxx:59112]
16/02/22 22:12:42 INFO Utils: Successfully started service ‘sparkDriver’ on port 59112.
16/02/22 22:12:42 INFO SparkEnv: Registering MapOutputTracker
16/02/22 22:12:42 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:42 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:42 INFO SparkEnv: Registering BlockManagerMaster
16/02/22 22:12:42 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-aeca25e3-dc7e-4750-95c6-9a21c6bc60fd
16/02/22 22:12:42 INFO MemoryStore: MemoryStore started with capacity 530.0 MB
16/02/22 22:12:42 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:42 INFO HttpFileServer: HTTP File server directory is /tmp/spark-009d52a3-de7e-4fa1-bea3-97f3ef34ebbe/httpd-c9182234-dec7-438d-8636-b77930ed5f62
16/02/22 22:12:42 INFO HttpServer: Starting HTTP Server
16/02/22 22:12:42 INFO Server: jetty-8.y.z-SNAPSHOT
16/02/22 22:12:42 INFO AbstractConnector: Started SocketConnector@0.0.0.0:51091
16/02/22 22:12:42 INFO Utils: Successfully started service ‘HTTP file server’ on port 51091.
16/02/22 22:12:42 INFO SparkEnv: Registering OutputCommitCoordinator
16/02/22 22:12:43 INFO Server: jetty-8.y.z-SNAPSHOT
16/02/22 22:12:43 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/02/22 22:12:43 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.
16/02/22 22:12:43 INFO SparkUI: Started SparkUI at http://10.x.x.108:4040
16/02/22 22:12:43 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:43 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:43 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.
16/02/22 22:12:43 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/02/22 22:12:43 INFO Executor: Starting executor ID driver on host localhost
16/02/22 22:12:43 INFO Utils: Successfully started service ‘org.apache.spark.network.netty.NettyBlockTransferService’ on port 41193.
16/02/22 22:12:43 INFO NettyBlockTransferService: Server created on 41193
16/02/22 22:12:43 INFO BlockManagerMaster: Trying to register BlockManager
16/02/22 22:12:43 INFO BlockManagerMasterEndpoint: Registering block manager localhost:41193 with 530.0 MB RAM, BlockManagerId(driver, localhost, 41193)
16/02/22 22:12:43 INFO BlockManagerMaster: Registered BlockManager
16/02/22 22:12:43 WARN SparkConf: The configuration key ‘spark.yarn.applicationMaster.waitTries’ has been deprecated as of Spark 1.3 and and may be removed in the future. Please use the new key ‘spark.yarn.am.waitTime’ instead.

Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
    /_/
Spark context is available as sc, SQL context is available as sqlContext
>

 

If the prompt is not displayed and the output ends with lines like these:

16/02/22 22:52:12 INFO ShutdownHookManager: Shutdown hook called
16/02/22 22:52:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-009d52a3-de7e-4fa1-bea3-97f3ef34ebbe

just press Enter.

 

Possible errors

No public key

W: GPG error: https://cran.wu.ac.at trusty/ Release: The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 51716619E084DAB9

Step 1 was not performed.
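
The error prints the long, 16-digit key ID, while step 1 uses the short one, which is simply its last 8 hex digits. A small sketch that pulls the short ID out of such a message (missing_key_ids is a made-up helper; the message is the one shown above):

```python
import re

APT_ERROR = (
    "W: GPG error: https://cran.wu.ac.at trusty/ Release: The following "
    "signatures couldn't be verified because the public key is not "
    "available: NO_PUBKEY 51716619E084DAB9"
)


def missing_key_ids(message):
    """Return the short (last 8 hex digits) key IDs named after NO_PUBKEY."""
    long_ids = re.findall(r"NO_PUBKEY\s+([0-9A-Fa-f]{16})", message)
    return [key[-8:] for key in long_ids]


print(missing_key_ids(APT_ERROR))
```

The extracted ID matches the E084DAB9 passed to apt-key in step 1.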

Hash Sum mismatch

When updating Ubuntu the following error message shows up:

Hash Sum mismatch

Solution:

sudo rm -rf /var/lib/apt/lists/*
sudo apt-get clean
sudo apt-get update

Resources

1 – Spark 1.6 Technical Preview

2 – Installing R on Linux

3 – CRAN – Mirrors

4 – CRAN Secure APT

Installing Spark on Hortonworks cluster using Ambari

The environment

Ubuntu Trusty 14.04. Ambari is used to install the cluster. MySQL is used for storing Ambari’s metadata.
Spark is installed on a client node.

 

Note!

My experience with administrating Spark from Ambari has made me install Spark manually, not from Ambari and not by using Hortonworks packages. I install Apache Spark manually on a client node – described here.

Some reasons for that are:

  • A new Spark version is available every quarter and Hortonworks does not keep up
  • Possibility of running different Spark versions on the same client
  • Better control over configuration files
  • Custom definition of Spark parameters for running multiple Spark contexts on the same client node (more in this post).

 

Installation process in Ambari

The Hortonworks distribution, version 2.3.4, is installed using Ambari.
The services installed first are HDFS, MapReduce, YARN, Ambari Metrics and Zookeeper. I prefer to install these first in order to test that the bare minimum is up and running.

In the next step, Hive, Tez and Pig are installed.

After their successful installation, Spark is installed.

Spark versions

Spark is now installed. Hortonworks distribution 2.3.4 offers Spark 1.4.1 in the Choose Services menu:

[Screenshot: Spark version shown in Ambari]

Running the spark-shell command from the Spark server reveals that 1.5.2 was installed:

[Screenshots: Spark 1.5.2 version logo and sc version output]

Spark’s HOME

Spark’s home directory ($SPARK_HOME) is /usr/hdp/current/spark-client. It is smart to export $SPARK_HOME since it is referred to in services that build on top of Spark.
Spark’s conf directory ($SPARK_CONF_DIR) is /usr/hdp/current/spark-client/conf.

Folder current has nothing but links to the Hortonworks version installed. This means that /usr/hdp/current/spark-client is linked to /usr/hdp/2.3.4.0-3485/spark/.
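
Where a link under current really points can be checked with os.path.realpath. The sketch below recreates the layout in a temporary directory so it runs anywhere. The directory names mirror the ones above, but nothing under /usr/hdp is touched:

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as base:
    # Recreate <base>/2.3.4.0-3485/spark and <base>/current/spark-client.
    versioned = os.path.join(base, "2.3.4.0-3485", "spark")
    current = os.path.join(base, "current")
    os.makedirs(versioned)
    os.makedirs(current)

    link = os.path.join(current, "spark-client")
    os.symlink(versioned, link)

    # realpath follows the link and returns the versioned directory.
    resolved = os.path.realpath(link)
    points_to_versioned = resolved == os.path.realpath(versioned)
    print(points_to_versioned)
```

On the cluster itself, os.path.realpath("/usr/hdp/current/spark-client") should show the versioned directory in the same way.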

Comments on the installation

The Spark installation from Ambari has, among other things, created a Linux user spark and a directory on HDFS, /user/spark.

The following Spark commands were installed:
spark-class, spark-shell, spark-sql, spark-submit. These can be called from anywhere, since they are linked in /usr/bin.
Other Spark commands, which are not linked in /usr/bin but can be executed from $SPARK_HOME/bin, are beeline, pyspark and sparkR.

Connection to Hive

In the SPARK_CONF_DIR, hive-site.xml file can be found. The file has the following content:

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hive-server:9083</value>
  </property>
</configuration>

With this property, Spark connects to Hive. Here are two lines from the output when the spark-shell command is executed:

16/02/22 13:52:37 INFO metastore: Trying to connect to metastore with URI thrift://hive-server:9083
16/02/22 13:52:37 INFO metastore: Connected to metastore.
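
To see which metastore a given hive-site.xml points to without starting spark-shell, the property can be read directly. A minimal sketch on the content shown above (metastore_endpoints is a made-up helper; hive.metastore.uris may hold several comma-separated URIs, which the helper splits):

```python
import xml.etree.ElementTree as ET
from urllib.parse import urlparse

HIVE_SITE = """<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://hive-server:9083</value>
  </property>
</configuration>"""


def metastore_endpoints(xml_text):
    """Return (host, port) pairs from the hive.metastore.uris property."""
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        if prop.findtext("name") == "hive.metastore.uris":
            uris = prop.findtext("value", default="").split(",")
            parsed = [urlparse(uri.strip()) for uri in uris if uri.strip()]
            return [(p.hostname, p.port) for p in parsed]
    return []


endpoints = metastore_endpoints(HIVE_SITE)
print(endpoints)
```

The host and port found this way are the ones that appear in the "Trying to connect to metastore" log line.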

Logs

Spark’s log files are by default in /var/log/spark. This can be changed in Ambari: Spark-> Configs -> Advanced spark-env for property spark_log_dir.

Running Spark commands

Examples on how to execute the Spark commands (taken from Hortonworks Spark 1.6 Technical Preview).
These should be run as spark user from $SPARK_HOME.

spark-shell

spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

spark-submit

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10

sparkR

Running sparkR ($SPARK_HOME/bin/sparkR) returns the following:

env: R: No such file or directory

R is not installed yet. How to install the R environment is described here.

Resources

Spark 1.6 Technical Preview