My Work cluster in detail

The cluster was built on OpenStack private cloud owned by a Swiss organization Switch.

The Hadoop distributor was Hortonworks, except for Spark and Zeppelin, who were Apache’s.

Potential users

Since the project owner was an organization supporting educational entities in Switzerland, the potential users were researchers, scientists, students…

I had the luxury of having almost unlimited resources on the infrastructure so I have built 5 Hadoop clusters – 4 were Hortonworks Hadoop clusters, one was Apache Hadoop cluster. Out of the 4, one was the Work environment which was exposed to the end users. And this is the cluster that is described in detail in this post.
Keep in mind that I was working on my own on this development – which meant administering and upgrading 5 clusters and doing data science at the same time. In order to make it work, I had to use the YARN inside me and distribute the limited resources effectively.

Initial resources

Keeping in mind the point of distributed systems is scalability, I have defined the initial cluster with the following capabilities.
6 instances with corresponding details:

  • Ambari Server
  • NameNode
  • DataNode (3)
  • Client
Instance RAM VCPU Default disk size Volume No. Volume Size Security group
Ambari 8GB 8 VCPU 20GB None None sg-ambari
NameNode 32GB 8 VCPU 20GB 1 200GB sg-namenode
DataNode (3) 32GB 8 VCPU 20GB 3 200GB sg-datanode
Client 16GB 16 VCPU 20GB 1 500GB sg-client

Note: There were three DataNodes in the initial cluster.

Characteristics of the cluster

The initial cluster had 1.7 TB HDFS, replication factor was 3, block size was the default 128MB. Rack awareness was not set in the initial cluster and the queue was the default.
On the YARN side, I have made some changes and had 84GB RAM (3 x 32GM = 96GB RAM. 4GB per DataNode was left for services on the instance -> 96GB – 12GB = 84GB) as maximum amount of RAM resources for the cluster – the default values by Apache (Hortonworks?) are quite more conservative.

In the cluster building process the versions were Ambari 2.1 and HDP 2.3. When Ambari 2.2 and HDP 2.4 were available, the cluster was upgraded.

Ambari

Ambari had a server for itself, the database for collecting statistics was MySql. The idea was always to migrate the Ambari Server if needed. The migration to new Ambari server is easy so I could afford to start small for this service.
The Ambari Views was enabled for the users who wanted to upload the files to the HDFS manually. Hive was also available through this service and I on my one of my test environments, I have even embedded Zeppelin in Ambari Views. Though, on the Work cluster, Zeppelin was offered only as an independent service on the Client.
All the ports for Ambari to properly work were in the sg-ambari security group.

NameNode

The initial plan with the NameNode was to run all the services on it except Spark and Zeppelin. When the resources would expand beyond the instance’s capabilities, some services would be moved to a new instance, or unused services would be stopped (experience showed Hive had little popularity among the academia). Using Ambari, migrating services is an easy process, I could afford to have all services running on one NameNode. Only cluster administrator had access to this instance. With other words, client tools were not installed on this instance.
All the ports for the NameNode to properly work were in the sg-namenode security group.

DataNode

I started with 3 DataNodes, which offered 1.7TB of storage on the HDFS. The DataNodes were also used as Workers for Spark and Supervisors for Storm. The users had no access to the DataNodes directly – no client was installed here. This would change according to the needs so that some jobs could access data directly locally.
All the ports for the DataNodes to properly work were in the sg-datanode security group.

Client

The client was users’ window to the cluster. Spark 2.0 (before Summer 2016 it was Spark 1.6) was offered as the computational engine – only one. One reason was also easier administration and optimizatoin from my side.
The users could use the command line interface (CLI), RStudio or Zeppelin. Ambari Views as well, but that was running on Ambari instance. More advanced users went with the CLI, users who wanted to learn Spark were using Zeppelin.
Client for Storm was also installed on this instance. Due to more complex programming (in Java), all the Topologies were handled by me, the users were defining requirements and using the data stored by the Storm.
All the ports for the Client to properly work were in the sg-datanode security group.

See below for page 2.

About Storm’s Nimbus

This post describes Nimbus and shows how its use with single Nimbus in Storm cluster, as well as Nimbus H/A.

