Apache Spark 2.0 – Notes

Spark Session

In Spark 2.0, SparkSession has been introduced. It provides a single point of entry for interaction with Spark functionality. It allows user accessing DataFrame and Dataset APIs.
In older Spark versions, the user had to create Spark configuration (sparkConf), SparkContext (sc) and then sqlContext. In Spark 2.0 all is done with SparkSession (spark), which encapsulates the above mentioned trio, HiveContext and StreamingContext.

In order to use DataFrame API in Spark 1.6, SQLContext needs to be used. When running Spark, new Spark application is started by creating SparkContext object (represents a connection to computing cluster). From SparkContext, SQLContext can be created (the main entry point for Spark DataFrame and SQL functionality). A SQLContext can be used to create DataFrames, which allows you to direct operations on your data.
It was confusing when to use SparkContext and when to use SQLContext in Spark 1.6. All this is hidden under a layer called SparkSession. See the following object types from PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SqlContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

The Spark session has to be created when using spark-submit command. An example from the documentation on how to do that:

>>> spark = SparkSession.builder \
 |  ...     .master("local") \
 |  ...     .appName("Word Count") \
 |  ...     .config("spark.some.config.option", "some-value") \
 |  ...     .getOrCreate()

The spark handle is created and first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is automatically initialized. Example with sparkR:

>>> sparkR

sparkr-initialization

SQL

SQL in Spark 2.0 supports SQL:2003 (latest revision is SQL:2011). Spark SQL can run all 99 TPC-DS queries. Subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

Running the following command

>>> spark.catalog.listTables()

Returns the tables available to SparkSession.

Driver program

In Spark, communication occurs between driver and executors. The driver has Spark jobs to run, it splits them into tasks and submits them to executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs are running either in a cluster or locally on the same machine. Pyspark is an example of a local driver program. Example:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode that means that the client that submitted the job is detached from the Spark application and its further behavior does not influence the application. If you shut down the computer that submitted the application to the cluster, the job will continue to run. The driver is on one of the nodes in Spark cluster. Command spark-submit with property –deploy-mode cluster does this.
If you run in client mode, that means that the client you are running the application from is the client. In Spark HistoryServer, under tab Executors, in table Executors, you can read that the address of the driver matches the address of the computer the command has been sent from.

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.

Listing attributes

Pythons dir() lists all attributes accessible through the parameter
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

Opens extensive help and examples.

RDD

SQLContext is created from lower level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs). RDD is Spark’s way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster and perform no matter of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or Python, Java, Scala, or R, the underlying code generated is identical because all execution planning undergoes the same Catalyst optimizer.
DFs are made of partitions – converting a DF to an RDD to check number of partitions:

>>> tweetDF.rdd.getNumPartitions()

Data is split into partitions.
How to optimize:
If there are 3 slots, perfect is to have partitions = x*3
Repartition DF:

>>> tweetDF.repartition(6)

DataFrames

In Spark, base DataFrame is first created. Either by generating a Dataset using spark.range method (for learning purposes) or by reading file(s) or tables and returning a DataFrame. Operations can then be applied to it. DataFrame is immutable, once created, it cannot be changed. Each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

DataFrame consists of series of Row objects. Each of them has a set of named columns.

DataFrame must have a schema, each of which has a name and a type. Some datasources have schemas built into them, although it is possible to define a schema and introduce it as a parameter when creating a new DataFrame.

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance

Dataset

DataFrame and DataSet are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking SQL package in Spark 2.0

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

Spark DF can be converted to Pandas DF

>>> import pandas as pd

Then you can use .toPandas() at the end of the Spark DF to convert to Pandas DF.

Cache

Putting DataFrame in memory: Spark uses Tungsten binary format to columnar compress data in memory. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. The data in memory is equally divided per partition (for example, 600MB in memory, 6 partitions -> 100MB per partition).
The size of data shrinks when cached. Example: 1,6GB file on disk is cached in memory with size 618,6 MB.

When an action is executed, and if the DataFrame is cached, the stages BEFORE the cache was executed are skipped, because the data is already partitioned in memory.
Ideal partition size is between 100MB to 200MB, so number of partitions should be adjusted to that, not the other way around.

Cache is not an action, which means it will be executed when the next action is executed. However, if you cache a table, it WILL be cached right away.

Sources

