Adding service Druid to HDP 2.6 stack

Druid is a “fast column-oriented distributed data store”, according to the description in Ambari. It is a new service, added in HDP 2.6. The service is in Technical Preview and the version offered is 0.9.2. Druid’s website is druid.io.

!!! Hortonworks Data Platform 2.6 is needed in order to install and use Druid !!!

Hortonworks has a very intriguing three-part series on ultra fast analytics with Hive and Druid. The first blog post can be found here.

This blog post describes how Druid is added to the HDP 2.6 stack with Ambari. The documentation I used is here. In my experience, following it as written is not enough: I had to make some adjustments in order to start all Druid services.

Requirements

  • Zookeeper: Druid requires installation of Zookeeper. This service is already installed on my cluster.
  • Deep storage: deep storage layer for Druid in HDP can either be HDFS or S3. Parameter “druid.storage.type” is used to define this. Installation default is HDFS.
  • Metadata storage: for holding information about Druid segments and tasks. MySql is my metadata storage of choice.
  • Batch execution engine: resource manager is YARN, execution engine is MapReduce2. Druid hadoop index tasks use MapReduce jobs for distributed ingestion of data.

All these requirements are taken care of in Ambari, most of them with a sufficient default value.

Services within Druid

  • Broker – interface between users and Druid’s historical and realtime nodes.
  • Overlord – maintains a task queue that consists of user-submitted tasks.
  • Coordinator – assigns segments to historical nodes, handles data replication, and ensures that segments are distributed evenly across the historical nodes.
  • Druid Router – routes queries to multiple broker nodes.
  • Druid Superset – data visualization tool; if you know Superset, you know Druid Superset.

Pre-work in metadata storage

As mentioned, my metadata storage is MySql. There are some objects that have to be created manually for the Druid installation to go through.

Log in to MySql as root.

Create druid database

CREATE DATABASE druid DEFAULT CHARACTER SET utf8;
CREATE USER 'druid'@'%' IDENTIFIED BY 'druid';
GRANT ALL PRIVILEGES ON druid.* TO 'druid'@'%';
FLUSH PRIVILEGES;

Create superset database

The superset objects in the database have to be created even though the documentation does not mention this. The installation will not go through unless it can connect to superset database to create tables in superset schema.

CREATE DATABASE superset DEFAULT CHARACTER SET utf8;
CREATE USER 'superset'@'%' IDENTIFIED BY 'druid';
GRANT ALL PRIVILEGES ON superset.* TO 'superset'@'%';
FLUSH PRIVILEGES;
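
The two sets of statements above differ only in the database name, the user and the password, so they can be generated by a small helper. This is a minimal sketch and not something from the Hortonworks documentation; the function name metadata_db_sql is my own invention, and the generated SQL should be reviewed before it is piped into MySql.

```shell
#!/bin/sh
# Hypothetical helper (not from the HDP docs): prints the SQL that creates
# a database and a user with full privileges on that database.
# Usage: metadata_db_sql DB_NAME USER PASSWORD
metadata_db_sql() {
  db="$1"; user="$2"; password="$3"
  printf "CREATE DATABASE %s DEFAULT CHARACTER SET utf8;\n" "$db"
  printf "CREATE USER '%s'@'%%' IDENTIFIED BY '%s';\n" "$user" "$password"
  printf "GRANT ALL PRIVILEGES ON %s.* TO '%s'@'%%';\n" "$db" "$user"
  printf "FLUSH PRIVILEGES;\n"
}

# Example (run as MySql root, the password is asked for interactively):
#   metadata_db_sql druid druid druid | mysql -u root -p
#   metadata_db_sql superset superset druid | mysql -u root -p
```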

Adding service

In Ambari, click on Add Service and check Druid service.

In the next step, you are asked to define which Druid service is going to be installed on which node in the cluster. Remember, you can always move/add services.

The Broker is on the Client node, since that service is the gateway to the external world.

In the next step – Assigning Slaves and Clients – you define where the following two services will be installed:

  • Druid Historical: Loads data segments.
  • Druid MiddleManager: Runs Druid indexing tasks.

Generally, you should assign Druid Historical and Druid MiddleManager to multiple nodes. I put both services on the NameNode to begin with.

The next step is the settings. Some passwords and the MySql server need to be defined. A secret key also has to be defined; a random string of characters will do the trick.

Be sure to create the objects in the MySql before you proceed with the installation.

[Screenshot: installation settings]

!!! Superset Database port should be 3306, just like Metadata storage port.

The advanced tab (picture above) is mostly for the superset parameters – entering name, email and password is needed to proceed with the installation. This is later on used in the visualization tool Superset.

Once you click OK, you are asked to double-check and change some recommended values. The following ones are related to the Druid installation and should be checked to accept the recommended values.

[Screenshot: dependency configuration]

In the Review step, check if everything is as it should be and click Deploy.

After the installation completes all Druid services should be up and running. If there is the need to restart any services, do so.

Tweaking MapReduce2

There is one detail not mentioned in Hortonworks documentation when Druid is installed. There are two parameters in MapReduce2 that have to be tweaked in order for Druid to successfully load data. Explanation is at the bottom.

The parameters are:

  • mapreduce.map.java.opts
  • mapreduce.reduce.java.opts

The following should be added at the end of the existing values:

-Duser.timezone=UTC -Dfile.encoding=UTF-8
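
To avoid adding the same flags twice when the parameters are edited again later, the new value can be computed first and then pasted into Ambari. The sketch below is only an illustration: the function name append_druid_opts is hypothetical, and -Xmx1228m in the example stands in for whatever value your cluster already has.

```shell
#!/bin/sh
# Prints the given java.opts value with the two flags from this post
# appended. A flag that is already present is not added a second time.
# Usage: append_druid_opts "EXISTING_VALUE"
append_druid_opts() {
  result="$1"
  for flag in -Duser.timezone=UTC -Dfile.encoding=UTF-8; do
    case " $result " in
      *" $flag "*) ;;                          # flag already present
      *) result="${result:+$result }$flag" ;;  # append, space-separated
    esac
  done
  printf '%s\n' "$result"
}

# Example with an assumed existing value:
#   append_druid_opts "-Xmx1228m"
```

The printed value is what goes into mapreduce.map.java.opts and mapreduce.reduce.java.opts.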

How it looks in Ambari:

[Screenshots: map java heap size parameter, reduce java heap size parameter]

The service MapReduce2 should now be restarted.

Explanation

Various error messages occur in the Druid Console log files when the Druid job starts to load the data. The error messages vary depending on the data, but generally, they do not provide any useful information.
In my experience, one error complained about the first line of a valid csv file, while in another example, the error was that no data could be indexed (stack trace below).

Caused by: java.lang.RuntimeException: No buckets?? seems there is no data to index.
	at io.druid.indexer.IndexGeneratorJob.run(IndexGeneratorJob.java:176) ~[druid-indexing-hadoop-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
	at io.druid.indexer.JobHelper.runJobs(JobHelper.java:349) ~[druid-indexing-hadoop-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
	at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:94) ~[druid-indexing-hadoop-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
	at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:261) ~[druid-indexing-service-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_111]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:201) ~[druid-indexing-service-0.9.2.2.6.0.3-8.jar:0.9.2.2.6.0.3-8]
	... 7 more

 

Upgrading HDP 2.5 to 2.6

This blog post explains how an express upgrade from HDP 2.5 to HDP 2.6 has been done.

I have an HDP 2.5 cluster on AWS; Ubuntu 14.04 is running on all instances. My metadata database of choice is MySql 5.6.

Prior to upgrading HDP, Ambari has to be upgraded to 2.5. An upgrade from Ambari 2.4 to 2.5 is described here.

Backup databases

Do a backup of all databases that are storing metadata for services installed in HDP.

Example of backing up Hive metadata:
On the server where Hive metastore database is, create a backup folder

mkdir /home/ubuntu/hive-backup

Dump the database into a file (enter password when prompted):

mysqldump -u hive -p hive > /home/ubuntu/hive-backup/hive.mysql

Backup namenode files

Create backup directory

mkdir /home/ubuntu/hdp25-backup

Backup a complete block map of the file system

sudo -u hdfs hdfs fsck / -files -blocks -locations > /home/ubuntu/hdp25-backup/dfs-old-fsck-1.log

Create a list of all the DataNodes in the cluster

sudo -u hdfs hdfs dfsadmin -report > /home/ubuntu/hdp25-backup/dfs-old-report-1.log

Capture the complete namespace of the file system

sudo -u hdfs hdfs dfs -ls -R / > /home/ubuntu/hdp25-backup/dfs-old-lsr-1.log

Go into safemode

sudo -u hdfs hdfs dfsadmin -safemode enter

Output

Safe mode is ON

Save namespace

sudo -u hdfs hdfs dfsadmin -saveNamespace

Output

Save namespace successful

Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory

sudo cp /hadoop/hdfs/namenode/current/fsimage_0000000000000485884 /home/ubuntu/hdp25-backup/
sudo cp /hadoop/hdfs/namenode/current/fsimage_0000000000000485884.md5 /home/ubuntu/hdp25-backup/
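
The fsimage file name above contains a transaction ID that will be different on every cluster. Instead of typing it by hand, the newest checkpoint can be looked up. The following is a sketch under the assumption that the checkpoint directory is readable by the current user (otherwise run it with sudo); latest_fsimage is a name I made up for this example.

```shell
#!/bin/sh
# Prints the path of the fsimage file with the highest transaction ID in
# the given directory. The .md5 companion files are skipped.
# Usage: latest_fsimage CHECKPOINT_DIR
latest_fsimage() {
  dir="$1"
  # The IDs are zero-padded, so a plain sort orders them numerically.
  name=$(ls "$dir" | grep -E '^fsimage_[0-9]+$' | sort | tail -n 1)
  if [ -z "$name" ]; then
    echo "no fsimage file found in $dir" >&2
    return 1
  fi
  printf '%s/%s\n' "$dir" "$name"
}

# Example:
#   img=$(latest_fsimage /hadoop/hdfs/namenode/current)
#   sudo cp "$img" "$img.md5" /home/ubuntu/hdp25-backup/
```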

Store the layoutVersion for the NameNode

sudo cp /hadoop/hdfs/namenode/current/VERSION /home/ubuntu/hdp25-backup/

Take the NameNode out of Safe Mode

sudo -u hdfs hdfs dfsadmin -safemode leave

Finalize any prior HDFS upgrade

sudo -u hdfs hdfs dfsadmin -finalizeUpgrade

Output

Finalize upgrade successful

Upgrading Ambari 2.4 to 2.5

This post describes how an upgrade from Ambari 2.4.1.0 to 2.5 has been done. The reason for that is to be able to further upgrade HDP to 2.6. Upgrade of HDP from 2.5 to 2.6 is described here.

Ambari Server is installed on Ubuntu 14.04. The same OS is used across the whole HDP cluster.

The following services are upgraded using this blog post:

  • Ambari Server
  • Ambari Agent
  • Ambari Infra
  • Ambari Metrics
  • Ambari Collector
  • Grafana

Backup

It is important to do a database backup of the Ambari database. Metadata for my Ambari is stored in MySql database.

Create a directory for backup

mkdir /home/ubuntu/ambari24-backup

Backup the database (enter password when prompted)

mysqldump -u ambari -p ambari_db > /home/ubuntu/ambari24-backup/ambari.mysql

Make a safe copy of the Ambari Server configuration file

sudo cp /etc/ambari-server/conf/ambari.properties /home/ubuntu/ambari24-backup/

Prepare for installation of Ambari Agent and Server

Stop Ambari Metrics from the Ambari Web UI

Stop Ambari Server on Ambari Server instance

sudo ambari-server stop

Stop all Ambari Agents on all instances in the cluster where it is running

sudo ambari-agent stop

On all instances running Ambari Server or Ambari Agent do the following

sudo mv /etc/apt/sources.list.d/ambari.list db-backups/
sudo wget -nv http://public-repo-1.hortonworks.com/ambari/ubuntu14/2.x/updates/2.5.0.3/ambari.list -O /etc/apt/sources.list.d/ambari.list

Upgrade Ambari Server

sudo apt-get clean all
sudo apt-get update -y
sudo apt-cache show ambari-server | grep Version

The last command should output something like this

Version: 2.5.0.3-7
Version: 2.4.1.0-22

This means version 2.5 is available and Ambari Server can be installed.
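
When the upgrade is scripted, the same check can be automated so that the install only runs if the new version is really offered by the repository. This is a rough sketch; version_available is a hypothetical helper that reads the apt-cache output on standard input.

```shell
#!/bin/sh
# Reads the output of "apt-cache show ambari-server" on stdin and succeeds
# if a version starting with the given prefix is listed.
# Usage: apt-cache show ambari-server | version_available PREFIX
version_available() {
  awk -v p="$1" '
    $1 == "Version:" && index($2, p) == 1 { found = 1 }
    END { exit !found }'
}

# Example:
#   apt-cache show ambari-server | version_available 2.5. \
#     && sudo apt-get install ambari-server
```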

Install Ambari Server

sudo apt-get install ambari-server

Some lines from the output

The following packages will be upgraded:
  ambari-server

Unpacking ambari-server (2.5.0.3-7) over (2.4.1.0-22) ...

Setting up ambari-server (2.5.0.3-7) ...

Confirm that there is only one ambari server jar file

ll /usr/lib/ambari-server/ambari-server*jar

Output

-rw-r--r-- 1 root root 5806966 Apr  2 23:33 /usr/lib/ambari-server/ambari-server-2.5.0.3.7.jar

Install Ambari Agent

On each host running Ambari agent

sudo apt-get update
sudo apt-get install ambari-agent

Check if the Ambari agent install was a success

dpkg -l ambari-agent

Output from one node

Desired=Unknown/Install/Remove/Purge/Hold
| Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
|/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
||/ Name                         Version        Architecture    Description
+++-============================-==============-===============-=================
ii  ambari-agent                 2.5.0.3-7      amd64           Ambari Agent

Upgrade Ambari DB schema

On Ambari Server instance, run the following command

sudo ambari-server upgrade

The following question shows up. The backup has been done at the beginning. Type y and press Enter.

Ambari Server configured for MySQL. Confirm you have made a backup of the Ambari Server database [y/n] (y)?

Output

INFO: Upgrading database schema
INFO: Return code from schema upgrade command, retcode = 0
INFO: Schema upgrade completed
Adjusting ambari-server permissions and ownership...
Ambari Server 'upgrade' completed successfully.

Start the services

Start Ambari Server

sudo ambari-server start

Start Ambari Agent on all instances where it is installed

sudo ambari-agent start

Post-installation tasks

Hive and Oozie (which I have installed in HDP) are using MySql, so I have to put the jar file in place

sudo ambari-server setup --jdbc-db=mysql --jdbc-driver=/usr/share/java/mysql-connector-java.jar

Manipulating files from S3 with Apache Spark

Update 22/5/2019: Here is a post about how to use Spark, Scala, S3 and sbt in Intellij IDEA to create a JAR application that reads from S3.

This example has been tested on Apache Spark 2.0.2 and 2.1.0. It describes how to prepare the properties file with AWS credentials, run spark-shell so that it reads the properties, read a file from S3 and write from a DataFrame to S3.

This post assumes there is an S3 bucket with a test file available. I have an S3 bucket called markosbucket and in folder folder01 I have the test file called SearchLog.tsv.

The project’s home is /home/ubuntu/s3-test. Folder jars is created in the project’s home.

Step into jars folder. Download the AWS Java SDK and Hadoop AWS jars. In this case, they are downloaded to /home/ubuntu/s3-test/jars

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

Create a properties file in the project’s home. The name, for example, is s3.properties. Add and adjust the following text in the file.

spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /home/ubuntu/s3-test/jars/aws-java-sdk-1.7.4.jar:/home/ubuntu/s3-test/jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  [ACCESS_KEY]
spark.hadoop.fs.s3a.secret.key  [SECRET_KEY]
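
If you would rather not type the keys into the file by hand, the file can be generated. The sketch below assumes the credentials are available in the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, which is my assumption and not part of the original setup; write_s3_properties is a hypothetical function name.

```shell
#!/bin/sh
# Writes a Spark properties file for S3 access. The credentials are read
# from AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (an assumption of this
# sketch; in the post they are pasted into the file manually).
# Usage: write_s3_properties OUTPUT_FILE JARS_DIR
write_s3_properties() {
  out="$1"; jars="$2"
  if [ -z "${AWS_ACCESS_KEY_ID:-}" ] || [ -z "${AWS_SECRET_ACCESS_KEY:-}" ]; then
    echo "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must be set" >&2
    return 1
  fi
  cat > "$out" <<EOF
spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     $jars/aws-java-sdk-1.7.4.jar:$jars/hadoop-aws-2.7.3.jar
spark.hadoop.fs.s3a.access.key  $AWS_ACCESS_KEY_ID
spark.hadoop.fs.s3a.secret.key  $AWS_SECRET_ACCESS_KEY
EOF
  chmod 600 "$out"   # the file contains the secret key
}

# Example:
#   write_s3_properties /home/ubuntu/s3-test/s3.properties /home/ubuntu/s3-test/jars
```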

Once the file is saved, we can test the access by starting spark-shell.

spark-shell --properties-file /home/ubuntu/s3-test/s3.properties

Once in Spark, the configuration properties can be checked by running

spark.conf.getAll

The String in the result should, among other parameters, also show values for the keys spark.hadoop.fs.s3a.secret.key, spark.hadoop.fs.s3a.access.key, spark.driver.extraClassPath and spark.hadoop.fs.s3a.impl. The value for a specific key can also be checked by running spark.conf.get(KEY_NAME), as the example below shows.

spark.conf.get("spark.hadoop.fs.s3a.impl")

The jars and credentials are read by Spark application now. Let us read the test file into an RDD.

val fRDD = sc.textFile("s3a://markosbucket/folder01")

Output:

fRDD: org.apache.spark.rdd.RDD[String] = s3a://markosbucket/folder01 MapPartitionsRDD[1] at textFile at :24

Remember, this is an RDD, not a DataFrame or DataSet!

Print out first three lines from the file

fRDD.take(3)

Reading a CSV file directly into a DataFrame:

val fDF = spark.read.csv("s3a://markosbucket/folder01")

Writing to S3

Writing to S3 storage is quite straightforward as well.
In the project’s home, I created a folder called data and downloaded a random file

mkdir data
wget -P data https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

Now we can load the local file in a DataFrame

val airportDF = spark.read.csv("data/airports.dat")

Saving the DataFrame to S3 is done by first converting the Dataframe to RDD

airportDF.rdd.saveAsTextFile("s3a://markosbucket/folder01/airport-output")

Refreshing the S3 folder should show the new folder, airport-output, which contains two files: _SUCCESS and part-00000.

Some Error messages with fixes

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Fix:
The jar files were not added to the Spark application. Check that spark.driver.extraClassPath in the properties file points to both jars.

com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain

Fix:
AWS credentials were not specified in the configuration file. They can be defined at runtime as well:

sc.hadoopConfiguration.set("fs.s3a.access.key", "[ACCESS_KEY]")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "[SECRET_KEY]")

java.io.IOException: Bucket markosbucket.s3.amazonaws.com does not exist

Fix:
In AWS Console, right click on file, choose properties and copy the value next to Link. Replace https with s3a and remove the domain name (“s3-…”).
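
The same replacement can be done with a one-line sed expression. This is only a sketch: it assumes a path-style link of the form https://HOST/BUCKET/KEY, and the host s3-REGION.amazonaws.com in the example is a placeholder, not a real endpoint. Links that carry the bucket name inside the host name are not handled, because dropping the host would also drop the bucket.

```shell
#!/bin/sh
# Converts a path-style S3 link (https://HOST/BUCKET/KEY) into an s3a URI
# by replacing the scheme and removing the host name.
# Usage: to_s3a_uri LINK
to_s3a_uri() {
  printf '%s\n' "$1" | sed -E 's#^https?://[^/]+/#s3a://#'
}

# Example (placeholder host):
#   to_s3a_uri "https://s3-REGION.amazonaws.com/markosbucket/folder01/SearchLog.tsv"
```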

Installing Ambari Infra for enabling Ranger Audit Access

About the key services mentioned in this post:
Apache Solr – an open-source enterprise search platform. Ranger is using it to store audit logs.
Ambari Infra – core shared service used by Ambari managed components. The database is Solr.

Using a database for Audit Access in Ranger is not an option anymore with HDP 2.5. What is being offered now is Solr and HDFS. It is recommended that Ranger audits are written to Solr and HDFS.
Solr takes care of the search queries from the Ranger Web interface, while HDFS is for more persistent storage of audits.

This was done on an HDP 2.5 cluster on AWS.

Installing Ambari Infra

Even though the HDP’s documentation says Solr should be installed before Ranger, I installed Ranger service first because of my previous Ranger experience when I used MySql for audit logs.

So installing Ambari Infra is really a clicking job. The only thing to check is where the service is going to be installed. I installed it on NameNode. Remember, it is easy to move services from one node to another.

Configuring Ranger with Solr

Click on Ranger and click on Configs -> Ranger Audit. From there, turn on Audit to Solr and SolrCloud.

You should now have enabled both Solr and HDFS for collecting audit logs.

If you now log in to Ranger, you should see audit logs.

If you plan to build an application in Solr, do not use the Solr that is intended for Ambari Infra; install a separate Solr service instead.

Very useful documentation on this topic is available here.

Adding Solr via Ambari

Solr is an open source search engine service. The service is a part of the Hortonworks Data Platform and prior to installing it via Ambari, the service should be added (Zeppelin Notebook went through the same in HDP 2.4 – it had to be added manually before installing). Here is the link to the post about how to add Solr to the list of services.

Once the Solr service is available on the Add Services list, it can be installed. It is a simple process – click next a few times, deploy and that is it. Well, not really.

The following error message will appear

[Screenshot: error while installing]

This documentation at the bottom states the following:

In the case of the Java preinstall check failing, the easiest remediation is to login to each machine that Solr will be installed on, temporarily set the JAVA_HOME environmental variable, then use yum/zypper/apt-get to install the package. For example on CentOS:

export JAVA_HOME=/usr/jdk64/jdk1.8.0_77
yum install lucidworks-hdpsearch

The Datanodes did not find any Java so I installed Java

sudo add-apt-repository ppa:openjdk-r/ppa && sudo apt-get update && sudo apt-get install openjdk-8-jdk -y

Added JAVA_HOME to /etc/environment

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

And I installed the packages on every DataNode.

cd /etc/apt/sources.list.d
sudo wget http://public-repo-1.hortonworks.com/HDP-SOLR-2.5-100/repos/ubuntu14/hdp-solr.list
sudo apt-get update -y
sudo apt-get install lucidworks-hdpsearch

Do not forget to do the same on client nodes and Namenode(s).

When the install is through and if all went well, the following output is given

Result on all nodes:

====
Package lucidworks-hdpsearch was installed
====

The NameNode challenge

The previous steps worked on all nodes except on the NameNode! I compared the value of JAVA_HOME on the nodes:

echo $JAVA_HOME

The NameNode shows the following

/usr/jdk64/jdk1.8.0_77

The DataNodes and the Client give me this

/usr/lib/jvm/java-8-openjdk-amd64

So I installed Java on NameNode as described above (I did not remove any versions) and ran the following commands (same as above)

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
cd /etc/apt/sources.list.d
sudo wget http://public-repo-1.hortonworks.com/HDP-SOLR-2.5-100/repos/ubuntu14/hdp-solr.list
sudo apt-get update -y
sudo apt-get install lucidworks-hdpsearch

Output was

====
Package lucidworks-hdpsearch was installed
====

I went to Ambari, I deleted the Solr with Install Fail status, installed it again and it was a success!

I have no explanation why (or if) the issue was in the Java version.

Adding Solr service to list of services in Ambari

The goal in this post is to add Solr service to the Ambari Add Services list.

Operating system is Ubuntu 14.04 on AWS. I am using Ambari 2.4.1.0, Hortonworks Data Platform version is 2.5.0.0.

Ssh to Ambari server and step into /tmp directory

cd /tmp

Download the Solr package

wget http://public-repo-1.hortonworks.com/HDP-SOLR/hdp-solr-ambari-mp/solr-service-mpack-5.5.2.2.5.tar.gz

Install the package

sudo ambari-server install-mpack --mpack=/tmp/solr-service-mpack-5.5.2.2.5.tar.gz

Output

Using python  /usr/bin/python
Installing management pack
Ambari Server 'install-mpack' completed successfully.

Create definition for the HDP Search repository

sudo vi /var/lib/ambari-server/resources/stacks/HDP/2.5/repos/repoinfo.xml

Find the section for your operating system and add the repo for HDP-SOLR to it (below is an example for Ubuntu 14)

  <os family="ubuntu14">
    <repo>
      <baseurl>http://public-repo-1.hortonworks.com/HDP/ubuntu14/2.x/updates/2.5.0.0</baseurl>
      <repoid>HDP-2.5</repoid>
      <reponame>HDP</reponame>
    </repo>
    <repo>
      <baseurl>http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/ubuntu12</baseurl>
      <repoid>HDP-UTILS-1.1.0.21</repoid>
      <reponame>HDP-UTILS</reponame>
    </repo>
    <repo>
      <baseurl>http://public-repo-1.hortonworks.com/HDP-SOLR-2.5-100/repos/ubuntu14</baseurl>
      <repoid>HDP-SOLR-2.5-100</repoid>
      <reponame>HDP-SOLR</reponame>
    </repo>
  </os>
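
Before restarting Ambari Server it does not hurt to confirm that the new repo entry was really saved. A simple text search is enough for that; the sketch below does not validate the XML, and has_repoid is a name I picked for the example.

```shell
#!/bin/sh
# Succeeds if the given repoinfo.xml contains a <repoid> element with
# exactly the given value. This is a plain text match, not an XML parser.
# Usage: has_repoid FILE REPOID
has_repoid() {
  grep -qF "<repoid>$2</repoid>" "$1"
}

# Example:
#   has_repoid /var/lib/ambari-server/resources/stacks/HDP/2.5/repos/repoinfo.xml \
#     HDP-SOLR-2.5-100 && echo "HDP-SOLR repo is defined"
```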

List of other systems.

Restart Ambari Server

sudo ambari-server restart

Log in to Ambari and click on Add Services. Solr should be available now (at the bottom)

Solr can now be installed via Ambari. This is explained in this post.

Creating a multinode Apache Spark cluster on AWS from command line

The idea is to create a Spark cluster on AWS according to the needs, maximize the use of it and terminate it after the processing is done.

All the instances have the Ubuntu 14.04 operating system. The instance type used in the following script is the free-tier t2.micro with 8GB storage and 1GB RAM.

For this to work from the command line, the aws package has to be installed.

One instance, master in this case, is always in State “running” and this instance has the following installed:

  • aws package
  • Apache Spark

How to install Spark 2.0 is described here. There is also the link to the blog about how to install Spark History Server.

The Spark History Server can be reached on port 18080 – MASTER_PUBLIC_IP:18080 and Spark Master is on port 8080. If the pages do not load, start the services as spark user. The scripts for starting the services can be found in $SPARK_HOME/sbin.

The script for launching instances

The following script takes one parameter – the number of instances to launch and attach to the Spark cluster. These instances become the workers in the Spark world.

A new script is created

vi create-spark-cluster.sh

And the following lines are written in it. Adjust accordingly (details below).

#!/bin/sh

#removes any old associations that have "worker" in it
sudo sed -i.bak '/worker/d' /etc/hosts

#removes known hosts from the file
sudo sed -i 'd' /home/ubuntu/.ssh/known_hosts

#run the loop to create as many slaves as defined in the input parameter
for i in $(seq -f %02g 1 $1)
do
  #name of the worker
  NAME="worker$i"

  #run the aws command to create an instance and run a script when the instance is created.
  #the command returns the private IP address which is used to update the local /etc/hosts file
  PRIV_IP_ADDR=$(aws ec2 run-instances --image-id ami-0d77397e --count 1 \
            --instance-type t2.micro --key-name MYKEYPAIR \
            --user-data file:///PATH_TO_SCRIPT/user-data.sh \
            --subnet-id SUBNET_ID --security-group-ids SECURITY_GROUP \
            --associate-public-ip-address \
            --output text | grep PRIVATEIPADDRESSES | awk '{print $4 "\t" $3}')

   #add the IP and hostname association to the /etc/hosts file
   echo "$PRIV_IP_ADDR" "$NAME" | sudo tee -a /etc/hosts

done

Line 19: option user-data allows you to define a file that is run on the new instance. In this file, the steps for Spark worker installation and setup are defined.
MYKEYPAIR: name of the private key you use in AWS console when launching new instances (this is not a path to the private key on the instance you are running the script from!).
SUBNET_ID: ID of the subnet you are using. It is something one creates when starting with EC2 in AWS.
SECURITY_GROUP: name of the existing security group that this instance should use.
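
The script above trusts its input parameter completely. Since every loop iteration launches a real (and possibly billed) instance, it may be worth validating the number first. The following is a sketch of how that could look; worker_names is a hypothetical helper that is not part of the original script, and it only prints the names the loop would use.

```shell
#!/bin/sh
# Prints worker01 ... workerNN, one name per line, for a positive integer
# N. Anything that is not a positive integer is rejected.
# Usage: worker_names NUMBER_OF_WORKERS
worker_names() {
  count="$1"
  case "$count" in
    ''|*[!0-9]*)
      echo "usage: worker_names NUMBER_OF_WORKERS" >&2
      return 1 ;;
  esac
  if [ "$count" -lt 1 ]; then
    echo "the number of workers must be at least 1" >&2
    return 1
  fi
  # Same two-digit numbering as in the cluster creation script.
  for i in $(seq -f %02g 1 "$count"); do
    printf 'worker%s\n' "$i"
  done
}

# Example: check the parameter before any instance is launched.
#   worker_names "$1" > /dev/null || exit 1
```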

 

The script that runs at instance launch

The following commands in the script are run as soon as each instance is created.
At the top of the script, the operating system is updated, python is installed and JDK as well.

#!/bin/sh

sudo apt-get update -y  && \
sudo apt-get install python-minimal -y && \
sudo apt-get install default-jdk -y && \
cd /etc/ && \
sudo wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.1-bin-hadoop2.7.tgz && \
sudo tar -xvzf spark-2.0.1-bin-hadoop2.7.tgz && \
sudo rm spark-2.0.1-bin-hadoop2.7.tgz && \
sudo useradd spark && \
export SPARK_HOME=/etc/spark-2.0.1-bin-hadoop2.7 && \
sudo chown -R spark:spark $SPARK_HOME && \
sudo -u spark mkdir $SPARK_HOME/logs && \
sudo -u spark $SPARK_HOME/sbin/start-slave.sh spark://ip-10-0-0-95.eu-west-1.compute.internal:7077

Lines 6-8: The script changes into /etc/, downloads the Spark package and unpacks it.
Line 14: The slave is started and connected to the master node (the Spark Master’s URL is passed as the parameter to start-slave.sh).

Example of running the script

The following example creates a Spark cluster with 3 workers.

sh create-spark-cluster.sh 3

Example of the output:

10.0.0.104      ip-10-0-0-104.eu-west-1.compute.internal worker01
10.0.0.80       ip-10-0-0-80.eu-west-1.compute.internal worker02
10.0.0.47       ip-10-0-0-47.eu-west-1.compute.internal worker03

These three lines are also added to the /etc/hosts.

The Spark Master Web Interface (SPARK_MASTER_PUBLIC_IP:8080) gives more details about the Spark cluster.

[Screenshot: spark-master]

The interface displays the recently added workers with State being ALIVE. Two workers are in state DEAD – they have been terminated from AWS, but Spark Master has not updated the Workers statistic yet.

Memory in use is 3 GB – the instances used in the cluster creation have 1 GB each.

Conclusion

The blog post shows a very simple way of creating a Spark cluster. The cluster creation script can be made a lot more dynamic and the script that is run on the newly created instances could be extended (for example, by installing the Python packages needed for work).

Notes from Marz’ Big Data – principles and best practices of scalable real-time data systems – chapter 2

Notes from chapter 1

2. Data model for Big Data

2.1 The properties of data

The core of the Lambda architecture is the master dataset. It is the only part of the architecture that must be guarded from corruption.

There are two components to the master dataset:

  • data model
  • how master dataset is physically stored

Definitions of the terms:

  • information – general collection of knowledge relevant to the system
  • data – information that cannot be derived from anything else
  • queries – questions to ask the data
  • views – information derived from the base data

One person’s data can be another’s view.

2.1.1 Data is raw

It is best to store the rawest data you can obtain. This is important because you might have some question to ask your data in the future that you cannot ask now.

Unstructured data is rawer than normalized data. When deciding what raw data to store, there is a grey area between parsing and semantic normalization. Semantic normalization is the process of transforming free-form information into structured form. The semantic normalization algorithm would try to match the input with a known value.

It is better to store data in unstructured form, because the semantic normalization algorithm might improve over time.

2.1.2 Data is immutable

Relational databases offer an update operation. With Big Data systems, immutability is the key. Data is not updated or deleted, only added. Two advantages derive from it:

  • human-fault tolerance – no data is lost when a human makes a mistake
  • simplicity – an immutable data model offers only the append operation

One trade-off of the immutable approach is that it uses more storage.

2.2 The fact-based model for representing data

Data is the set of information that cannot be derived from anything else.

In the fact-based model, you represent data as fundamental units – facts. Facts are atomic because they cannot be divided further into meaningful components. Facts are also timestamped, which makes them eternally true.

Facts should also be uniquely identifiable – in case two identical pieces of data come in at the same time (for example, pageviews from the same IP address at the same time), a nonce can be added. A nonce is a 64-bit randomly generated number.
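
To make the idea of a nonce concrete, here is a small sketch that is not taken from the book. It prints a pageview fact as one tab-separated line; the field layout and the function names nonce64 and pageview_fact are my own assumptions for illustration.

```shell
#!/bin/sh
# Prints a random unsigned 64-bit integer read from /dev/urandom.
nonce64() {
  od -An -N8 -tu8 /dev/urandom | tr -d ' \n'
}

# Prints one pageview fact as a tab-separated line with four fields:
# timestamp, IP address, URL and nonce (layout assumed for this example).
# Usage: pageview_fact TIMESTAMP IP URL
pageview_fact() {
  printf '%s\t%s\t%s\t%s\n' "$1" "$2" "$3" "$(nonce64)"
}

# Example: two pageviews from the same IP address at the same second are
# still two distinguishable facts, because their nonces differ.
#   pageview_fact 1490000000 10.0.0.1 /index.html
#   pageview_fact 1490000000 10.0.0.1 /index.html
```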

Fact-based model:

  • stores your raw data as atomic facts.
  • facts are immutable and eternally true
  • each fact is identifiable

Benefits of the fact-based model:

  • queryable at any time in history
  • human-fault tolerant
  • handles partial information
  • has advantages of normalized (batch layer) and denormalized (serving layer) forms. These are mutually exclusive, so a choice between query efficiency and data consistency has to be made.

With denormalized data, information is stored in multiple locations, which increases the risk of it becoming inconsistent. Normalizing the data (a list-of-values type of solution) removes the risk of inconsistency, but a join is then needed to answer queries, which is a potentially expensive operation.

In the Lambda architecture, the master dataset is fully normalized. The batch views are like denormalized tables and are defined as functions on the master dataset.

2.3 Graph schemas

Graph schemas capture the structure of a dataset stored using the fact-based model.

2.3.1 Elements of a graph schema

Graph schema has three components:

  • nodes – entities in the system
  • edges – relationships between nodes
  • properties – information about entities

2.3.2 The need for an enforceable schema

Information is now stored as facts, and the graph schema describes the types of facts. What is still missing is the format in which to store the facts.

One option is to use semistructured text format like JSON. This provides simplicity and flexibility. The challenge might appear when valid JSON but with inconsistent format or missing data appears.

In order to guarantee a consistent format, an enforceable schema is an alternative. It guarantees that all required fields are present and ensures that all values are of the expected type. This can be implemented using a serialization framework. A serialization framework provides a language-neutral way to define the nodes, edges and properties of the schema.
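
As a rough illustration of what "enforceable" means, the sketch below checks a record by hand before it would be stored. It is not a serialization framework and not the book's approach; it only mimics the two guarantees mentioned above (required fields present, values of the expected type) for an assumed tab-separated pageview fact with the fields timestamp, IP address, URL and nonce. The name valid_pageview_fact is hypothetical.

```shell
#!/bin/sh
# Succeeds if the given line is a well-formed pageview fact: exactly four
# tab-separated fields, where timestamp and nonce are integers and the
# IP address and URL are not empty. The layout is assumed for this example.
# Usage: valid_pageview_fact "LINE"
valid_pageview_fact() {
  printf '%s\n' "$1" | awk -F'\t' '
    NF != 4               { bad = 1 }   # a required field is missing
    $1 !~ /^[0-9]+$/      { bad = 1 }   # timestamp must be an integer
    $2 == "" || $3 == ""  { bad = 1 }   # IP address and URL are required
    $4 !~ /^[0-9]+$/      { bad = 1 }   # nonce must be an integer
    END                   { exit (bad ? 1 : 0) }'
}

# Example: reject a record instead of appending it to the master dataset.
#   valid_pageview_fact "$line" || echo "rejected: $line" >&2
```

A real serialization framework does this check at write time for every record, so malformed facts never reach the master dataset.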

 

One of the beauties of the fact-based model and graph schemas is that they can evolve as different types of data become available.