I have a Hadoop cluster installed using Ambari. The distribution is Hortonworks. Storm installation with Ambari is described here

A basic example of Storm topology – writing to HDFS can be seen here. Might be smart to submit one topology first in orderto easier understand the terms like Bolt, Supervisor, Nimbus…

About Nimbus

Nimbus is the master node in Storm cluster, it is the NameNode to your Hadoop.

Responsibilities:

  1. distributing code to Supervisors
  2. assigning tasks
  3. monitoring tasks
  4. restarting tasks when needed

Thrift

Thrift is a member of Apache family. It is a software framework (binary protocol) used for scalable cross language communication. Nimbus is a thrift service, and wide use of thrift in Storm allows users to define and submit topologies from any language.
Nimbus thrift API exposes all the information needed to monitor he Storm cluster.

ZooKeeper’s role

Nimbus stores all of its data in ZooKeeper. It is fail-fast (like Supervisor), so if Nimbus dies, the restart has no effect on the running tasks on the Supervisors.

Nimbus and Supervisors communicate through Zookeeper. This means that all data is stored in Zookeeper.

Submitting Topology in Storm Cluster

From the Storm client, the topology is submitted first to the Nimbus and Nimbus distributes it further to the Supervisors.

Single Nimbus in Storm Cluster

The only Nimbus in the Storm cluster is installed on the Hadoop NameNode.
If the Nimbus is not running, Storm UI (on port 8744) returns the following error message

java.lang.RuntimeException: Could not find leader nimbus from seed hosts ["nimbus-server1"]. Did you specify a valid list of nimbus hosts for config nimbus.seeds

Start Nimbus service from Ambari.

The Storm UI, under Nimbus Summary shows one host. It’s default port is 6627 and the status is “Leader”.
I am running one simple test Topology RandomWordsHdfsTopology and the log on the Supervisor executing the Bolt is showing me lines in the following manner:

2016-10-08 11:13:37.885 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Spark] TASK: 4 DELTA:
2016-10-08 11:13:37.986 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Hadoop]
2016-10-08 11:13:37.986 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME:  TUPLE: source: random-words-spout:5, stream: default, id: {}, [Hadoop]
2016-10-08 11:13:37.986 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Hadoop] TASK: 4 DELTA:
2016-10-08 11:13:38.087 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Kafka]
2016-10-08 11:13:38.088 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME:  TUPLE: source: random-words-spout:5, stream: default, id: {}, [Kafka]
2016-10-08 11:13:38.088 b.s.d.executor [INFO] Execute done TUPLE source: random-words-spout:5, stream: default, id: {}, [Kafka] TASK: 4 DELTA:
2016-10-08 11:13:38.188 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Storm]
2016-10-08 11:13:38.189 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME: 0 TUPLE: source: random-words-spout:5, stream: default, id: {}, [Storm]

And the random words are being written to a file in HDFS.

If the Nimbus shuts down, Zookeeper and Supervisor continue running the Topology. In this case, the log file on the Supervisor keeps logging random words and the file in HDFS continues to be appended. The Storm UI shows the error message posted above and running

storm list

from the Storm client machine returns the same error message.

Starting the Nimbus again and looking at the $STORM_LOGS/nimbus.log on nimbus-server1 teaches us how Nimbus reacts upon restart.
Some lines taken from the log file:

b.s.zookeeper [INFO] nimbus-server1 gained leadership, checking if it has all the topology code locally.
b.s.zookeeper [INFO] active-topology-ids [RandomWordsHdfsTopology-1-1475917797] local-topology-ids [RandomWordsHdfsTopology-1-1475917797] diff-topology []
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.
b.s.d.nimbus [INFO] Starting Nimbus server...
...
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.

With other words, the active Topology did not suffer from Nimbus downtime. With Nimbus down, nonew Topologies can be submitted and existing ones cannot be manipulated.

Multiple Nimbus in Storm Cluster

Adding another Nimbus for Nimbus High Availability is simple in Ambari.
The second Nimbus is added on the Client node of the cluster. After it is added and the Storm service restarted, the Storm UI, under Nimbus Summary shows two Nimbus hosts one being Leader and one having status “Not a Leader”.

