My Work cluster in detail

The cluster was built on an OpenStack private cloud owned by Switch, a Swiss organization.

The Hadoop distribution was Hortonworks, except for Spark and Zeppelin, which were the Apache releases.

Potential users

Since the project owner was an organization supporting educational entities in Switzerland, the potential users were researchers, scientists, students…

I had the luxury of almost unlimited resources on the infrastructure, so I built five Hadoop clusters: four Hortonworks Hadoop clusters and one Apache Hadoop cluster. Of the four, one was the Work environment that was exposed to the end users. That is the cluster described in detail in this post.
Keep in mind that I was working on this on my own, which meant administering and upgrading five clusters and doing data science at the same time. To make it work, I had to use the YARN inside me and distribute the limited resources effectively.

Initial resources

Keeping in mind that the point of distributed systems is scalability, I defined the initial cluster with the following capabilities.
Six instances with the corresponding details:

  • Ambari Server
  • NameNode
  • DataNode (3)
  • Client

Instance      RAM    VCPU  Default disk size  Volume No.  Volume Size  Security group
Ambari        8GB    8     20GB               None        None         sg-ambari
NameNode      32GB   8     20GB               1           200GB        sg-namenode
DataNode (3)  32GB   8     20GB               3           200GB        sg-datanode
Client        16GB   16    20GB               1           500GB        sg-client

Note: There were three DataNodes in the initial cluster.

Characteristics of the cluster

The initial cluster had 1.7 TB of HDFS storage, the replication factor was 3, and the block size was the default 128 MB. Rack awareness was not set in the initial cluster and the queue was the default one.
On the YARN side, I made some changes and set 84 GB of RAM as the maximum amount of memory for the cluster: 3 x 32 GB = 96 GB in total, minus 4 GB per DataNode left for the services on the instance, so 96 GB - 12 GB = 84 GB. The default values by Apache (Hortonworks?) are considerably more conservative.
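
The arithmetic above can be written out as a small sketch. The function names are my own and are not part of YARN or Ambari. The storage estimate assumes the 1.7 TB figure is raw HDFS capacity and simply divides it by the replication factor, ignoring any other overhead.

```python
def yarn_memory_gb(datanodes, ram_per_node_gb, reserved_per_node_gb):
    """RAM that YARN may hand out across the cluster after per-node reservations."""
    return datanodes * (ram_per_node_gb - reserved_per_node_gb)


def unique_data_tb(hdfs_capacity_tb, replication_factor):
    """Rough amount of distinct data that fits when every block is stored
    replication_factor times (assumes hdfs_capacity_tb is raw capacity)."""
    return hdfs_capacity_tb / replication_factor


print(yarn_memory_gb(3, 32, 4))          # 84
print(round(unique_data_tb(1.7, 3), 2))  # 0.57
```

Adding a fourth DataNode of the same size would, by the same calculation, raise the YARN ceiling to 112 GB.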

During the build, the versions were Ambari 2.1 and HDP 2.3. When Ambari 2.2 and HDP 2.4 became available, the cluster was upgraded.

Ambari

Ambari had a server to itself, and the database for collecting statistics was MySQL. The idea was always to migrate the Ambari Server if needed. Migrating to a new Ambari server is easy, so I could afford to start small for this service.
Ambari Views was enabled for the users who wanted to upload files to HDFS manually. Hive was also available through this service, and on one of my test environments I even embedded Zeppelin in Ambari Views. On the Work cluster, though, Zeppelin was offered only as an independent service on the Client.
All the ports for Ambari to properly work were in the sg-ambari security group.

NameNode

The initial plan for the NameNode was to run all the services on it except Spark and Zeppelin. When the resource needs grew beyond the instance’s capabilities, some services would be moved to a new instance, or unused services would be stopped (experience showed Hive had little popularity in academia). Since migrating services with Ambari is an easy process, I could afford to have all services running on one NameNode. Only the cluster administrator had access to this instance. In other words, client tools were not installed on this instance.
All the ports for the NameNode to properly work were in the sg-namenode security group.

DataNode

I started with 3 DataNodes, which offered 1.7 TB of storage on HDFS. The DataNodes were also used as Workers for Spark and Supervisors for Storm. The users had no direct access to the DataNodes, since no client was installed there. This could change according to the needs, so that some jobs could access the data locally.
All the ports for the DataNodes to properly work were in the sg-datanode security group.

Client

The Client was the users’ window to the cluster. Spark 2.0 (Spark 1.6 before summer 2016) was offered as the only computational engine. One reason was easier administration and optimization on my side.
The users could use the command line interface (CLI), RStudio or Zeppelin. Ambari Views was available as well, but it ran on the Ambari instance. More advanced users went with the CLI, while users who wanted to learn Spark used Zeppelin.
The Storm client was also installed on this instance. Because the programming is more complex (in Java), I handled all the Topologies myself; the users defined the requirements and used the data stored by Storm.
All the ports for the Client to properly work were in the sg-client security group.


Error “org.apache.hive.hcatalog.data.JsonSerDe not found” while accessing Hive tables from spark-sql

In the Hadoop cluster, I am running Spark 2.0 on a Client node, separately from Hive services.

To connect to the Hive Metastore using spark-sql, hive-hcatalog-core-0.13.0.jar has to be added to the jars folder in the Spark home directory.

Step into the $SPARK_HOME/jars folder and run the following:

sudo -u spark wget http://central.maven.org/maven2/org/apache/hive/hcatalog/hive-hcatalog-core/0.13.0/hive-hcatalog-core-0.13.0.jar

Now I can run spark-sql and queries on the tables in the databases.

If the jar file is missing, running for example

desc table_name

displays the following error message:

ERROR hive.log: error in initSerDe: java.lang.ClassNotFoundException Class org.apache.hive.hcatalog.data.JsonSerDe not found
java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:385)
        at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276)
        at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:258)
        at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:339)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$8.apply(HiveClientImpl.scala:335)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:335)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:333)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:262)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:209)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:208)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:251)
        at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:333)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$tableExists$1.apply(HiveExternalCatalog.scala:228)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
        at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:227)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:456)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:126)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:274)
        at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:414)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:682)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:331)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:247)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Since all my clusters run Hive services on different nodes from Spark services, I do not know whether this is needed when Hive and Spark services are on the same node.
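
One way to check up front whether the class is on hand is to look inside the jars themselves. The following is a minimal sketch, not part of Spark or Hive: the helper name jars_providing is hypothetical, and it assumes the jars live in $SPARK_HOME/jars as described above.

```python
import glob
import os
import zipfile

# Path of the class inside a jar, derived from the name in the error message.
SERDE_CLASS = "org/apache/hive/hcatalog/data/JsonSerDe.class"


def jars_providing(jars_dir, class_entry=SERDE_CLASS):
    """Return the jar files in jars_dir that contain class_entry."""
    hits = []
    for jar in sorted(glob.glob(os.path.join(jars_dir, "*.jar"))):
        try:
            with zipfile.ZipFile(jar) as archive:
                if class_entry in archive.namelist():
                    hits.append(jar)
        except zipfile.BadZipFile:
            # Skip truncated or corrupt downloads instead of failing.
            continue
    return hits


jars_dir = os.path.join(os.environ.get("SPARK_HOME", "."), "jars")
found = jars_providing(jars_dir)
print(found if found else "JsonSerDe not found in " + jars_dir)
```

An empty result on the Client would suggest that the ClassNotFoundException above is to be expected.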

Preparing a client instance to be added to the cluster

In this post I describe how to prepare a Client node with an attached volume.

The Client and the volume are created through the WebUI of the OpenStack private cloud. The volume is attached to the Client in the WebUI as well.
The volume is attached to a device like this:

/user – /dev/vdb

After the new “soon-to-be” Client instance has been created and the volume attached, there is some work to be done in the command line interface:

Connect to the new client
Since the client is going to be exposed, there is a public IP address available for it.
Another option is to connect from another instance in the cluster (make sure to update /etc/hosts on all instances in the cluster).

ssh -i .ssh/key client-instance

Update, upgrade and reboot the system.

sudo apt-get update -y && sudo apt-get upgrade -y && sudo reboot

Create the directory where the volume will be mounted.

sudo mkdir -p /user

Create a file system on the attached volume.

sudo mkfs.ext4 /dev/vdb

Mount the volume to the respective directory.

sudo mount /dev/vdb /user

Label the volume for easier future work.

sudo e2label /dev/vdb "user"

Open and update /etc/fstab.
Adding the following line keeps the volume mounted to the directory after the Client is restarted.

LABEL=user /user ext4 defaults,nobootwait 0 0
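
For readers less familiar with /etc/fstab, the line has six whitespace-separated fields. The sketch below names them; the helper and the field names are my own labels for illustration. As far as I know, nobootwait is an Ubuntu-specific option that lets the boot continue when the volume is not available.

```python
from collections import namedtuple

# Field names are illustrative labels for the six fstab columns.
FstabEntry = namedtuple(
    "FstabEntry", "device mount_point fs_type options dump fsck_pass"
)


def parse_fstab_line(line):
    """Split one fstab line into its six fields."""
    fields = line.split()
    if len(fields) != 6:
        raise ValueError("expected 6 fields, got %d" % len(fields))
    device, mount_point, fs_type, options, dump, fsck_pass = fields
    return FstabEntry(device, mount_point, fs_type,
                      options.split(","), int(dump), int(fsck_pass))


entry = parse_fstab_line("LABEL=user /user ext4 defaults,nobootwait 0 0")
print(entry.device)       # LABEL=user
print(entry.mount_point)  # /user
print(entry.options)      # ['defaults', 'nobootwait']
```

Referring to the volume by LABEL rather than by /dev/vdb is what makes the earlier e2label step useful: the entry keeps working if the device name changes.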

Check if volumes are mounted to correct directories.

df -h

Something like this should appear:

/dev/vdb        197G        60M         187G    1%      /user
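
The same check can be scripted, for example when several clients are prepared in a row. This is a minimal sketch with a hypothetical helper name; it expects text in the layout of /proc/mounts, where the first two columns are the device and the mount point.

```python
def mounted_device(mounts_text, mount_point):
    """Return the device mounted at mount_point, or None if nothing is mounted there."""
    device = None
    for line in mounts_text.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[1] == mount_point:
            device = fields[0]  # a later entry shadows an earlier one
    return device


# Sample text in /proc/mounts layout (made up for illustration).
sample = (
    "/dev/vda1 / ext4 rw,relatime 0 0\n"
    "/dev/vdb /user ext4 rw,relatime 0 0\n"
)
print(mounted_device(sample, "/user"))  # /dev/vdb
print(mounted_device(sample, "/data"))  # None
```

On the Client itself, the text would come from open("/proc/mounts").read().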

For future reference, you can check the total size of everything stored under the /user directory.

sudo du -hs /user

Something similar to this should be in the output.

20K /user

Next steps

Adding client to the cluster

Now the Client with the added volume is ready to be added to the cluster. This is, to some extent, described in Adding new DataNode to the cluster using Ambari. Although that post describes how to add a DataNode, adding a Client is the same, except that in the step Assign Slaves and Clients the Client checkbox should be checked.

Configuring home directory

The users working on the client should have their home directory under /user. How this is achieved is described in Configuring home directory on client nodes.

Networking in Spark: Configuring ports in Spark

For Spark Context to run, some ports are used. Most of them are randomly chosen which makes it difficult to control them. This post describes how I am controlling Spark’s ports.

In my clusters, some nodes are dedicated client nodes: the users can access them, store files under their respective home directories (defining home on an attached volume is described here), and run jobs on them.

The Spark jobs can be run in different ways, from different interfaces – Command Line Interface, Zeppelin, RStudio…

 

Links to Spark installation and configuration

Installing Apache Spark 1.6.0 on a multinode cluster

Building Apache Zeppelin 0.6.0 on Spark 1.5.2 in a cluster mode

Building Zeppelin-With-R on Spark and Zeppelin

What Spark Documentation says

Spark UI

The Spark User Interface, which shows the application’s dashboard, uses port 4040 by default. The property name is

spark.ui.port

When a new Spark Context is submitted, Spark first attempts to use port 4040. If this port is taken, 4041 is tried, then 4042, and so on, until an available port is found (or the maximum number of attempts is reached).
Each time an attempt is unsuccessful, the log displays a WARN and the next port is attempted. An example follows:

WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
INFO Utils: Successfully started service 'SparkUI' on port 4041.
INFO SparkUI: Started SparkUI at http://client-server:4041

According to the log, the Spark UI is now listening on port 4041.
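
The retry behaviour can be imitated with plain sockets. This is only a sketch of the idea, not Spark's implementation: the helper name is hypothetical, and the default of 16 retries mirrors what, to my knowledge, is the default of spark.port.maxRetries.

```python
import socket


def first_free_port(start, max_retries=16, host="127.0.0.1"):
    """Try start, start+1, ... and return the first port that can be bound, or None."""
    last = min(start + max_retries, 65535)  # stay within the valid port range
    for port in range(start, last + 1):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, port))
            return port
        except OSError:
            # Port is taken (or not permitted); move on to the next one.
            continue
        finally:
            sock.close()
    return None


print(first_free_port(4040))
```

Run on a client where one Spark application is already up, this would be expected to skip 4040 and report the next free port, matching the log lines above.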

Not much randomizing for this port. This is not the case for the ports in the next section.

 

Networking

Looking at the Networking section of the Spark 1.6.x documentation, this post focuses on the six properties whose default value is random.

[Figure: spark networking.JPG]

While the Spark Context is being created, these properties receive random values:

spark.blockManager.port
spark.broadcast.port
spark.driver.port
spark.executor.port
spark.fileserver.port
spark.replClassServer.port

These are the properties that should be controlled. They can be controlled in different ways, depending on how the job is run.

 

Scenarios and solutions

If you do not care about the values assigned to these properties, no further steps are needed.

Configuring ports in spark-defaults.conf

If you are running one Spark application per node (for example, submitting Python scripts with spark-submit), you might want to define the properties in $SPARK_HOME/conf/spark-defaults.conf. Below is an example of what should be added to the configuration file.

spark.blockManager.port 38000
spark.broadcast.port 38001
spark.driver.port 38002
spark.executor.port 38003
spark.fileserver.port 38004
spark.replClassServer.port 38005

If a test is run, for example spark-submit test.py, the Spark UI uses the default port 4040 and the ports defined above are used.

Running the following command

sudo netstat -tulpn | grep 3800

Returns the following output:

tcp6      0      0      :::38000                          :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38002     :::*      LISTEN      25300/java
tcp6      0      0      10.0.173.225:38003     :::*      LISTEN      25300/java
tcp6      0      0      :::38004                          :::*      LISTEN      25300/java
tcp6      0      0      :::38005                          :::*      LISTEN      25300/java
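
The port numbers used here follow a simple pattern: a base port plus one offset per property. A minimal sketch of generating such a block of settings follows; the helper names are hypothetical, and the ordering of the properties is just the one used in this post.

```python
PORT_PROPERTIES = [
    "spark.blockManager.port",
    "spark.broadcast.port",
    "spark.driver.port",
    "spark.executor.port",
    "spark.fileserver.port",
    "spark.replClassServer.port",
]


def port_settings(base_port):
    """Map each property to base_port plus its position in PORT_PROPERTIES."""
    return {name: base_port + offset
            for offset, name in enumerate(PORT_PROPERTIES)}


def as_conf_lines(settings):
    """Render the settings in spark-defaults.conf style, ordered by port."""
    ordered = sorted(settings.items(), key=lambda item: item[1])
    return ["%s %d" % (name, port) for name, port in ordered]


print("\n".join(as_conf_lines(port_settings(38000))))
```

Calling port_settings with a different base, for example 38020, gives a second non-overlapping block for another application on the same node.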

 

Configuring ports directly in a script

In my case, different users like to run Spark applications in different ways. Here is an example of how ports are configured through a Python script.

"""Pi-estimation.py"""

from random import randint, random

from pyspark.conf import SparkConf
from pyspark.context import SparkContext


def sample(_):
    # Draw a random point in the unit square and count it
    # if it falls inside the quarter circle.
    x, y = random(), random()
    return 1 if x * x + y * y < 1 else 0


conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Pi")

# Spark UI port for this application
conf.set("spark.ui.port", "4042")

# Fixed ports instead of the random defaults
conf.set("spark.blockManager.port", "38020")
conf.set("spark.broadcast.port", "38021")
conf.set("spark.driver.port", "38022")
conf.set("spark.executor.port", "38023")
conf.set("spark.fileserver.port", "38024")
conf.set("spark.replClassServer.port", "38025")

conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")

sc = SparkContext(conf=conf)

NUM_SAMPLES = randint(5000000, 100000000)
count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \
    .reduce(lambda a, b: a + b)
print("NUM_SAMPLES is %i" % NUM_SAMPLES)
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

(The Pi estimation above is adapted from an example that comes with the Spark installation.)

The property values in the script override the properties in the spark-defaults.conf file. For the runtime of this script, port 4042 and ports 38020-38025 are used.

If netstat command is run again for all ports that start with 380

sudo netstat -tulpn | grep 380

The following output is shown:

tcp6           0           0           :::38000                              :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38002         :::*          LISTEN          25300/java
tcp6           0           0           10.0.173.225:38003         :::*          LISTEN          25300/java
tcp6           0           0           :::38004                              :::*          LISTEN          25300/java
tcp6           0           0           :::38005                              :::*          LISTEN          25300/java
tcp6           0           0           :::38020                              :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38022         :::*          LISTEN          27280/java
tcp6           0           0           10.0.173.225:38023         :::*          LISTEN          27280/java
tcp6           0           0           :::38024                              :::*          LISTEN          27280/java

Two processes are each running a separate Spark application on the ports that were defined beforehand.
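
Reading such output by eye gets tedious with more applications, so the grouping by process can be scripted. This is a minimal sketch with a hypothetical helper name; it assumes the column layout of netstat -tulpn shown above and only looks at lines in the LISTEN state.

```python
from collections import defaultdict


def listening_ports_by_pid(netstat_text):
    """Group listening ports by PID, based on `netstat -tulpn` style lines."""
    ports = defaultdict(list)
    for line in netstat_text.splitlines():
        fields = line.split()
        if len(fields) < 7 or fields[5] != "LISTEN":
            continue
        port = int(fields[3].rsplit(":", 1)[1])  # local address ends in :port
        pid = fields[6].split("/", 1)[0]         # "25300/java" -> "25300"
        ports[pid].append(port)
    return {pid: sorted(found) for pid, found in ports.items()}


sample = """\
tcp6  0  0  :::38000             :::*  LISTEN  25300/java
tcp6  0  0  10.0.173.225:38002   :::*  LISTEN  25300/java
tcp6  0  0  :::38020             :::*  LISTEN  27280/java
tcp6  0  0  10.0.173.225:38022   :::*  LISTEN  27280/java
"""
print(listening_ports_by_pid(sample))
```

With the sample lines, each PID ends up with its own pair of ports, which makes an overlap between two applications easy to spot.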

 

Configuring ports in Zeppelin

Since my users use Apache Zeppelin, similar network management had to be done there. Zeppelin also sends jobs to the Spark Context through the spark-submit command, which means that the properties can be configured in the same way, this time through an interpreter in Zeppelin:

Choosing menu Interpreter and choosing spark interpreter will get you there. Now it is all about adding new properties and respective values. Do not forget to click on the plus when you are ready to add a new property.
At the very end, save everything and restart the spark interpreter.

Below is an example of how this is done:

[Screenshot: spark zeppelin ports]

Next time a Spark context is created in Zeppelin, the ports will be taken into account.

 

Conclusion

This can be useful if multiple users are running Spark applications on one machine and have separate Spark Contexts.

In case of Zeppelin, this comes in handy when one Zeppelin instance is deployed per user.

 

Configuring home directory on client nodes

My cluster has dedicated nodes that take the role of a client. Users connect to these clients with their own private keys and have their own home directories.
Until now, the home directory was the default /home/username. Lately, Ambari has been warning me that the client nodes are running out of disk space.
I have added a volume to the client node and configured a new home for the users. How I attach a volume to an instance is described here.

The new volume is mounted to the folder /user, in imitation of Hadoop’s filesystem layout. Since some users already exist and have their home directories under /home, I have to take care of them first.

 

Create /user folder

Create the folder where users’ home will reside

sudo mkdir /user

Move existing user

Moving existing user to a new home directory:

sudo usermod -m -d /user/test1 test1

If the new directory does not exist yet, usermod creates it.

After the command has finished, the directory /home/test1 is no longer available and /user/test1 has become the new home directory for test1.

Explaining the options*:

-m    move contents of the home directory to the new location (use only with -d)
-d    new home directory for the user account

* Taken from the usermod man page.

Now we can check some details of the user with the following command:

finger test1

The following output is given:

Login: test1 Name:
Directory: /user/test1 Shell: /bin/bash
Never logged in.
No mail.
No Plan.
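
With more than a handful of accounts, it helps to list who still needs to be moved. The sketch below scans text in /etc/passwd format for regular accounts whose home is not under /user. The helper name is hypothetical, and the UID range 1000 to 59999 is an assumption based on common Debian/Ubuntu defaults, so adjust it to your system.

```python
def homes_outside(passwd_text, target_root="/user", uid_range=(1000, 59999)):
    """Return {login: home} for regular accounts whose home is not under target_root."""
    first_uid, last_uid = uid_range
    pending = {}
    for line in passwd_text.splitlines():
        fields = line.split(":")
        if len(fields) != 7:  # name:passwd:uid:gid:gecos:home:shell
            continue
        login, uid, home = fields[0], int(fields[2]), fields[5]
        is_regular = first_uid <= uid <= last_uid
        if is_regular and not home.startswith(target_root + "/"):
            pending[login] = home
    return pending


# Made-up entries for illustration.
sample = """\
root:x:0:0:root:/root:/bin/bash
test1:x:1001:1001::/user/test1:/bin/bash
user2:x:1002:1002::/home/user2:/bin/bash
"""
print(homes_outside(sample))  # {'user2': '/home/user2'}
```

On the client, the input would be open("/etc/passwd").read(), and each reported login is a candidate for the usermod command above.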

Note:
If you run the usermod command on a user that is logged in, usermod is going to report an error:

usermod: user user2 is currently used by process 13995

Running ps aux on this process

ps aux | grep 13995

Reveals that the user user2 is logged in:

user2    13995  0.0  0.0 103568  2468 ?        S    11:59   0:00 sshd: user2@pts/2

 

Changing configuration for new users

Opening adduser.conf file

sudo vi /etc/adduser.conf

and changing (first uncommenting if needed!) the value for parameter DHOME does the trick:

DHOME=/user

Upon creation of a new user

sudo adduser newuser

this new user now has a new home directory on the attached volume. When logging in as newuser and running command pwd, the output is

/user/newuser
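
To confirm the setting without creating a user, the active DHOME value can be read back from the file. This is a minimal sketch with a hypothetical helper name; it assumes simple KEY=value lines and treats lines starting with # as comments.

```python
def configured_dhome(conf_text, default="/home"):
    """Return the active DHOME value from adduser.conf text, ignoring commented lines."""
    value = default
    for line in conf_text.splitlines():
        stripped = line.strip()
        if stripped.startswith("#") or "=" not in stripped:
            continue
        key, raw = stripped.split("=", 1)
        if key.strip() == "DHOME":
            value = raw.strip().strip("\"'")  # drop optional quotes
    return value


print(configured_dhome("#DHOME=/home\n"))                 # /home
print(configured_dhome("# base dir\nDHOME=/user\n"))      # /user
```

The first call shows why uncommenting matters: a commented-out DHOME line leaves the default in place.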

 
The existing users’ directories have now been moved to /user, which is on a dedicated volume for user files. The configuration file /etc/adduser.conf has been altered so that new users automatically get a home directory under /user.