Networking in Spark: Configuring ports in Spark

A running Spark Context uses several ports. Most of them are chosen at random, which makes them difficult to control. This post describes how I control Spark’s ports.

In my clusters, some nodes are dedicated client nodes: users can access them, store files under their home directories (defining home on an attached volume is described here), and run jobs on them.

The Spark jobs can be run in different ways, from different interfaces – Command Line Interface, Zeppelin, RStudio…

 

Links to Spark installation and configuration

Installing Apache Spark 1.6.0 on a multinode cluster

Building Apache Zeppelin 0.6.0 on Spark 1.5.2 in a cluster mode

Building Zeppelin-With-R on Spark and Zeppelin

What Spark Documentation says

Spark UI

The Spark User Interface, which shows the application’s dashboard, listens on port 4040 by default (link). The property name is

spark.ui.port

When a new Spark Context is created, it first tries to bind the UI to port 4040. If that port is taken, 4041 is tried, then 4042, and so on, until a free port is found or the maximum number of attempts is reached.
Each failed attempt is logged as a WARN before the next port is tried. For example:

WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
INFO Utils: Successfully started service 'SparkUI' on port 4041.
INFO SparkUI: Started SparkUI at http://client-server:4041

According to the log, the Spark UI is now listening on port 4041.

Not much randomizing for this port. This is not the case for the ports in the next section.
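The probing behaviour described above can be imitated in a few lines of plain Python. This is only a sketch of the idea, not Spark's implementation: the function name first_free_port is my own, and the default of 16 retries mirrors the documented default of spark.port.maxRetries.

```python
import socket


def first_free_port(start_port, max_retries=16, host="127.0.0.1"):
    """Return the first port in start_port..start_port+max_retries that can be bound."""
    for offset in range(max_retries + 1):
        port = start_port + offset
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, port))
            return port
        except socket.error:
            # Port is taken (or not permitted): move on to the next one.
            continue
        finally:
            sock.close()
    raise RuntimeError("no free port found after %d retries" % max_retries)


if __name__ == "__main__":
    # Shows which port a UI starting at 4040 would end up on this machine.
    print(first_free_port(4040))
```

The port is released again as soon as the function returns, so this only tells you what is free at that moment.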

 

Networking

Looking at the Networking section of the Spark 1.6.x documentation, this post focuses on the six properties whose default value is random, shown in the following picture:

spark networking.JPG

When a Spark Context is created, these properties receive random values.

spark.blockManager.port
spark.broadcast.port
spark.driver.port
spark.executor.port
spark.fileserver.port
spark.replClassServer.port

These are the properties that should be controlled. They can be controlled in different ways, depending on how the job is run.

 

Scenarios and solutions

If you do not care about the values assigned to these properties, no further steps are needed.

Configuring ports in spark-defaults.conf

If you are running one Spark application per node (for example, submitting Python scripts with spark-submit), you might want to define the properties in $SPARK_HOME/conf/spark-defaults.conf. Below is an example of what should be added to the configuration file.

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005
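If you maintain several client nodes, lines like the ones above can be generated instead of typed by hand. The following is a small sketch under my own assumptions: the helper names port_settings and render_defaults are hypothetical, and the ports are simply numbered consecutively from a base port, as in the example above.

```python
PORT_PROPERTIES = [
    "spark.blockManager.port",
    "spark.broadcast.port",
    "spark.driver.port",
    "spark.executor.port",
    "spark.fileserver.port",
    "spark.replClassServer.port",
]


def port_settings(base_port):
    """Pair each port property with a consecutive port, starting at base_port."""
    return [(name, base_port + i) for i, name in enumerate(PORT_PROPERTIES)]


def render_defaults(base_port):
    """Render the settings in the 'property value' format of spark-defaults.conf."""
    return "\n".join("%s %d" % (name, port) for name, port in port_settings(base_port))


if __name__ == "__main__":
    print(render_defaults(38000))
```

The output can be appended to spark-defaults.conf on each node.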

If a test is run, for example spark-submit test.py, the Spark UI uses the default port 4040 and the ports defined above are used.

Running the following command

sudo netstat -tulpn | grep 3800

Returns the following output:

tcp6      0      0      :::38000                          :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38002     :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38003     :::*      LISTEN      25300/java
tcp6      0      0      :::38004                          :::*      LISTEN      25300/java
tcp6      0      0      :::38005                          :::*      LISTEN      25300/java
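The same check can be done from Python without sudo, which is handy inside a test script. This is a sketch only: is_listening is a name I made up, and it assumes the ports accept plain TCP connections from the machine you run it on. Note that some sockets in the output above are bound to the node's own address rather than to all interfaces, so the host argument matters.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success and an error code otherwise.
        return sock.connect_ex((host, port)) == 0
    finally:
        sock.close()


if __name__ == "__main__":
    # Host and port range are taken from the example in this post.
    for port in range(38000, 38006):
        print(port, is_listening("10.0.173.225", port))
```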

 

Configuring ports directly in a script

In my case, different users would like to use different ways to run Spark applications. Here is an example of how ports are configured through a python script.

"""Pi-estimation.py"""

from __future__ import print_function

from random import randint, random
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

try:
    xrange  # Python 2
except NameError:
    xrange = range  # Python 3

def sample(p):
    # random() returns a float in [0, 1); randint(0, 1) would only ever return 0 or 1
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Pi")

conf.set("spark.ui.port", "4042")

conf.set("spark.blockManager.port", "38020")
conf.set("spark.broadcast.port", "38021")
conf.set("spark.driver.port", "38022")
conf.set("spark.executor.port", "38023")
conf.set("spark.fileserver.port", "38024")
conf.set("spark.replClassServer.port", "38025")

conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext(conf=conf)

NUM_SAMPLES = randint(5000000, 100000000)
count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
    .reduce(lambda a, b: a + b)
print("NUM_SAMPLES is %i" % NUM_SAMPLES)
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

(The Pi estimation above is based on an example that ships with the Spark installation.)

The property values set in the script override the values in the spark-defaults.conf file. While this script runs, port 4042 and ports 38020-38025 are used.
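The Spark documentation describes the order of precedence as: properties set directly on the SparkConf first, then flags passed to spark-submit, then spark-defaults.conf. The sketch below only models that order with plain dictionaries; effective_conf is a hypothetical helper of mine, not a Spark API.

```python
def effective_conf(defaults_file, submit_flags, in_code):
    """Merge three property dicts, lowest precedence first."""
    merged = dict(defaults_file)   # spark-defaults.conf
    merged.update(submit_flags)    # flags passed to spark-submit
    merged.update(in_code)         # conf.set(...) in the script
    return merged


defaults_file = {"spark.driver.port": "38002", "spark.executor.port": "38003"}
in_code = {"spark.driver.port": "38022"}

result = effective_conf(defaults_file, {}, in_code)
print(result["spark.driver.port"])    # value set in the script
print(result["spark.executor.port"])  # falls back to spark-defaults.conf
```

A property that the script does not set keeps the value from the configuration file, which is why it is worth setting all six ports in the script.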

Running the netstat command again, this time for all ports that start with 380:

sudo netstat -tulpn | grep 380

The following output is shown:

tcp6           0           0           :::38000                              :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38002         :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38003         :::*          LISTEN          25300/java
tcp6           0           0           :::38004                              :::*          LISTEN          25300/java
tcp6           0           0           :::38005                              :::*          LISTEN          25300/java
tcp6           0           0           :::38020                              :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38022         :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38023         :::*          LISTEN          27280/java
tcp6           0           0           :::38024                              :::*          LISTEN          27280/java

Two processes are running, each with its own Spark application on the ports that were defined beforehand.
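With more than a handful of users it helps to hand out the port blocks systematically. The sketch below is one possible scheme, not something Spark or Zeppelin provides: the function names, the block size of 20 and the starting port 38000 are my own choices, picked to match the 38000 and 38020 blocks used above.

```python
PORT_PROPERTIES = [
    "spark.blockManager.port",
    "spark.broadcast.port",
    "spark.driver.port",
    "spark.executor.port",
    "spark.fileserver.port",
    "spark.replClassServer.port",
]


def port_block(user_index, first_port=38000, block_size=20):
    """Return the port settings for the user_index-th block of ports."""
    if block_size < len(PORT_PROPERTIES):
        raise ValueError("block_size is too small for all port properties")
    base = first_port + user_index * block_size
    return dict((name, base + i) for i, name in enumerate(PORT_PROPERTIES))


def assign_ports(users, first_port=38000, block_size=20):
    """Give every user a separate, non-overlapping block of ports."""
    return dict(
        (user, port_block(i, first_port, block_size))
        for i, user in enumerate(users)
    )


if __name__ == "__main__":
    for user, ports in sorted(assign_ports(["user1", "user2"]).items()):
        print(user, ports["spark.driver.port"])
```

Each user's dictionary can then be fed into conf.set(...) calls or into the Zeppelin interpreter settings.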

 

Configuring ports in Zeppelin

Since my users use Apache Zeppelin, similar network management had to be done there. Zeppelin also sends jobs to the Spark Context through the spark-submit command, which means that the properties can be configured in the same way, this time through an interpreter in Zeppelin:

Choose the Interpreter menu and then the spark interpreter. From there it is all about adding new properties and their values. Do not forget to click on the plus when you are ready to add a new property.
At the very end, save everything and restart the spark interpreter.

Below is an example of how this is done:

spark zeppelin ports

Next time a Spark context is created in Zeppelin, the ports will be taken into account.

 

Conclusion

This can be useful if multiple users are running Spark applications on one machine and have separate Spark Contexts.

In case of Zeppelin, this comes in handy when one Zeppelin instance is deployed per user.

 

Zeppelin thrift error “Can’t get status information”

I have multiple users on one client who are going to use and test ZeppelinR. For every Zeppelin user I create a copy of the built Zeppelin folder in the user’s home directory and dedicate a port to that user (8080 is reserved for my own testing); my first user, for example, got port 8082. The port is set in the user’s $ZEPPELIN_HOME/conf/zeppelin-site.xml.

Example for one user:

<property>
  <name>zeppelin.server.port</name>
  <value>8082</value>
  <description>Server port.</description>
</property>
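When many users each get their own copy of Zeppelin, editing every zeppelin-site.xml by hand gets tedious. Below is a sketch of how the port could be set with Python's standard xml.etree.ElementTree module. The helper name set_property is my own. Be aware that ElementTree drops comments and the XML declaration when it parses a document, so the license header of the template would be lost.

```python
import xml.etree.ElementTree as ET


def set_property(site_xml, name, value):
    """Return the XML text with property `name` set to `value` (added if missing)."""
    root = ET.fromstring(site_xml)
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            prop.find("value").text = value
            break
    else:
        # No such property yet: append a new <property> element.
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    template = (
        "<configuration><property>"
        "<name>zeppelin.server.port</name><value>8080</value>"
        "</property></configuration>"
    )
    print(set_property(template, "zeppelin.server.port", "8082"))
```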

Running Zeppelin as root is not a big problem. Running ZeppelinR as root is also not so problematic. Running it as a normal Linux user can be more challenging.

The following error message can surprise you when starting a new Spark context from the Zeppelin Web UI.

Taken from Zeppelin log file (zeppelin-user_running_zeppelin-t-client01.log):

ERROR [2016-03-18 08:10:47,401] ({Thread-20} RemoteScheduler.java[getStatus]:270) - Can't get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.run(RemoteScheduler.java:205)
ERROR [2016-03-18 08:11:47,347] ({pool-1-thread-2} RemoteScheduler.java[getStatus]:270) - Can't get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The zeppelin out file (zeppelin-user_running_zeppelin-t-client01.out) gives a more concrete description of the problem:

Exception in thread "Thread-80" org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:146)
 at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getScheduler(LazyOpenInterpreter.java:115)
 at org.apache.zeppelin.interpreter.Interpreter.destroy(Interpreter.java:124)
 at org.apache.zeppelin.interpreter.InterpreterGroup$2.run(InterpreterGroup.java:115)
Caused by: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.rinterpreter.RContext$.apply(RContext.scala:353)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext$lzycompute(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.getScheduler(RInterpreter.scala:80)
 at org.apache.zeppelin.rinterpreter.RRepl.getScheduler(RRepl.java:93)
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:144)
 ... 3 more

I solved it by starting the Zeppelin service from within $ZEPPELIN_HOME, so that the relative paths R/lib and ../R/lib can be resolved. So that users can start the Zeppelin service themselves, I have created a script:

export ZEPPELIN_HOME="/home/${USER}/Zeppelin-With-R"
cd "${ZEPPELIN_HOME}" || exit 1
"${ZEPPELIN_HOME}/bin/zeppelin-daemon.sh" start

Now I can start and stop the Zeppelin service and start new Spark contexts with no problem.

Here is an example of my YARN applications:

zeppelin services in YARN

And here are the outputs from Zeppelin when scala, sparkR and Hive are tested:

zeppelin user test results

 

Building Zeppelin-With-R on Spark and Zeppelin

For my employer, I am working on setting up different environments for researchers to do their statistical analyses using distributed systems.
In this post, I am going to describe how Zeppelin with R was installed using this GitHub project.

Ubuntu 14.04 Trusty is my Linux flavour. Spark 1.6.0 is manually installed on the cluster (link) on OpenStack, the Hadoop (2.7.1) distribution is Hortonworks, and sparkR has been installed earlier (link).

One of the nodes in the cluster is a dedicated client node. On this node Zeppelin with R is installed.

Prerequisites

  • Spark should be installed (1.6.0 in this case; my earlier version 1.5.2 also worked well).
  • Java – my version is java version “1.7.0_95”
  • Maven and git (how to install them)
  • The user running the Zeppelin service must have a folder in HDFS under /user. If the user has, for example, run Spark earlier, this folder already exists; without it, Spark services cannot be run.
    Example of how to create an HDFS folder under /user and change its owner:

    sudo -u hdfs hadoop fs -mkdir /user/user1
    sudo -u hdfs hadoop fs -chown user1:user1 /user/user1
    
  • Create zeppelin user
    sudo adduser zeppelin

R installation process

From the shell as root

In order to have ZeppelinR running properly some R packages have to be installed. Installing the R packages has proven to be problematic if some packages are not installed as root user first.

  1. Install Node.js package manager
    sudo apt-get install npm -y
  2. The following packages need to be installed for the R package devtools installation to go through properly.
    sudo apt-get install libcurl4-openssl-dev -y
    sudo apt-get install libxml2-dev -y
  3. Later on, when the R package dplyr is installed, some warnings appear. To be on the safe side, these two packages should be installed.
    sudo apt-get install r-cran-rmysql -y
    sudo apt-get install libpq-dev -y
  4. To install the Cairo package in R successfully, the following packages should be installed.
    sudo apt-get install libcairo2-dev -y
    sudo apt-get install libxt-dev libxaw7-dev -y
  5. To install IRkernel/repr later, the following package needs to be installed.
    sudo apt-get install libzmq3-dev -y

From R as root

  1. Run R as root.
    sudo R
  2. Install the following packages in R:
    install.packages("evaluate", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("base64enc", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("devtools", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("Cairo", dependencies = TRUE, repos='http://cran.us.r-project.org')

    (I install one package at a time to keep track of what happens during each installation.)

  3. Load devtools for github command
    library(devtools)
  4. Install IRkernel/repr package
    install_github('IRkernel/repr')
  5. Install these packages
    install.packages("dplyr", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("caret", dependencies = TRUE, repos='http://cran.us.r-project.org')
    install.packages("repr", dependencies = TRUE, repos='http://irkernel.github.io/')
  6. Install R interface to Google Charts API
    install.packages('googleVis', dependencies = TRUE, repos='http://cran.us.r-project.org')
  7. Exit R
    q()

Zeppelin installation process

Hortonworks installs its Hadoop under /usr/hdp. I decided to follow the pattern and install Apache services under /usr/apache.

  1. Create log folder for Zeppelin log files
    sudo mkdir /var/log/zeppelin
    sudo chown zeppelin:zeppelin /var/log/zeppelin
  2. Go to /usr/apache (or wherever your home to ZeppelinR is going to be) and clone the github project.
    sudo git clone https://github.com/elbamos/Zeppelin-With-R

    Zeppelin-With-R folder is now created. This is going to be ZeppelinR’s home. In my case this would be /usr/apache/Zeppelin-With-R.

  3. Change the ownership of the folder
    sudo chown -R zeppelin:zeppelin Zeppelin-With-R
  4. Add the global variable ZEPPELIN_HOME
    Open the environment file

    sudo vi /etc/environment

    And add the variable

    export ZEPPELIN_HOME=/usr/apache/Zeppelin-With-R

    Save and exit the file and do not forget to reload it.

    source /etc/environment
  5. Change user to zeppelin (or whoever is going to build the Zeppelin-With-R)
    su zeppelin
  6. Make sure you are in $ZEPPELIN_HOME and build Zeppelin-With-R
    mvn clean package -Pspark-1.6 -Dspark.version=1.6.0 -Dhadoop.version=2.7.1 -Phadoop-2.6 -Pyarn -Ppyspark -DskipTests
  7. Initial information before build starts
    zeppelin with r build start
    R interpreter is on the list.
  8. Successful build
    zeppelin with r build end
    ZeppelinR is now installed. The next step is configuration.

Configuring ZeppelinR

  1. Copy and modify hive-site.xml (as root). From Hive’s conf folder, copy the hive-site.xml file to $ZEPPELIN_HOME/conf.
    sudo cp /etc/hive/conf/hive-site.xml $ZEPPELIN_HOME/conf/
  2. Change the owner of the file to zeppelin:zeppelin.
    sudo chown zeppelin:zeppelin $ZEPPELIN_HOME/conf/hive-site.xml
  3. Log in as zeppelin and modify the hive-site.xml file.
    vi $ZEPPELIN_HOME/conf/hive-site.xml

    Remove the “s” suffix from the values of the properties hive.metastore.client.connect.retry.delay and
    hive.metastore.client.socket.timeout to avoid a number format exception.

  4. Create folder for Zeppelin pid.
    mkdir $ZEPPELIN_HOME/run
  5. Create zeppelin-site.xml and zeppelin-env.sh from respective templates.
    cp $ZEPPELIN_HOME/conf/zeppelin-site.xml.template $ZEPPELIN_HOME/conf/zeppelin-site.xml
    cp $ZEPPELIN_HOME/conf/zeppelin-env.sh.template $ZEPPELIN_HOME/conf/zeppelin-env.sh
  6. Open $ZEPPELIN_HOME/conf/zeppelin-env.sh and add:
    export SPARK_HOME=/usr/apache/spark-1.6.0-bin-hadoop2.6
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export ZEPPELIN_JAVA_OPTS="-Dhdp.version=2.3.4.0-3485"
    export ZEPPELIN_LOG_DIR=/var/log/zeppelin
    export ZEPPELIN_PID_DIR=${ZEPPELIN_HOME}/run

    The value of the -Dhdp.version parameter should match your Hortonworks distribution version.
    Save and exit the file.
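The manual edit of hive-site.xml in step 3 can also be scripted. Here is a sketch using Python's standard library; the function name strip_seconds_suffix is hypothetical, and it assumes the affected values look like a number followed by "s". As with any ElementTree round trip, comments in the file are not preserved.

```python
import re
import xml.etree.ElementTree as ET

# The two properties named in step 3.
TIME_PROPERTIES = (
    "hive.metastore.client.connect.retry.delay",
    "hive.metastore.client.socket.timeout",
)


def strip_seconds_suffix(hive_site_xml):
    """Return the XML text with the trailing 's' removed from the two time values."""
    root = ET.fromstring(hive_site_xml)
    for prop in root.findall("property"):
        value = prop.find("value")
        if prop.findtext("name") in TIME_PROPERTIES and value is not None and value.text:
            # Only touch values of the form <digits>s; leave anything else alone.
            value.text = re.sub(r"^(\d+)s$", r"\1", value.text.strip())
    return ET.tostring(root, encoding="unicode")
```

Read the copied file, pass its content through the function and write the result back to $ZEPPELIN_HOME/conf/hive-site.xml.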

Zeppelin-With-R is now ready for use. Start it by running

$ZEPPELIN_HOME/bin/zeppelin-daemon.sh start

How to configure Zeppelin interpreters is described in this post.

 

Note!
In my experience, if you run ZeppelinR as the zeppelin user, you will not be able to use the spark.r functionality. The error message I get is “unable to start device X11cairo”. The reason is a lack of permissions on certain files within the R installation. This is something I still have to figure out. For now, running as root does the trick.

http://zeppelin-server:8080 takes you to the Zeppelin Web UI. How to configure the Spark and Hive interpreters is described in this post.

When the interpreters are configured, a notebook RInterpreter is available for testing.

 

Building Apache Zeppelin 0.6.0 on Spark 1.5.2 & 1.6.0 in a cluster mode

I have Ubuntu 14.04 Trusty and a multinode Hadoop cluster. Hadoop distribution is Hortonworks 2.3.4. Spark is installed through Ambari Web UI and running version is 1.5.2 (upgraded to 1.6.0).

I am going to explain how I built and set up Apache Zeppelin 0.6.0 on Spark 1.5.2 and 1.6.0.

Prerequisites

Non root account

The Apache Zeppelin creators recommend not using the root account. For this service, I have created a new user zeppelin.

Java 7

Zeppelin uses Java 7. My system has Java 8, so I have installed Java 7 just for Zeppelin. The installation was done as user zeppelin in the following directory.

/home/zeppelin/prerequisities/jdk1.7.0_79

JAVA_HOME is added to the user’s bashrc.

export JAVA_HOME=/home/zeppelin/prerequisities/jdk1.7.0_79

Zeppelin log directory

Create zeppelin log directory.

sudo mkdir /var/log/zeppelin

Change ownership.

sudo chown zeppelin:zeppelin /var/log/zeppelin

If this is not done, Zeppelin’s log files are written to a folder named logs inside the current folder.

Clone and Build

Log in as user zeppelin and go to the user’s home directory.

/home/zeppelin

Clone the source code from github.

git clone https://github.com/apache/incubator-zeppelin.git incubator-zeppelin

Zeppelin has a home now.

/home/zeppelin/incubator-zeppelin

Go into Zeppelin home and build Zeppelin

mvn clean package -Pspark-1.5 -Dspark.version=1.5.2 -Dhadoop.version=2.7.1 -Phadoop-2.6 -Pyarn -DskipTests

Build order.

zeppelin build start

7:31 minutes later, Zeppelin is successfully built.

zeppelin build success

Note!

If you try with something like the following 2 examples:

mvn clean package -Pspark-1.5 -Dspark.version=1.5.0 -Dhadoop.version=2.7.1 -Phadoop-2.7 -Pyarn -DskipTests
mvn clean package -Pspark-1.5 -Dspark.version=1.5.2 -Dhadoop.version=2.7.1 -Phadoop-2.7 -Pyarn -DskipTests

The build will succeed, but this warning will appear at the bottom of the build report:

[WARNING] The requested profile "hadoop-2.7" could not be activated because it does not exist.

The Hadoop profile in the Maven command must be hadoop-2.6 (-Phadoop-2.6), even though the actual Hadoop version is 2.7.x. The actual version is still passed through -Dhadoop.version.
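To keep the profile (-P) and the version (-D) apart, the Maven command can be assembled by a small helper. This is only a sketch: zeppelin_build_command is a name I made up, and the default profile hadoop-2.6 is simply the one that worked in this post, not a statement about every Zeppelin version.

```python
def zeppelin_build_command(spark_version, hadoop_version, hadoop_profile="hadoop-2.6"):
    """Assemble the Maven build command as a list of arguments."""
    # The Spark profile uses only major.minor, e.g. 1.5.2 -> spark-1.5.
    spark_profile = "spark-" + ".".join(spark_version.split(".")[:2])
    return [
        "mvn", "clean", "package",
        "-P" + spark_profile,
        "-Dspark.version=" + spark_version,
        "-Dhadoop.version=" + hadoop_version,
        "-P" + hadoop_profile,
        "-Pyarn",
        "-DskipTests",
    ]


if __name__ == "__main__":
    print(" ".join(zeppelin_build_command("1.5.2", "2.7.1")))
```

The list form can be passed directly to subprocess.call if you want to start the build from Python.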

hive-site.xml

Copy hive-site.xml from hive folder (this is done on Hortonworks distribution, users using other distribution should check where the file is located).

sudo cp /etc/hive/conf/hive-site.xml $ZEPPELIN_HOME/conf

Change ownership of the file.

sudo chown zeppelin:zeppelin $ZEPPELIN_HOME/conf/hive-site.xml

zeppelin-env.sh

Go to Zeppelin home and create zeppelin-env.sh by using the template in conf directory.

cp conf/zeppelin-env.sh.template conf/zeppelin-env.sh

Open it and add the following variables:

export JAVA_HOME=/home/zeppelin/prerequisities/jdk1.7.0_79
export HADOOP_CONF_DIR=/etc/hadoop/conf
export ZEPPELIN_JAVA_OPTS="-Dhdp.version=2.3.4.0-3485"
export ZEPPELIN_LOG_DIR=/var/log/zeppelin

The variable in the third line depends on the Hortonworks build. Find your hdp version by executing

hdp-select status hadoop-client

If your Hortonworks version is 2.3.4, the output is:

hadoop-client - 2.3.4.0-3485
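If you set up several clusters, the value for ZEPPELIN_JAVA_OPTS can be derived from that output instead of being copied by hand. A sketch, assuming the output has the form "name - version" with a plain hyphen as separator; the function names are my own.

```python
import re


def hdp_version(hdp_select_line):
    """Extract the version from a line such as 'hadoop-client - 2.3.4.0-3485'."""
    match = re.match(r"^\s*\S+\s+-\s+(\S+)\s*$", hdp_select_line)
    if match is None:
        raise ValueError("unexpected hdp-select output: %r" % hdp_select_line)
    return match.group(1)


def zeppelin_java_opts(hdp_select_line):
    """Build the value for ZEPPELIN_JAVA_OPTS from the hdp-select output."""
    return "-Dhdp.version=%s" % hdp_version(hdp_select_line)


if __name__ == "__main__":
    print(zeppelin_java_opts("hadoop-client - 2.3.4.0-3485"))
```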

Zeppelin daemon

Start Zeppelin from Zeppelin home

./bin/zeppelin-daemon.sh start

Status after starting the daemon:

zeppelin start

One can check if service is up:

./bin/zeppelin-daemon.sh status

Status:

zeppelin status

Zeppelin can be restarted in the following way:

./bin/zeppelin-daemon.sh restart

Status:

zeppelin restart

Stopping Zeppelin:

./bin/zeppelin-daemon.sh stop

Status:

zeppelin stop

Configuring interpreters in Zeppelin

Apache Zeppelin comes with many default interpreters. It is also possible to create your own interpreters. How to configure default Spark and Hive interpreters is covered in this post.