The client-server2, which has “Not a Leader” Nimbus reveals the following lines in the nimbus.log file:

...
b.s.d.nimbus [INFO] not a leader, skipping cleanup-corrupt-topologies
b.s.d.nimbus [INFO] Starting Nimbus server...
b.s.d.nimbus [INFO] not a leader, skipping assignments
b.s.d.nimbus [INFO] not a leader, skipping cleanup
b.s.d.nimbus [INFO] not a leader skipping , credential renweal.
...
b.s.d.nimbus [INFO] missing topology RandomWordsHdfsTopology-1-1475917797 has state on zookeeper but doesn't have a local dir on this host.
...
b.s.d.nimbus [INFO] trying to download missing topology code from NimbusInfo{host='nimbus-server1', port=6627, isLeader=false}

The “Not a Leader” Nimbus is now updated with the Storm CLuster and its topologies. Now the leader Nimbus is stopped:

b.s.zookeeper [INFO] client-server2 gained leadership, checking if it has all the topology code locally.
b.s.zookeeper [INFO] active-topology-ids [RandomWordsHdfsTopology-1-1475917797] local-topology-ids [RandomWordsHdfsTopology-1-1475917797] diff-topology []
b.s.zookeeper [INFO] Accepting leadership, all active topology found localy.

The Nimbus on client-server2 takes over as the Leader and Nimbus on the nimbus-server1 has status “Offline”.

When multiple Nimbus services are up and running, the “Leader” status is being switched between them. Roughly, this goes on every couple of minutes.

Nimbus has a vital role in the Storm Cluster and it is naive to think as long as Topology is running, I do not need Nimbus.

Streaming with Storm – simple example with HDFS bolt

This post describes a simple Storm topology – random words are written to HDFS. The topology is uploaded on the cluster from the client node. Nimbus is on the cluster’s NameNode. I have 4 DataNodes and on each of them a Supervisor is installed. More on how I installed and configured Storm can be found here.

Services used

I am using Hortonworks 2.4, Hadoop is version 2.7.1, Storm is version 0.10.0. All services were installed through Ambari.

Preparing development environment

Create a new maven project. How to install maven is explained here.

mvn archetype:generate -DgroupId=org.package -DartifactId=storm-project -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

When the project is created, step into the directory (in this case it is storm-project) where the pom.xml file is also located.

In the org.package (./src/main/java/org.package), create folder spout. The App.java can be deleted.

There are 3 files important for this topology: pom.xml, the spout file and the topology file.

Prepare pom.xml

The pom file for this case includes Storm dependencies, with scope provided. Storm jars are not packed together with the topology! It is important to match the versions.

maven-shade-plugin

Add build node with the plugin

    <build>
        <sourceDirectory>src/</sourceDirectory>
        <resources>
            <resource>
                <directory>${basedir}</directory>
                <includes>
                    <include>*</include>
                </includes>
            </resource>
        </resources>
        <outputDirectory>classes/</outputDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

clojure

Add clojure in the dependencies node. Be sure to check for newer version

<dependency>
    <groupId>org.clojure</groupId>
    <artifactId>clojure</artifactId>
    <version>1.8.0</version>
</dependency>

storm-core

Make sure the version matches Storm installation

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.10.0</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

hadoop-client

Hadoop client XML node. Make sure the version matches your Hadoop installation. org.slf4j is omitted otherwise messages about multiple version of the package are appearing

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>2.7.1</version>
	<exclusions>
		<exclusion>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
		</exclusion>
	</exclusions>
</dependency>

hadoop-hdfs

Hadoop hdfs XML node. Make sure the version matches your Hadoop installation. org.slf4j is again omitted

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>2.7.1</version>
	<exclusions>
		<exclusion>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
		</exclusion>
	</exclusions>
</dependency>

storm-hdfs

<dependency>
	<groupId>org.apache.storm</groupId>
	<artifactId>storm-hdfs</artifactId>
	<version>0.10.1</version>
</dependency>

Now that the pom.xml is in order, you can package the project to see if pom.xml is valid

mvn package

Build success should appear. If not, the pom.xml is invalid and should be taken care of.

Click on the next page for Spout.

Installing and configuring Storm in Ambari

About Storm

