Spark, Scala, sbt and S3

The idea behind this blog post is to write a Spark application in Scala, build the project with sbt and run the application which reads from a simple text file in S3.

If you are interested in how to access S3 files from Apache Spark with Ansible, check out this post.

A couple of Spark and Scala functionalities that can be picked up in this post:

  • how to create SparkSession
  • how to create SparkContext
  • auxiliary constructors
  • submitting Spark application with arguments
  • error handling in Spark
  • adding column to Spark DataFrame

My operating system is Windows 10 with Spark 2.4.0 installed on it. Spark has been installed using this tutorial. I have not installed Scala on my machine. I do have Java, version 8. Keep in mind Spark does not support Java 9. It is ok to have multiple versions installed, just make sure you make the switch to Java 8 in the IDE.

My IDE of choice is Intellij IDEA Community 2019.1. It is possible to run the code test, import libraries (using sbt), package JAR file and run the JAR from the IDE.

I have a small single node Spark cluster in AWS (one instance type t2.micro) for testing the JAR file outside of my development environment. This instance is accessed, and the project tested from mobaXterm.

Create application

Create new Scala project

create project

Type in name of the project and change the JDK path to Java 8 if default points to some other version. Spark 2.4.0 is using Scala 2.11.12 so make sure the Scala version matches. This can be changed later in the sbt file.

create project_2

IntelliJ IDEA creates the project and the structure of it is as below image:

project structure

Spark libraries are imported using the sbt file. Copy and paste the following line to the sbt file:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3"

In the lower right corner will there will be a pop-up window asking you if you want to import changes. Accept that.

import changes

Reading files in Spark

A rookie mistake is to create a file locally, run it in the development environment, which is your own computer, where it works as it should, and then scale-up to a cluster where the file cannot be found. It is easy to forget that the file should be in the same location on all workers in the Spark cluster not just on the instance that servers as the client!

To avoid this, an external storage is introduced. In the case of this post, it is the object storage of AWS – S3.

The test file I am using here is a simple one-line txt file:

a;b;c

If you plan to use other file structure make sure to change the schema definition in the code.

Creating Scala class

The environment, input file and the sbt file are now ready. Let us write the example. Create new Scala class.

file new project.jpg

Make sure to choose Object from the Kind dropdown menu.

new scala class

Open Test.scala and copy the following content in it:

import scala.util.Failure
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.lit

// primary constructor
class TextAnalysis(path : String, comment : String) {

//aux cons takes no inputs
def this() {
  this("s3a://hdp-hive-s3/test.txt", "no params")
}

//aux cons takes file path as input
def this(path : String) {
  this(path, "from s3a")
}

  //create spark session
  private val spark = SparkSession
    .builder
    .master("local")
    .appName("kaggle")
    .getOrCreate()

  //Create a SparkContext to initialize Spark
  private val sc = spark.sparkContext