How to use SparkSession in Apache Spark 2.0
Using Apache Spark 2.0 to Analyze the City of San Francisco’s Open Data
Modern Spark DataFrame and Dataset (Intermediate Tutorial)
Spark SQL, DataFrames and Datasets Guide

Networking in Spark: Configuring ports in Spark

For Spark Context to run, some ports are used. Most of them are randomly chosen which makes it difficult to control them. This post describes how I am controlling Spark’s ports.

In my clusters, some nodes are dedicated client nodes, which means the users can access them, they can store files under their respective home directory (defining home on an attached volume is described here), and run jobs on it.

The Spark jobs can be run in different ways, from different interfaces – Command Line Interface, Zeppelin, RStudio…

 

Links to Spark installation and configuration

Installing Apache Spark 1.6.0 on a multinode cluster

Building Apache Zeppelin 0.6.0 on Spark 1.5.2 in a cluster mode

Building Zeppelin-With-R on Spark and Zeppelin

What Spark Documentation says

Spark UI

Spark User Interface, which shows application’s dashboard, has the default port of 4040 (link). Property name is

spark.ui.port

When submitting a new Spark Context, 4040 is attempted to be used. If this port is taken, 4041 will be tried, if this one is taken, 4042 is tried and so on, until an available port is found (or maximum attempts are met).
If the attempt is unsuccessful, the log is going to display a WARN and attempt the next port. Example follows:

WARN Utils: Service ‘SparkUI’ could not bind on port 4040. Attempting port 4041.
INFO Utils: Successfully started service ‘SparkUI’ on port 4041.
INFO SparkUI: Started SparkUI at http://client-server:4041

According to the log, the Spark UI is now listening on port 4041.

Not much randomizing for this port. This is not the case for ports in the next chapter.

 

Networking

Looking at the documentation about Networking in Spark 1.6.x, this post is focusing on the 6 properties that have default value random in the following picture:

spark networking.JPG

When Spark Context is in the process of creation these receive random values.

spark.blockManager.port
spark.broadcast.port
spark.driver.port
spark.executor.port
spark.fileserver.port
spark.replClassServer.port

These are the properties that should be controlled. They can be controlled in different ways, depending on how the job is run.

 

Scenarios and solutions

If you do not care about the values assigned to these properties then no further steps are needed..

Configuring ports in spark-defaults.conf

If you are running one Spark application per node (for example: submitting python scripts by using spark-submit), you might want to define the properties in the $SPARK_HOME/conf/spark-defaults.conf. Below is an example of what should be added to the configuration file.

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

If a test is run, for example spark-submit test.py, the Spark UI is by default 4040 and the above mentioned ports are used.

Running the following command

sudo netstat -tulpn | grep 3800

Returns the following output:

tcp6      0      0      :::38000                          :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38002     :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38003     :::*      LISTEN      25300/java
tcp6      0      0      :::38004                          :::*      LISTEN      25300/java
tcp6      0      0      :::38005                          :::*      LISTEN      25300/java

 

Configuring ports directly in a script

In my case, different users would like to use different ways to run Spark applications. Here is an example of how ports are configured through a python script.

"""Pi-estimation.py"""

from random import randint
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

def sample(p):
x, y = randint(0,1), randint(0,1)
print(x)
print(y)
return 1 if x*x + y*y < 1 else 0

conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Pi")

conf.set("spark.ui.port", "4042")

conf.set("spark.blockManager.port", "38020")
conf.set("spark.broadcast.port", "38021")
conf.set("spark.driver.port", "38022")
conf.set("spark.executor.port", "38023")
conf.set("spark.fileserver.port", "38024")
conf.set("spark.replClassServer.port", "38025")

conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext(conf=conf)

NUM_SAMPLES = randint(5000000, 100000000)
count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
.reduce(lambda a, b: a + b)
print("NUM_SAMPLES is %i" % NUM_SAMPLES)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
(The above Pi estimation is a Spark example that comes with Spark installation)

The property values in the script run over the properties in the spark-defaults.conf file. For the runtime of this script port 4042 and ports 38020-38025 are used.

If netstat command is run again for all ports that start with 380

sudo netstat -tulpn | grep 380

The following output is shown:

tcp6           0           0           :::38000                              :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38002         :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38003         :::*          LISTEN          25300/java
tcp6           0           0           :::38004                              :::*          LISTEN          25300/java
tcp6           0           0           :::38005                              :::*          LISTEN          25300/java
tcp6           0           0           :::38020                              :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38022         :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38023         :::*          LISTEN          27280/java
tcp6           0           0           :::38024                              :::*          LISTEN          27280/java