Storm is a free and open source distributed real-time computation system.
Storm cluster follows master-slave model and Zookeeper is used for coordination. All data is stored in ZooKeeper.
The basic unit of data processed by Storm is tuple. Tuple consists of predefined list of fields.

Storm cluster on Hadoop

The following graphic explains the architecture one ends up with after following this post. In black text, the Hadoop nodes are shown, in blue text, Storm nodes are shown.

6 nodes in the cluster. One is dedicated NameNode, one is Client and four are DataNodes.
6 nodes in the cluster. One is dedicated to Nimbus, DRPC Server and Storm UI Server, one is Storm Client and four are Supervisors.

storm-architecture

Ports

Make sure you open the following ports:
Node where Nimbus (master) is (are) installed: 2181, 6627.
Nodes where Supervisors (slaves) will be installed: 6700, 6701 (and so on, depending on the number of workers per supervisor).
Default Storm UI Server port is 8744, open the port on the node where this service is installed.

Adding Service in Ambari

Add Service Service

Select the Storm service:
01-pic-storm-service

Click Next.

Assign Masters

02-assign-masters

Nimbus is the master, responsible for distributing code across worker nodes, assigning tasks, monitoring tasks for any failures and restarting them when required. Nimbus and slaves communicate through ZooKeeper.

Click Next.

Assign Slaves and Clients

Check Supervisors on all datanodes you wish to use as supervisors.
Supervisor nodes are worker nodes.

Click Next.

Customize Services

Define ports on supervisors. One port per worker. By defining the ports one basically defines how many workers per supervisor will run.

03-supervisor-slots-port

Leave the default ports for now.

Review

If everything is ok, Click Deploy.

Install, Start, Test

When the installation is complete, click Next.

Restart Required

Restart HDFS, MapReduce2, YARN and Hive. Ambari reminds you about that. The Storm Web UI should now be available on the server where Storm UI Server is installed and on port 8874.

Adding Nimbus

Adding Nimbus is quite straightforward.

In Ambari, click on service Storm.

On the right side, there is a menu Service Actions. Click on it and select Add Nimbus.

Choose the host to add Nimbus component. In this case, I am adding a Nimbus to mz client node in the cluster.

adding-nimbus-select-node

Click OK on the confirmation box

adding-nimbus-confirmation

The Nimbus is now installed. On two instances – client and NameNode.

Restart of the Storm service is needed to make the second Nimbus part of services. The newly added Nimbus has status “Not a Leader”, while the primary Nimbus has status “Leader”.

web-ui-nimbus-summary

Storm client? Yes, with a small workaround

Since I am not implementing High Availability for Storm, there is no need for two Nimbuses. The reason I added one Nimbus to the client is to get Storm client on it.

So if I remove the Nimbus from the client node, the Storm packages remain and potential Storm users can access the Storm service from the client – just like any other services in the cluster.

I can remove the Nimbus from the client just like any other service in Ambari – I stop the service and delete it.

The storm.yaml on the Client will be used when uploading the topologies and at the moment, the property nimbus.seeds has 2 properties – client FQDN and NameNode FQDN – each for one Nimbus location. The upload will still work, but if the non-existing Nimbus server is checked first, it will return an error and look for the next Nimbus server on the list.

Overview over Storm in Ambari

The summary in Ambari reveals the following picture:

summary

One Nimbus (master), 4 Supervisors (slaves) and 8 slots (4 Supervisors x 2 ports, one for each worker on each Supervisor).

Learning about Storm

I have taken the Udacity course Real-Time Analytics with Apache Storm by Twitter. Great course! Very well explained and besides learning about Storm, I also became familiar with in-memory database Redis.

My topology

I have a test topology running which takes in tweets and “bolts” them in the following storages:

  • pushes raw JSON files directly to HDFS
  • creates tuples (user-tweet), does data cleansing and pushes them in Redis
  • pushes information about user, tweet, date to MySql

I keep upgrading and improving my Topology.

Further work

  1. Working with Trident
  2. Checking how Spark Streaming can compete with Storm
  3. Testing Apache Samza to find out why LinkedIn was not happy with Storm and decided to develop Samza

Now we can start playing with Storm! Here is an example of Storm topology that takes random words and pushes them into HDFS.