About Storm’s Nimbus

This post describes Nimbus and shows how a Storm cluster behaves with a single Nimbus as well as with Nimbus H/A.

I have a Hadoop cluster installed using Ambari. The distribution is Hortonworks. Storm installation with Ambari is described here.

A basic example of a Storm topology that writes to HDFS can be seen here. It might be smart to submit one topology first in order to more easily understand terms like Bolt, Supervisor, Nimbus…

About Nimbus

Nimbus is the master node in a Storm cluster; it is to Storm what the NameNode is to your Hadoop cluster.

Responsibilities:

  1. distributing code to Supervisors
  2. assigning tasks
  3. monitoring tasks
  4. restarting tasks when needed

Thrift

Thrift is a member of the Apache family. It is a software framework (binary protocol) used for scalable cross-language communication. Nimbus is a Thrift service, and the wide use of Thrift in Storm allows users to define and submit topologies from any language.
The Nimbus Thrift API exposes all the information needed to monitor the Storm cluster.
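As an illustration (a minimal sketch, not part of the original setup), the same Thrift API that the Storm UI and the storm CLI use can be called from Java. I am assuming the backtype.storm packages used by this Storm version; the client reads the local storm.yaml to locate Nimbus:

import java.util.Map;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.TopologySummary;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class NimbusSummaryCheck {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();                           // reads the client's storm.yaml
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf); // opens a Thrift connection to Nimbus
        ClusterSummary summary = nimbus.getClient().getClusterInfo();
        System.out.println("Supervisors: " + summary.get_supervisors_size());
        for (TopologySummary t : summary.get_topologies()) {
            System.out.println(t.get_name() + " -> " + t.get_status());
        }
    }
}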

ZooKeeper’s role

Nimbus stores all of its data in ZooKeeper. It is fail-fast (like the Supervisor), so if Nimbus dies and is restarted, the restart has no effect on the tasks already running on the Supervisors.

Nimbus and the Supervisors communicate through ZooKeeper, not directly with each other, which is why all coordination data is stored in ZooKeeper.
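Just to illustrate how much of that state lives in ZooKeeper, the znodes Storm creates can be listed with a plain ZooKeeper client. A sketch under assumptions: the connect string is an example host and /storm is the default storm.zookeeper.root, both depend on your storm.yaml:

import org.apache.zookeeper.ZooKeeper;

public class ListStormZnodes {
    public static void main(String[] args) throws Exception {
        // example ZooKeeper host; use the ensemble from storm.zookeeper.servers
        ZooKeeper zk = new ZooKeeper("zookeeper-server1:2181", 30000, event -> { });
        // "/storm" is the default storm.zookeeper.root; children typically include
        // assignments, storms, supervisors, workerbeats, errors, ...
        for (String child : zk.getChildren("/storm", false)) {
            System.out.println("/storm/" + child);
        }
        zk.close();
    }
}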

Submitting Topology in Storm Cluster

From the Storm client, the topology is submitted first to Nimbus, and Nimbus then distributes it to the Supervisors.
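For reference, this is roughly what a submission looks like from Java. A hedged sketch only: the TestWordSpout/TestWordCounter helpers from storm-core's testing package stand in for the real spout and bolt of RandomWordsHdfsTopology, which are not shown in this post:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordCounter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class SubmitSketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // stand-in spout and bolt from storm-core's testing package
        builder.setSpout("random-words-spout", new TestWordSpout(), 1);
        builder.setBolt("count-bolt", new TestWordCounter(), 1)
               .shuffleGrouping("random-words-spout");

        Config conf = new Config();
        conf.setNumWorkers(1);
        // the storm client ships the jar and topology definition to Nimbus,
        // and Nimbus distributes the code to the Supervisors
        StormSubmitter.submitTopology("SketchTopology", conf, builder.createTopology());
    }
}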

Single Nimbus in Storm Cluster

The only Nimbus in the Storm cluster is installed on the Hadoop NameNode.
If Nimbus is not running, the Storm UI (on port 8744) returns the following error message:

java.lang.RuntimeException: Could not find leader nimbus from seed hosts ["nimbus-server1"]. Did you specify a valid list of nimbus hosts for config nimbus.seeds

Start Nimbus service from Ambari.

The Storm UI, under Nimbus Summary, shows one host. Its default port is 6627 and its status is “Leader”.
I am running one simple test topology, RandomWordsHdfsTopology, and the log on the Supervisor executing the Bolt shows lines like the following:

2016-10-08 11:13:37.885 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Spark] TASK: 4 DELTA:
2016-10-08 11:13:37.986 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Hadoop]
2016-10-08 11:13:37.986 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME:  TUPLE: source: random-words-spout:5, stream: default, id: {}, [Hadoop]
2016-10-08 11:13:37.986 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Hadoop] TASK: 4 DELTA:
2016-10-08 11:13:38.087 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Kafka]
2016-10-08 11:13:38.088 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME:  TUPLE: source: random-words-spout:5, stream: default, id: {}, [Kafka]
2016-10-08 11:13:38.088 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Kafka] TASK: 4 DELTA:
2016-10-08 11:13:38.188 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Storm]
2016-10-08 11:13:38.189 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME: 0 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Storm]

And the random words are being written to a file in HDFS.

If Nimbus shuts down, ZooKeeper and the Supervisors continue running the topology. In this case, the log file on the Supervisor keeps logging random words and the file in HDFS continues to be appended to. The Storm UI shows the error message posted above, and running

storm list

from the Storm client machine returns the same error message.

Starting Nimbus again and looking at $STORM_LOGS/nimbus.log on nimbus-server1 shows how Nimbus reacts upon restart.
Some lines taken from the log file:

b.s.zookeeper [INFO] nimbus-server1 gained leadership, checking if it has all the topology code locally.
b.s.zookeeper [INFO] active-topology-ids [RandomWordsHdfsTopology-1-1475917797] local-topology-ids [RandomWordsHdfsTopology-1-1475917797] diff-topology []
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.
b.s.d.nimbus [INFO] Starting Nimbus server...
...
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.

In other words, the active topology did not suffer from the Nimbus downtime. With Nimbus down, no new topologies can be submitted and existing ones cannot be manipulated.

Multiple Nimbus in Storm Cluster

Adding another Nimbus for Nimbus High Availability is simple in Ambari.
The second Nimbus is added on the Client node of the cluster. After it is added and the Storm service restarted, the Storm UI, under Nimbus Summary, shows two Nimbus hosts, one with status “Leader” and one with status “Not a Leader”.
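Behind the scenes this maps to the nimbus.seeds setting from the error message earlier. As a sketch (the host names are the ones from this post, the rest is an assumption), a client can list both hosts and let NimbusClient find whichever one is currently the leader:

import java.util.Arrays;
import java.util.Map;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class FindLeaderNimbus {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();
        // both Nimbus hosts from this cluster; NimbusClient tries the seeds
        // until it reaches the one that is currently the leader
        conf.put("nimbus.seeds", Arrays.asList("nimbus-server1", "client-server2"));
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
        System.out.println("Topologies: " + nimbus.getClient().getClusterInfo().get_topologies_size());
    }
}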

The nimbus.log file on client-server2, which hosts the “Not a Leader” Nimbus, reveals the following lines:

...
b.s.d.nimbus [INFO] not a leader, skipping cleanup-corrupt-topologies
b.s.d.nimbus [INFO] Starting Nimbus server...
b.s.d.nimbus [INFO] not a leader, skipping assignments
b.s.d.nimbus [INFO] not a leader, skipping cleanup
b.s.d.nimbus [INFO] not a leader skipping , credential renweal.
...
b.s.d.nimbus [INFO] missing topology RandomWordsHdfsTopology-1-1475917797 has state on zookeeper but doesn't have a local dir on this host.
...
b.s.d.nimbus [INFO] trying to download missing topology code from NimbusInfo{host='nimbus-server1', port=6627, isLeader=false}

The “Not a Leader” Nimbus is now in sync with the Storm cluster and its topologies. Now the leader Nimbus is stopped:

b.s.zookeeper [INFO] client-server2 gained leadership, checking if it has all the topology code locally.
b.s.zookeeper [INFO] active-topology-ids [RandomWordsHdfsTopology-1-1475917797] local-topology-ids [RandomWordsHdfsTopology-1-1475917797] diff-topology []
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.

The Nimbus on client-server2 takes over as the Leader, and the Nimbus on nimbus-server1 has status “Offline”.

When multiple Nimbus services are up and running, the “Leader” status switches between them, roughly every couple of minutes.

Nimbus has a vital role in the Storm cluster, and it is naive to think that as long as a topology is running, Nimbus is not needed.

Zeppelin thrift error “Can’t get status information”

I have multiple users on one client machine who are going to use/test ZeppelinR. For every Zeppelin user I create a copy of the built Zeppelin folder in the user’s home directory and dedicate a port to that user (8080 is reserved for my own testing); for example, my first user got port 8082. The port is configured in the user’s $ZEPPELIN_HOME/conf/zeppelin-site.xml.

Example for one user:

<property>
  <name>zeppelin.server.port</name>
  <value>8082</value>
  <description>Server port.</description>
</property>

Running Zeppelin as root is not a big problem, and running ZeppelinR as root is also not particularly problematic. Running it as a normal Linux user, however, can bring some challenges.

The following error message can surprise you when starting a new Spark context from the Zeppelin Web UI.

Taken from Zeppelin log file (zeppelin-user_running_zeppelin-t-client01.log):

ERROR [2016-03-18 08:10:47,401] ({Thread-20} RemoteScheduler.java[getStatus]:270) – Can’t get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.run(RemoteScheduler.java:205)
ERROR [2016-03-18 08:11:47,347] ({pool-1-thread-2} RemoteScheduler.java[getStatus]:270) – Can’t get status information
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_getStatus(RemoteInterpreterService.java:355)
at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.getStatus(RemoteInterpreterService.java:342)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobStatusPoller.getStatus(RemoteScheduler.java:256)
at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The zeppelin out file (zeppelin-user_running_zeppelin-t-client01.out) gives a more concrete description of the problem:

Exception in thread "Thread-80" org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:146)
 at org.apache.zeppelin.interpreter.LazyOpenInterpreter.getScheduler(LazyOpenInterpreter.java:115)
 at org.apache.zeppelin.interpreter.Interpreter.destroy(Interpreter.java:124)
 at org.apache.zeppelin.interpreter.InterpreterGroup$2.run(InterpreterGroup.java:115)
Caused by: java.lang.RuntimeException: Could not find rzeppelin - it must be in either R/lib or ../R/lib
 at org.apache.zeppelin.rinterpreter.RContext$.apply(RContext.scala:353)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext$lzycompute(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.rContext(RInterpreter.scala:43)
 at org.apache.zeppelin.rinterpreter.RInterpreter.getScheduler(RInterpreter.scala:80)
 at org.apache.zeppelin.rinterpreter.RRepl.getScheduler(RRepl.java:93)
 at org.apache.zeppelin.interpreter.ClassloaderInterpreter.getScheduler(ClassloaderInterpreter.java:144)
 ... 3 more

The way I solved it was by running the Zeppelin service from $ZEPPELIN_HOME. To allow users to start the Zeppelin service themselves, I created a script:

#!/bin/bash
# Start ZeppelinR from the user's own Zeppelin home so that rzeppelin (R/lib) is found
export ZEPPELIN_HOME=/home/${USER}/Zeppelin-With-R
cd ${ZEPPELIN_HOME}
${ZEPPELIN_HOME}/bin/zeppelin-daemon.sh start

Now I can start and stop the Zeppelin service and start new Spark contexts with no problem.

Here is an example of my YARN applications:

[Screenshot: Zeppelin services in YARN]

And here are the outputs from Zeppelin when Scala, SparkR and Hive are tested:

[Screenshot: Zeppelin user test results]