2 processes are running one separate Spark application each on ports that were defined beforehand.

 

Configuring ports in Zeppelin

Since my users use Apache Zeppelin, similar network management had to be done there. Zeppelin is also sending jobs to Spark Context through spark-submit command. That means that the properties can be configured in the same way. This time through an interpreter in Zeppelin:

Choosing menu Interpreter and choosing spark interpreter will get you there. Now it is all about adding new properties and respective values. Do not forget to click on the plus when you are ready to add a new property.
At the very end, save everything and restart the spark interpreter.

Below is an example of how this is done:

spark zeppelin ports

Next time a Spark context is created in Zeppelin, the ports will be taken into account.

 

Conclusion

This can be useful if multiple users are running Spark applications on one machine and have separate Spark Contexts.

In case of Zeppelin, this comes in handy when one Zeppelin instance is deployed per user.

 

Zeppelin thrift error “Can’t get status information “

I have multiple users on one client who are going to use/test ZeppelinR. For every Zeppelin user I create a copy of built Zeppelin folder in user’s home directory. I dedicate a port to that user (8080 is for my testing, running), for example my first user got port 8082. This is done in user’s $ZEPPELIN_HOME/conf/zeppelin-site.xml.

Example for one user:

<property>
  <name>zeppelin.server.port</name>
  <value>8082</value>
  <description>Server port.</description>
</property>

Running Zeppelin as root is not a big problem. Running ZeppelinR as root is also not so problematic. Running it as a normal Linux user can give some challenges.

There is this error message that can surprise you when starting a new Spark context from Zeppelin Web UI.

Taken from Zeppelin log file (zeppelin-user_running_zeppelin-t-client01.log):

ERROR [2016-03-18 08:10:47,401] ({Thread-20} RemoteScheduler.java[getStatus]:270) – Can’t get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.run(RemoteScheduler.java:205)
ERROR [2016-03-18 08:11:47,347] ({pool-1-thread-2} RemoteScheduler.java[getStatus]:270) – Can’t get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The zeppelin out file (zeppelin-user_running_zeppelin-t-client01.out) gives a more concrete description of the problem:

Exception in thread "Thread-80" org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:146)
 at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getScheduler(LazyOpenInterpreter.java:115)
 at org.apache.zeppelin.interpreter.Interpreter.destroy(Interpreter.java:124)
 at org.apache.zeppelin.interpreter.InterpreterGroup$2.run(InterpreterGroup.java:115)
Caused by: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.rinterpreter.RContext$.apply(RContext.scala:353)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext$lzycompute(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.getScheduler(RInterpreter.scala:80)
 at org.apache.zeppelin.rinterpreter.RRepl.getScheduler(RRepl.java:93)
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:144)
 ... 3 more

The way I solved it was by running Zeppelin service from the $ZEPPELIN_HOME. For users to be able to start the Zeppelin service I have created a script:

export ZEPPELIN_HOME=/home/${USER}/Zeppelin-With-R
cd ${ZEPPELIN_HOME}
/home/${USER}/Zeppelin-With-R/bin/zeppelin-daemon.sh start

Now I can start and stop the Zeppelin service and start new Spark contexts with no problem.

Here is an example of my YARN applications:

zeppelin services in YARN

And here are the outputs from Zeppelin when scala, sparkR and Hive are tested:

zeppelin user test results

 

SparkContext allocates random ports. How to control the port allocation.

When SparkContext is in process of creation, a bunch of random ports are allocated to run the Spark service. This can be annoying when you have security groups to think of.

Note!
A more detailed post on this topic is here.

Here is an example of how random ports are allocated when Spark service is started:

spark ports random

The only sure bet is 4040 (or 404x depending on how many Spark Web UI have been already started).

On Apache Spark website, under Configuration, under Networking, 6 port properties have default value random. These are the properties that have to be tamed.

spark port random printscreen

(The only 6 properties with default value random among all Spark properties)

Solution

Open $SPARK_HOME/conf/spark-defaults.conf:

sudo -u spark vi conf/spark-defaults.conf

The following properties should be added:

spark.blockManager.port    38000
spark.broadcast.port       38001
spark.driver.port          38002
spark.executor.port        38003
spark.fileserver.port      38004
spark.replClassServer.port 38005

I have picked port range 38000-38005 for my Spark services.

If I run Spark service now, the ports in use are now as defined in the configuration file:

spark ports tamed