  def ReadFile(): Unit = {

    val testSchema = StructType(Array(
      StructField("first", StringType, true),
      StructField("second", StringType, true),
      StructField("third", StringType, true)))

    try {

      val df = spark.read
        .format("csv")
        .option("delimiter", ";")
        .schema(testSchema)
        .load(path)

      val df1 = df.withColumn("comment", lit(comment))
      df1.show()

    } catch {
      case e: AnalysisException => {
        println(s"FILE $path NOT FOUND... EXITING...")
      }
      case unknown: Exception => {
        println("UNKNOWN EXCEPTION... EXITING...")
        println(Failure(unknown))
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    val sep = ";"
    val argsArray = args.mkString(sep).split(sep)

    val ta = new TextAnalysis("s3a://hdp-hive-s3/test.txt")
    ta.ReadFile()

    val ta1 = new TextAnalysis(argsArray(0), argsArray(1))
    ta1.ReadFile()

    val ta2 = new TextAnalysis()
    ta2.ReadFile()
  }
}

In the object Test, three instances of the TextAnalysis class are created to demonstrate how to run  Spark code with parameters as input arguments and without.

In order to make the second instance work the arguments need to be defined. Click Run -> Edit Configurations…

In textbox Program arguments write the following “s3a://hdp-hive-s3/test.txt;with args”. Click OK.

Right click on the code and click on the Run “Test” from the menu. This should generate a verbose log which should also give 3 prints of the test tables. Here is an example of one

table output example

The column “comment” is added in the Spark the code.

Package application in a JAR

Create artifact configuration

Open the Project Structure windows and choose Artifacts from the left menu: File -> Project Structure -> Artifacts.

Click on the +, choose JAR and From modules with dependencies.

Enter Module and Main Class manually or use the menus on the right. Click OK when done and click OK again to exit the window.

create jar from modules

Build JAR

Open the menu Build -> Build Artifacts. Choose the JAR you wish to build and choose action Build.

build artifact - build.JPG

This will create a new folder called “out” where the JAR file resides.

out folder

The test.jar is over 130 MB big. And 70 lines of code were written. All dependencies were packaged in this JAR file which can sometimes be acceptable but in majority of cases the dependencies are already on the server. And the same goes for this case. The JARs used to run this application are already inside the standard Spark cluster under $SPARK_HOME/jars. For working with S3 files the JAR file hadoop-aws should be added to the jars folder. The JAR can be found here.

Removing the dependent JARs is done in the following way from Build -> Build Artifacts.

build artifact - edit.JPG

In the window, on the right side of it, remove all the JAR files under the test.jar – mark them and click icon “minus”.

project structure - removed dependencies

After saving the changes, rebuild the artifact using Build -> Build Artifacts and the Rebuild option from the menu. This should reduce the file size to 4 KB.

Testing the package on Spark single-node

If you have a working Spark cluster you can copy the JAR file to it and test it. I have a single node Spark cluster in AWS for this purpose and mobaXterm to connect to the cluster. From the folder where the JAR file has been copied run the following command:

sh $SPARK_HOME/bin/spark-submit --class Test test.jar "s3a://hdp-hive-s3/test.txt;test on Spark"

Three tables should be seen in the output, among the log outputs, one of the tables is the following

table output on Spark cluster

This table shows how calling a Scala class with arguments from the command line works.

This wraps up the example of how Spark, Scala, sbt and S3 work together.

Apache Spark 2.0 – Notes

Spark Session

In Spark 2.0, SparkSession has been introduced. It provides a single point of entry for interaction with Spark functionality. It allows user accessing DataFrame and Dataset APIs.
In older Spark versions, the user had to create Spark configuration (sparkConf), SparkContext (sc) and then sqlContext. In Spark 2.0 all is done with SparkSession (spark), which encapsulates the above mentioned trio, HiveContext and StreamingContext.

In order to use DataFrame API in Spark 1.6, SQLContext needs to be used. When running Spark, new Spark application is started by creating SparkContext object (represents a connection to computing cluster). From SparkContext, SQLContext can be created (the main entry point for Spark DataFrame and SQL functionality). A SQLContext can be used to create DataFrames, which allows you to direct operations on your data.
It was confusing when to use SparkContext and when to use SQLContext in Spark 1.6. All this is hidden under a layer called SparkSession. See the following object types from PySpark driver:

SparkSession

>>> type(spark)
<class 'pyspark.sql.session.SparkSession'>

SparkContext

>>> type(sc)
<class 'pyspark.context.SparkContext'>

SqlContext

>>> type(sqlContext)
<class 'pyspark.sql.context.SQLContext'>

The Spark session has to be created when using spark-submit command. An example from the documentation on how to do that:

>>> spark = SparkSession.builder \
 |  ...     .master("local") \
 |  ...     .appName("Word Count") \
 |  ...     .config("spark.some.config.option", "some-value") \
 |  ...     .getOrCreate()

The spark handle is created and first DataFrames can be created.

When using local driver programs (pyspark, spark-sql, spark-shell or sparkR), the SparkSession is automatically initialized. Example with sparkR:

>>> sparkR

sparkr-initialization

SQL

SQL in Spark 2.0 supports SQL:2003 (latest revision is SQL:2011). Spark SQL can run all 99 TPC-DS queries. Subquery support has been improved. More on the SQL improvements here.

Spark SQL

Spark SQL can be accessed through SparkSession. A table can be created and SQL queries can be executed against it. Example:

myDF.createOrReplaceTempView("my_table")
resultDF = spark.sql("SELECT col1, col2 from my_table")

Running the following command

>>> spark.catalog.listTables()

Returns the tables available to SparkSession.

Driver program

In Spark, communication occurs between driver and executors. The driver has Spark jobs to run, it splits them into tasks and submits them to executors for completion. The results are delivered back to the driver.

Every Spark application has a driver program which launches various parallel operations on executor JVMs. The JVMs are running either in a cluster or locally on the same machine. Pyspark is an example of a local driver program. Example:

pyspark --master yarn --deploy-mode cluster

Error: Cluster deploy mode is not applicable to Spark shells.

If you run in cluster mode that means that the client that submitted the job is detached from the Spark application and its further behavior does not influence the application. If you shut down the computer that submitted the application to the cluster, the job will continue to run. The driver is on one of the nodes in Spark cluster. Command spark-submit with property –deploy-mode cluster does this.
If you run in client mode, that means that the client you are running the application from is the client. In Spark HistoryServer, under tab Executors, in table Executors, you can read that the address of the driver matches the address of the computer the command has been sent from.

The driver program creates distributed datasets on the cluster and applies operations to those datasets. Driver programs access Spark through a SparkContext object.

Listing attributes

Pythons dir() lists all attributes accessible through the parameter
Example for spark:

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_createFromLocal', '_createFromRDD', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

Help

Python’s help() lists attributes and examples. Example:

>>> help(spark)

Opens extensive help and examples.

RDD

SQLContext is created from lower level SparkContext. SparkContext is used to create Resilient Distributed Datasets (RDDs). RDD is Spark’s way of representing data internally. DataFrames are implemented in terms of RDDs.
It is possible to interact directly with RDDs, but DataFrames are preferred. They are faster and perform no matter of the language you use (Python, R, Scala, Java). Whether you express your computations in Spark SQL or Python, Java, Scala, or R, the underlying code generated is identical because all execution planning undergoes the same Catalyst optimizer.
DFs are made of partitions – converting a DF to an RDD to check number of partitions:

>>> tweetDF.rdd.getNumPartitions()

Data is split into partitions.
How to optimize:
If there are 3 slots, perfect is to have partitions = x*3
Repartition DF:

>>> tweetDF.repartition(6)

DataFrames

In Spark, base DataFrame is first created. Either by generating a Dataset using spark.range method (for learning purposes) or by reading file(s) or tables and returning a DataFrame. Operations can then be applied to it. DataFrame is immutable, once created, it cannot be changed. Each transformation creates a new DataFrame. In the end, one or more actions can be applied to the DataFrame.

DataFrame consists of series of Row objects. Each of them has a set of named columns.

DataFrame must have a schema, each of which has a name and a type. Some datasources have schemas built into them, although it is possible to define a schema and introduce it as a parameter when creating a new DataFrame.

Running:

help(spark.createDataFrame)

Returns:

createDataFrame(self, data, schema=None, samplingRatio=None) method of pyspark.sql.session.SparkSession instance

Dataset

DataFrame and DataSet are unified in Scala and Java. In Python and R, DataFrame is the main interface.

Test in Scala:

scala> import org.apache.spark.sql._
scala> classOf[DataFrame] == classOf[Dataset[_]]
res5: Boolean = true

Checking SQL package in Spark 2.0

package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}

Pandas DataFrame

Spark DF can be converted to Pandas DF

>>> import pandas as pd

Then you can use .toPandas() at the end of the Spark DF to convert to Pandas DF.

Cache

Putting DataFrame in memory: Spark uses Tungsten binary format to columnar compress data in memory. The number of partitions in memory is equal to the number of partitions defined on the RDD under the DataFrame. The data in memory is equally divided per partition (for example, 600MB in memory, 6 partitions -> 100MB per partition).
The size of data shrinks when cached. Example: 1,6GB file on disk is cached in memory with size 618,6 MB.

When an action is executed, and if the DataFrame is cached, the stages BEFORE the cache was executed are skipped, because the data is already partitioned in memory.
Ideal partition size is between 100MB to 200MB, so number of partitions should be adjusted to that, not the other way around.

Cache is not an action, which means it will be executed when the next action is executed. However, if you cache a table, it WILL be cached right away.

Sources

How to use SparkSession in Apache Spark 2.0
Using Apache Spark 2.0 to Analyze the City of San Francisco’s Open Data
Modern Spark DataFrame and Dataset (Intermediate Tutorial)
Spark SQL, DataFrames and Datasets Guide

Networking in Spark: Configuring ports in Spark

For Spark Context to run, some ports are used. Most of them are randomly chosen which makes it difficult to control them. This post describes how I am controlling Spark’s ports.

In my clusters, some nodes are dedicated client nodes, which means the users can access them, they can store files under their respective home directory (defining home on an attached volume is described here), and run jobs on it.

The Spark jobs can be run in different ways, from different interfaces – Command Line Interface, Zeppelin, RStudio…

 

Links to Spark installation and configuration

Installing Apache Spark 1.6.0 on a multinode cluster

Building Apache Zeppelin 0.6.0 on Spark 1.5.2 in a cluster mode

Building Zeppelin-With-R on Spark and Zeppelin

What Spark Documentation says

Spark UI

Spark User Interface, which shows application’s dashboard, has the default port of 4040 (link). Property name is

spark.ui.port

When submitting a new Spark Context, 4040 is attempted to be used. If this port is taken, 4041 will be tried, if this one is taken, 4042 is tried and so on, until an available port is found (or maximum attempts are met).
If the attempt is unsuccessful, the log is going to display a WARN and attempt the next port. Example follows:

WARN Utils: Service ‘SparkUI’ could not bind on port 4040. Attempting port 4041.
INFO Utils: Successfully started service ‘SparkUI’ on port 4041.
INFO SparkUI: Started SparkUI at http://client-server:4041

According to the log, the Spark UI is now listening on port 4041.

Not much randomizing for this port. This is not the case for ports in the next chapter.

 

Networking

Looking at the documentation about Networking in Spark 1.6.x, this post is focusing on the 6 properties that have default value random in the following picture:

spark networking.JPG

When Spark Context is in the process of creation these receive random values.

spark.blockManager.port
spark.broadcast.port
spark.driver.port
spark.executor.port
spark.fileserver.port
spark.replClassServer.port

These are the properties that should be controlled. They can be controlled in different ways, depending on how the job is run.

 

Scenarios and solutions

If you do not care about the values assigned to these properties then no further steps are needed..

Configuring ports in spark-defaults.conf

If you are running one Spark application per node (for example: submitting python scripts by using spark-submit), you might want to define the properties in the $SPARK_HOME/conf/spark-defaults.conf. Below is an example of what should be added to the configuration file.

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

If a test is run, for example spark-submit test.py, the Spark UI is by default 4040 and the above mentioned ports are used.

Running the following command

sudo netstat -tulpn | grep 3800

Returns the following output:

tcp6      0      0      :::38000                          :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38002     :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38003     :::*      LISTEN      25300/java
tcp6      0      0      :::38004                          :::*      LISTEN      25300/java
tcp6      0      0      :::38005                          :::*      LISTEN      25300/java

 

Configuring ports directly in a script

In my case, different users would like to use different ways to run Spark applications. Here is an example of how ports are configured through a python script.

"""Pi-estimation.py"""

from random import randint
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

def sample(p):
x, y = randint(0,1), randint(0,1)
print(x)
print(y)
return 1 if x*x + y*y < 1 else 0

conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Pi")

conf.set("spark.ui.port", "4042")

conf.set("spark.blockManager.port", "38020")
conf.set("spark.broadcast.port", "38021")
conf.set("spark.driver.port", "38022")
conf.set("spark.executor.port", "38023")
conf.set("spark.fileserver.port", "38024")
conf.set("spark.replClassServer.port", "38025")

conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext(conf=conf)

NUM_SAMPLES = randint(5000000, 100000000)
count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
.reduce(lambda a, b: a + b)
print("NUM_SAMPLES is %i" % NUM_SAMPLES)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
(The above Pi estimation is a Spark example that comes with Spark installation)

The property values in the script run over the properties in the spark-defaults.conf file. For the runtime of this script port 4042 and ports 38020-38025 are used.

If netstat command is run again for all ports that start with 380

sudo netstat -tulpn | grep 380

The following output is shown:

tcp6           0           0           :::38000                              :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38002         :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38003         :::*          LISTEN          25300/java
tcp6           0           0           :::38004                              :::*          LISTEN          25300/java
tcp6           0           0           :::38005                              :::*          LISTEN          25300/java
tcp6           0           0           :::38020                              :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38022         :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38023         :::*          LISTEN          27280/java
tcp6           0           0           :::38024                              :::*          LISTEN          27280/java

2 processes are running one separate Spark application each on ports that were defined beforehand.

 

Configuring ports in Zeppelin

Since my users use Apache Zeppelin, similar network management had to be done there. Zeppelin is also sending jobs to Spark Context through spark-submit command. That means that the properties can be configured in the same way. This time through an interpreter in Zeppelin:

Choosing menu Interpreter and choosing spark interpreter will get you there. Now it is all about adding new properties and respective values. Do not forget to click on the plus when you are ready to add a new property.
At the very end, save everything and restart the spark interpreter.

Below is an example of how this is done:

spark zeppelin ports

Next time a Spark context is created in Zeppelin, the ports will be taken into account.

 

Conclusion

This can be useful if multiple users are running Spark applications on one machine and have separate Spark Contexts.

In case of Zeppelin, this comes in handy when one Zeppelin instance is deployed per user.

 

Zeppelin thrift error “Can’t get status information “

I have multiple users on one client who are going to use/test ZeppelinR. For every Zeppelin user I create a copy of built Zeppelin folder in user’s home directory. I dedicate a port to that user (8080 is for my testing, running), for example my first user got port 8082. This is done in user’s $ZEPPELIN_HOME/conf/zeppelin-site.xml.

Example for one user:

<property>
  <name>zeppelin.server.port</name>
  <value>8082</value>
  <description>Server port.</description>
</property>

Running Zeppelin as root is not a big problem. Running ZeppelinR as root is also not so problematic. Running it as a normal Linux user can give some challenges.

There is this error message that can surprise you when starting a new Spark context from Zeppelin Web UI.

Taken from Zeppelin log file (zeppelin-user_running_zeppelin-t-client01.log):

ERROR [2016-03-18 08:10:47,401] ({Thread-20} RemoteScheduler.java[getStatus]:270) – Can’t get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.run(RemoteScheduler.java:205)
ERROR [2016-03-18 08:11:47,347] ({pool-1-thread-2} RemoteScheduler.java[getStatus]:270) – Can’t get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The zeppelin out file (zeppelin-user_running_zeppelin-t-client01.out) gives a more concrete description of the problem:

Exception in thread "Thread-80" org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:146)
 at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getScheduler(LazyOpenInterpreter.java:115)
 at org.apache.zeppelin.interpreter.Interpreter.destroy(Interpreter.java:124)
 at org.apache.zeppelin.interpreter.InterpreterGroup$2.run(InterpreterGroup.java:115)
Caused by: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.rinterpreter.RContext$.apply(RContext.scala:353)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext$lzycompute(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.getScheduler(RInterpreter.scala:80)
 at org.apache.zeppelin.rinterpreter.RRepl.getScheduler(RRepl.java:93)
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:144)
 ... 3 more

The way I solved it was by running Zeppelin service from the $ZEPPELIN_HOME. For users to be able to start the Zeppelin service I have created a script:

export ZEPPELIN_HOME=/home/${USER}/Zeppelin-With-R
cd ${ZEPPELIN_HOME}
/home/${USER}/Zeppelin-With-R/bin/zeppelin-daemon.sh start

Now I can start and stop the Zeppelin service and start new Spark contexts with no problem.

Here is an example of my YARN applications:

zeppelin services in YARN

And here are the outputs from Zeppelin when scala, sparkR and Hive are tested:

zeppelin user test results

 

SparkContext allocates random ports. How to control the port allocation.

When SparkContext is in process of creation, a bunch of random ports are allocated to run the Spark service. This can be annoying when you have security groups to think of.

Note!
A more detailed post on this topic is here.

Here is an example of how random ports are allocated when Spark service is started:

spark ports random

The only sure bet is 4040 (or 404x depending on how many Spark Web UI have been already started).

On Apache Spark website, under Configuration, under Networking, 6 port properties have default value random. These are the properties that have to be tamed.

spark port random printscreen

(The only 6 properties with default value random among all Spark properties)

Solution

Open $SPARK_HOME/conf/spark-defaults.conf:

sudo -u spark vi conf/spark-defaults.conf

The following properties should be added:

spark.blockManager.port    38000
spark.broadcast.port       38001
spark.driver.port          38002
spark.executor.port        38003
spark.fileserver.port      38004
spark.replClassServer.port 38005

I have picked port range 38000-38005 for my Spark services.

If I run Spark service now, the ports in use are now as defined in the configuration file:

spark ports tamed