Fargate in AWS ECS with Terraform

This post describes how to provision a container in AWS ECS from Terraform. The container's image is fetched from Docker Hub; for demonstration purposes, the nginx image will be used.

The ECS launch type described here is Fargate, which keeps things simple because there are no EC2 instances to manage.

The environment for provisioning with Terraform is a Docker container (more on that here). For this to work, AWS user credentials have to be generated, as described in the Administration section.

Administration

Create a user in AWS IAM and create an access key for the user. Store the ACCESS KEY and SECRET ACCESS KEY somewhere safe, since they will be used by Terraform.

Add the following policies to the user:

  • AmazonVPCFullAccess
  • AmazonECS_FullAccess

You can fine-tune the policies as you wish; for demo purposes this should be acceptable.

VPC

Preparing the VPC and security is a must, so the minimum needed to get the container running is described here.

This Terraform file creates a VPC, Internet Gateway, Route, Subnet and a Security Group, all of which are needed to reach the published container from the outside world. Fine-tuning of the VPC services is skipped for simplicity's sake. Port 80 is opened to the world to be able to test the container.

ECS

Once the VPC is in place, the rest is quite simple. The ecs.tf file shows how to get everything working.

Create cluster

Create the ECS cluster. This is launch type independent.

resource "aws_ecs_cluster" "ping" {
  name = "ping"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

Define task

Define what the container should look like: the resources needed, the container image, ports, and so on.

resource "aws_ecs_task_definition" "task" {
  family                        = "service"
  network_mode                  = "awsvpc"
  requires_compatibilities      = ["FARGATE", "EC2"]
  cpu                           = 512
  memory                        = 2048
  container_definitions         = jsonencode([
    {
      name      = "nginx-app"
      image     = "nginx:latest"
      cpu       = 512
      memory    = 2048
      essential = true  # if an essential container fails, all other containers in the task are stopped; at least one container must be essential
      portMappings = [
        {
          containerPort = 80
          hostPort      = 80
        }
      ]
    }
  ])
}

The container_definitions argument can also use the Terraform file function, which makes the code easier to read. Here is an example of a Terraform file using the function, and here is the JSON file the function takes as its argument.

Service

Now we can finally deploy the service, i.e. create the container and start using it.

resource "aws_ecs_service" "service" {
  name              = "service"
  cluster           = aws_ecs_cluster.ping.id
  task_definition   = aws_ecs_task_definition.task.id
  desired_count     = 1
  launch_type       = "FARGATE"
  platform_version  = "LATEST"

  network_configuration {
    assign_public_ip  = true
    security_groups   = [aws_security_group.sg.id]
    subnets           = [aws_subnet.subnet.id]
  }
  lifecycle {
    ignore_changes = [task_definition]
  }
}

The service is attached to a specific cluster and a specific task definition. The launch type is FARGATE. A public IP will be assigned, and the service will run in a specific subnet, secured by a specific security group.

Once all is provisioned we can check the result:

Go to the AWS Console and open the ECS service. Make sure you are in the right region. Click on Clusters, find the cluster and click on it. Under Tasks you should see the provisioned container, something similar to this:

Clicking on the task ID gives you the task details. Under Network is the public IP. Copy it and visit it in a browser; Nginx should welcome you.
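
As an optional check from the command line, a small boto3 sketch along these lines can list the running task and look up its public IP. This is not part of the Terraform walkthrough; the region is an assumption, and the cluster name "ping" comes from the example above.

import boto3

region = "eu-west-1"  # assumption: adjust to the region you provisioned in
ecs = boto3.client("ecs", region_name=region)
ec2 = boto3.client("ec2", region_name=region)

# List and describe the tasks running in the "ping" cluster
task_arns = ecs.list_tasks(cluster="ping")["taskArns"]
tasks = ecs.describe_tasks(cluster="ping", tasks=task_arns)["tasks"]

# Each Fargate task gets an elastic network interface; print its public IP
for task in tasks:
    for attachment in task["attachments"]:
        for detail in attachment["details"]:
            if detail["name"] == "networkInterfaceId":
                eni = ec2.describe_network_interfaces(
                    NetworkInterfaceIds=[detail["value"]]
                )["NetworkInterfaces"][0]
                print(task["lastStatus"], eni["Association"]["PublicIp"])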

Docker, AWS, Python3 and boto3

The idea is to have an independent environment for integrating Amazon Web Services' objects and services with Python applications.

The GitHub repository with an example can be found here. The README.md will probably serve you better than this blog post if you just want to get started.

The environment is offered in the form of a Docker container, which I am running on Windows 10. The above repository has a Dockerfile available, so the container can be built anywhere.

Python 3 is the language of choice for working against AWS, and for that the boto3 library is needed. It is the AWS SDK for Python and is used to integrate Python applications with AWS services.

Bare minimum

To get started, all that is needed is an access key and a secret key (which require an IAM user with assigned policies), Python, and boto3 installed.

The policies the user is assigned are reflected in what the Python code is allowed to do. It can be frustrating at the beginning to assign exactly the right policies, so for testing purposes you might give the user full rights to a service and narrow them down later.

Where to begin

The best service to begin with is the object storage service AWS S3, where you can manipulate buckets (folders) and objects (files). You also see immediate results in the AWS console. Costs are minimal, and there are no services running "under" S3 that need attention first. My repository has a simple Python package which lists all available buckets.
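
A minimal sketch of such a listing, assuming the credentials are already in place, looks roughly like this (the package in the repository is organized differently, this is just the gist):

import boto3

# Create an S3 client using the configured access key and secret key
s3 = boto3.client("s3")

# list_buckets returns every bucket the user is allowed to see
for bucket in s3.list_buckets()["Buckets"]:
    print(bucket["Name"])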

Credentials and sessions

To integrate a Python application with AWS services, an IAM user is needed, along with the user's access key and secret access key. They can be provided in different ways; in this case, I have used sessions, which allow the user (dev, test, prod, ...) to change at runtime. This example of a credentials file gives the general idea about how to create multiple sessions.

The Python test file shows how to initialize a session.
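
As a sketch, initializing a session against one of the profiles from the credentials file could look like this (the profile name dev is only an example):

import boto3

# Bind a session to a specific profile from the credentials file
session = boto3.session.Session(profile_name="dev")

# Clients created from the session use that profile's keys
s3 = session.client("s3")
print([b["Name"] for b in s3.list_buckets()["Buckets"]])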

Exception handling

Handling exceptions in Python 3 and with boto3 is demonstrated in the test package. Note that the exception being caught is a boto3 exception.
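
The general pattern is to catch botocore's ClientError; a minimal sketch, with a hypothetical bucket name, might look like this:

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")

try:
    # head_bucket raises ClientError if the bucket does not exist or access is denied
    s3.head_bucket(Bucket="some-bucket-that-probably-does-not-exist")
except ClientError as err:
    # The error response carries the code (404, 403, ...) and a message
    print("S3 call failed:", err.response["Error"]["Code"])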

Further work

The environment is set up: PyCharm can be used for software development while Docker executes the tests.

There is nothing stopping you from developing a Python application.

After gaining some confidence, it would be smart to review the policies and create policies that allow a user or group exactly what they need to be allowed.

Dilemma

How far will boto3 take an organization? Is it smart to consider using, for example, Terraform when building VPCs and launching EC2 instances?

It is worth making that decision and using an Infrastructure-as-Code tool at a higher level to automate faster, and perhaps using boto3 for more granular work like manipulating objects in S3 or dealing with users and policies.

Using Python 3 with Apache Spark on CentOS 7 with help of virtualenv

Python 2.7.5 ships with CentOS 7, and more and more software on CentOS 7 either no longer supports Python 2 or recommends using Python 3.

Alternative solutions

I have come across the same challenge, and earlier my two approaches to solving it were:

  • installing Python 3.6 from Software Collections (SCL) and enabling it on demand
  • repointing the /usr/bin/python symbolic link to a Python 3 installation

The first option has proven to be confusing because one has to enable the Python 3 version by executing

scl enable rh-python36 bash

which can easily lead to confusion about whether Python 2 or Python 3 is being used at any given moment.

The second option is even worse. It gives you a false feeling of how easy it is to just change a symbolic link, but problems start occurring as soon as you do anything even slightly more serious with Python; for example, pip will crash, and fixing that is a world of pain. Don't go there.

pip or pip3

Python 2 uses pip and Python 3 uses pip3, or does it? A simple rule of thumb: pip == pip3. I was in a dilemma regarding the two pips, and in some blog post I read about this "equation", which I also follow below in the solution. Ansible has solved this issue in its own way, as you will soon see.

Preparing virtualenv

The whole idea is very simple: create a virtual environment that uses Python 3, then use either the virtual environment or the files installed in it (python3 and pip are probably the most useful). Below is an Ansible code snippet showing how to install virtualenv, create a virtual environment and install some pip packages into that environment.

- name: Pip install virtualenv
  pip:
    name: virtualenv
    executable: pip3

- name: Create virtualenv
  shell: /usr/local/bin/virtualenv /home/spark/sparkenv
  become: true
  become_user: spark

- name: Pip install in virtualenv
  shell: /home/spark/sparkenv/bin/pip install numpy pandas
  become: yes
  become_user: spark

Whole task can be found here.

The virtual environment is now created, and its path to Python 3 can be used for Spark. Other Python modules can be added in the last task, just like numpy and pandas are.

Spark and Python

It is recommended to tell Spark where the Python executable is. This is done in spark-env.sh, which resides in $SPARK_HOME/conf. Below is an example telling Spark to use the Python executable from the folder where the virtual environment sparkenv is installed.

PYSPARK_PYTHON=/home/spark/sparkenv/bin/python

Whole spark-env.sh template file (Jinja2) can be found here.

With that in place, running pyspark is going to use Python 3.
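
A quick way to verify this from the pyspark shell (just a sanity check, not part of the Ansible code):

# Inside the pyspark shell, where sc is the predefined SparkContext
import sys
print(sys.version)    # should report Python 3.x from /home/spark/sparkenv
print(sc.pythonVer)   # the Python version PySpark itself reports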

Here is the link to my repository with code that automates provision of Spark cluster in AWS using Terraform and Consul. The repository uses the above described solution.

Nginx, Gunicorn and Dash on CentOS

Challenge to solve

I am building a website for analyses of basketball games based on the play-by-play data publicly available after the game ends. My logic (parsing, fetching from the internet, algorithms, etc.) is written in Python, and I wanted to continue using Python all the way, also when building the front end. To do that, I have chosen Dash, which builds on top of Flask.

My plan was to publish the web-based analytics app, called Hubie, behind a domain on port 80 and run it on a Linux server in the cloud. Gunicorn is the server of choice for the web application, and Nginx is a web server which in this case serves as a reverse proxy.

The web application deployed is using a DNS name created in Azure.

A virtual environment is used to test the web application on port 8080, and to execute the python3 and gunicorn commands suitable for Python 3.

One of the challenges with CentOS 7 is that it still uses Python 2 as its default Python. Installing Python 3 and changing paths in /usr/bin might seem like a good solution, but it will come back to haunt you. That is why it is best to create a virtual environment with the desired Python version.

Environment

The service used to host the server is a Virtual Machine on Azure. The Linux server uses the CentOS-based 7.7 image, and the instance size is Standard D2s v3.

Install packages

Preparing CentOS environment by installing the necessary packages:

sudo yum update -y
sudo yum install -y epel-release
sudo yum -y install python3-pip nginx git
sudo yum install --enablerepo="epel" ufw -y
sudo yum install -y policycoreutils-{python,devel}
sudo pip3 install virtualenv

Create www-data group

sudo groupadd www-data
sudo usermod -a -G www-data centos

Create virtual environment

This command creates a new folder inside /home/$USER/ with the same name as the virtual environment. In this case, the path to the virtual environment home is /home/centos/hubievenv.

virtualenv hubievenv

Activating the virtual environment ensures that the Python installed in the virtual environment is used.

source hubievenv/bin/activate

Executing the above command makes a change to the command line:
(hubievenv) [centos@hubie4 ~]$

The virtual environment can be exited by typing the deactivate command. Before that, however, the virtual environment needs to be prepared.

Install packages with pip in virtual environment

pip install gunicorn flask dash plotly pandas boto3

If you are using only Flask and not Dash, remove the dash package from the install list; there is no need to install Flask explicitly if you are using Dash. The boto3 package is installed because my data source is AWS S3.

If you get the following error:

ERROR: botocore 1.13.33 has requirement python-dateutil<2.8.1,>=2.1; python_version >= "2.7", but you'll have python-dateutil 2.8.1 which is incompatible.

Downgrade python-dateutil:

pip install python-dateutil==2.8.0

Any other Python package needed should be installed in the virtual environment. Deactivate the virtual environment when you are done installing.

Create home directory for git repository

This step is not needed to make nginx and gunicorn work.

My source code for the web app is in a GitHub repository.

mkdir git
git clone https://github.com/markokole/hubie.git

Executing the above commands means the home of my web app project is /home/centos/git/hubie. This will come in handy later on.

Test web application

I am still in the virtual environment for testing purposes.

Since the application I am using as an example connects to AWS S3, credentials are needed.

export AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>

Step into the git repository and execute the following command:

gunicorn --chdir logic -b 0.0.0.0:8080 hubie:server

The website should load in a browser once you enter IP_ADDRESS:8080 or DNS_NAME:8080.

Make sure you open port 8080!

The page loads successfully, and now we work towards serving the page on port 80.

Exit virtual environment:

deactivate

Create gunicorn service

To run Gunicorn as a service, two systemd units will be created, one depending on the other.

Create gunicorn.socket file

The first unit creates a socket file which listens for connections.

sudo vi /etc/systemd/system/gunicorn.socket
[Unit]
Description=gunicorn socket
[Socket]
ListenStream=/run/gunicorn.sock
[Install]
WantedBy=sockets.target

There is no need to start this service, since it is a dependency of the service described below.

Create gunicorn.service file

This file creates the Gunicorn service, which requires the socket unit mentioned above to be started first. Make sure both files have the same base name.

sudo vi /etc/systemd/system/gunicorn.service
[Unit]
Description=gunicorn daemon
Requires=gunicorn.socket
After=network.target

[Service]
User=centos
Group=www-data
WorkingDirectory=/home/centos/git/hubie
Environment="AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>"
Environment="AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>"
Environment="PATH=/home/centos/hubievenv/bin/gunicorn"

ExecStart=/home/centos/hubievenv/bin/gunicorn --workers 3 --chdir /home/centos/git/hubie/logic --bind unix:/run/gunicorn.sock hubie:server

[Install]
WantedBy=multi-user.target

The group www-data has to exist before this service is started. Alter the parameters accordingly.

Start the gunicorn service

When the service file is created, start the service.

sudo systemctl start gunicorn

Enable the service so that it starts automatically after server restart.

sudo systemctl enable gunicorn

Check for status of the service with below command.

sudo systemctl status gunicorn

If the gunicorn service does not start, grant the world execute rights on the gunicorn.sock file.

sudo chmod 667 /run/gunicorn.sock

Configure Nginx and Gunicorn

Gunicorn configuration file

First, create two folders in the nginx home (/etc/nginx): sites-available will store the Gunicorn configuration file, and sites-enabled will store a symbolic link to that file.

sudo mkdir /etc/nginx/{sites-available,sites-enabled}

Create the configuration file. Keep in mind the file has to be of type *.conf.

sudo vi /etc/nginx/sites-available/gunicorn.conf
server {
    listen 80;
    server_name mydomain.com www.mydomain.com;

    location = /favicon.ico { access_log off; log_not_found off; }
    location /hubie/ {
        root /home/centos/git;
    }
    location / {
        proxy_pass http://unix:/run/gunicorn.sock;
    }
}

Server name is the DNS name or IP address of the server.

The first location block suppresses errors about a missing favicon.ico file.
The second location block defines the project path, with root pointing to the home directory of the repository. The third location passes all other requests on to the Gunicorn socket.

Create symbolic link

Create a symbolic link in sites-enabled pointing to the file in the sites-available folder.

sudo ln -s /etc/nginx/sites-available/gunicorn.conf /etc/nginx/sites-enabled

Nginx configuration file

The nginx configuration file should be changed as well.

sudo mv /etc/nginx/nginx.conf /etc/nginx/nginx.conf.default
sudo vi /etc/nginx/nginx.conf
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;

include /usr/share/nginx/modules/*.conf;

events {
    worker_connections 1024;
}

http {
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    sendfile            on;
    tcp_nopush          on;
    tcp_nodelay         on;
    keepalive_timeout   65;
    types_hash_max_size 2048;

    include             /etc/nginx/mime.types;
    default_type        application/octet-stream;


    include /etc/nginx/sites-enabled/*.conf;
    server_names_hash_bucket_size 64;
}

The file is pretty much the same as the default file, except for the last two lines.

Check validity of nginx.conf

sudo nginx -t

Restart the nginx service.

sudo systemctl restart nginx

Create nginx.ini for ufw

The last file we create is an nginx.ini file to fix the firewall issues. The Linux package ufw is required for the job.

sudo vi /etc/ufw/applications.d/nginx.ini
[Nginx HTTP]
title=Web Server
description=Enable NGINX HTTP traffic
ports=80/tcp

[Nginx HTTPS]
title=Web Server (HTTPS)
description=Enable NGINX HTTPS traffic
ports=443/tcp

[Nginx Full]
title=Web Server (HTTP,HTTPS)
description=Enable NGINX HTTP and HTTPS traffic
ports=80,443/tcp

sudo ufw enable

Answer “y” to the question.

sudo ufw allow 'Nginx Full'

Execute the following two commands and the Dash web app will be ready to use.

sudo grep nginx /var/log/audit/audit.log | audit2allow -M nginx
sudo semodule -i nginx.pp

If you check the browser, the page with the server’s DNS or IP loads on port 80.

Some error messages

502 bad gateway

connect() to unix:/run/gunicorn.sock failed (13: Permission denied) while connecting to upstream

When you run into this error, and believe me you will, make sure the user nginx has access to the *.sock file mentioned in the error message. Even though the socket file is not owned by nginx, the nginx user still needs to access it.
With the command below it is possible to monitor the nginx error messages:

sudo tail -f /var/log/nginx/error.log

504 Gateway Time-out

upstream timed out (110: Connection timed out) while reading response header from upstream

In the file that defines the service (in this example gunicorn.service), add the following option to the gunicorn command:

--timeout 120

Remember to restart the service. For more details regarding this solution, check out this Stack Overflow post.

Links

Links used to put together a working example and this blog post:

Automating access from Apache Spark to S3 with Ansible

According to the Apache Spark documentation, Spark jobs must authenticate with S3 to be able to read or write data in the object storage. There are different ways of achieving that:

  • When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
  • spark-submit reads the AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_SESSION_TOKEN environment variables and sets the associated authentication options for the s3n and s3a connectors to Amazon S3.
  • In a Hadoop cluster, settings may be set in the core-site.xml file.
  • Authentication details may be manually added to the Spark configuration in spark-defaults.conf.
  • Alternatively, they can be programmatically set in the SparkConf instance used to configure the application’s SparkContext.

Honestly, I wouldn’t know much about the first option. It might have something to do with running Databricks on AWS.

The second option requires setting environment variables on all servers of the Spark cluster. If using Ansible, this can be done, but only at the level of a task or role. This means that if you run a long-lived Spark cluster, the variables will not be available once you start using the cluster.

The fourth option is the one that receives the attention in this post. spark-defaults.conf is the default configuration file, and proper configuration in this file tunes your Spark cluster.

There are five configuration tuples needed to manipulate S3 data with Apache Spark. They are explained below.

Getting environmental variables into Docker

The following approach is suitable for a proof of concept or testing. An enterprise solution should use a service like HashiCorp Vault, Ansible Vault, AWS IAM or similar.

I am using Docker on Windows 10. The folder where the Dockerfile resides also has a file called aws_cred.env. Make sure this file is added to .gitignore so that it is not checked into the source code repository! The env file holds the AWS key and secret key needed to authenticate with S3. The file structure is like this:

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

When running the Docker container with the --env-file option, the environment variables in the file get exported into the Docker container.

In the Ansible code, they can both be looked up in the following way:

{{ lookup('env', 'AWS_ACCESS_KEY_ID') }}
{{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}

These can be used in the Jinja2 template file spark-defaults.conf.j2 to generate a Spark configuration file. The configuration tuples relevant in this case are these two:

spark.hadoop.fs.s3a.access.key {{ lookup('env', 'AWS_ACCESS_KEY_ID') }}
spark.hadoop.fs.s3a.secret.key {{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}

This now gives you access to the S3 buckets, no matter whether they are public or private.

The JAR files

First, the following tuple is mandatory for the Spark configuration:

spark.hadoop.fs.s3a.impl      org.apache.hadoop.fs.s3a.S3AFileSystem

This tells Spark what kind of file system it is dealing with. The JAR files below provide the implementation behind this configuration.

Two libraries must be added to the instances of the Spark cluster:

  • aws-java-sdk-1.7.4
  • hadoop-aws-2.7.3

The above mentioned Jinja2 file also holds two configuration tuples relevant for these JAR files:

spark.driver.extraClassPath   /usr/spark-s3-jars/aws-java-sdk-1.7.4.jar:/usr/spark-s3-jars/hadoop-aws-2.7.3.jar
spark.executor.extraClassPath /usr/spark-s3-jars/aws-java-sdk-1.7.4.jar:/usr/spark-s3-jars/hadoop-aws-2.7.3.jar

Be careful with the versions, because they must match the Spark version. The above combination has proven to work on Spark installation packages that support Hadoop 2.7. The last two tasks in this main.yml do the job for the Spark cluster.

Once the files are downloaded (for example, I download them to /usr/spark-s3-jars), Apache Spark can start reading from and writing to the S3 object storage.
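
With the keys and class paths in place, a quick smoke test from pyspark could look roughly like this (the bucket and file names are placeholders, not paths from this setup):

# spark-defaults.conf already carries the s3a keys and the extra class paths
df = spark.read.csv("s3a://my-test-bucket/test.csv", header=True)
df.show()

# Writing back to S3 works the same way
df.write.mode("overwrite").parquet("s3a://my-test-bucket/output/")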

Zealpath and Trivago: case for AWS Cloud Engineer position

Tl;dr: https://github.com/markokole/trivago-cicd-pipeline-aws

Trivago uses Zealpath to find potential engineers to join their team. Zealpath is a website which hosts challenges that anyone can solve and submit, and in doing so apply for a job.

This was my first time using Zealpath, and the approach seems very practical. In the worst case, you learn about the company's technology stack (or some of it) and the way they think and solve problems. I "applied" for the AWS Cloud Engineer position, and 72 hours were given to submit the solution. My intention, honestly, was not to apply for a job at Trivago but to learn something new about automation and pipelines in AWS.

The case is described here. I am aware that they might remove the link at some point so I copied the text to the GitHub repository where the solution is.

Once you apply, the clock starts ticking. You download a data.zip file and follow the instructions.

The confusion

The zip file itself is a bit confusing, since all the files in the top directory appear in the two folders as well. I have removed all the duplicates from the home directory, which left me with only the README file.

The technology stack

The AWS services making up the pipeline are:

  • Athena
  • Cloudformation
  • Glue
  • S3

A Dockerfile has been created to automate the provisioning of the pipeline.

The solution

My solution is in a GitHub repository. Hopefully it is documented well enough for anyone to understand it. It should be quite simple once you have an AWS account and Docker on Windows 10 installed. I have not tested it on a Linux system.

All one needs to do is copy the Dockerfile to a folder on a local machine, add a file called aws_cred.env and build the container.

But! Before all that is done, the variable s3_bucket in the Jupyter Notebook needs to be updated to the bucket name you plan to use. I really did not understand why the zip file contained the duplicates. That is also the reason why I created the tar.gz file with the code from Zealpath's zip file, leaving out the files I assume are duplicates.

Capturing messages in Event Hubs to Blob Storage

This post builds on the post Streaming messages from Kafka to EventHub with MirrorMaker, where messages are streamed into Event Hubs, which is the data source for this post.

Messages in Event Hubs

Messages in Event Hubs are stored in Avro format. If we wish to capture them in Blob Storage, they will be stored in the same format, Avro. Apache Avro is a binary data serialization system.

The whole infrastructure is seen here, from the Scala script that produces messages to the Blob Storage where they are stored.
Important: Avro is not a tool for capturing messages, but a file format!

Installing avro-tools

The simplest way to manipulate an Avro file is by using avro-tools. Download the avro-tools JAR file:

wget https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.9.0/avro-tools-1.9.0.jar -P /opt

Reading Avro file from Blob Storage

To test whether the messages produced in Kafka landed in Blob Storage, one file is manually downloaded and checked. It is best to copy the URL from Azure Storage accounts -> Storage Explorer -> BLOB CONTAINERS -> EVENTHUB_NAMESPACE. On the right side, drill down until the Avro file is visible.

Choose the file and click on the Copy URL button. Download the file to the local computer by executing the command below (alter the URL accordingly):

wget https://STORAGE_ACC_NAME.blob.core.windows.net/STORAGE_CONTAINER_NAME/EVENTHUB_NAMESPACE/prod.test1/0/2019/08/07/15/17/19.avro -P /tmp

The file is saved to folder /tmp and can now be read using avro-tools.

Comparing messages in Kafka with messages in Blob Storage

One hundred messages have been produced and saved to 5 different topics in Kafka. Taking topic prod.test1 as an example, the messages can be listed from the CLI by executing the following command:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic prod.test1 --from-beginning

The output (first three and last three messages):

This is message number 2
This is message number 32
This is message number 52
.
.
.
This is message number 95
This is message number 98
This is message number 100

Checking the Avro file that corresponds to the topic prod.test1 by running the following command:

java -jar /opt/avro-tools-1.9.0.jar tojson /tmp/19.avro

Returns the following rows:

{"SequenceNumber":0,"Offset":"0","EnqueuedTimeUtc":"8/8/2019 9:02:31 AM","SystemProperties":{"x-opt-kafka-key":{"bytes":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001"}},"Properties":{},"Body":{"bytes":"This is message number 2"}}
{"SequenceNumber":1,"Offset":"560","EnqueuedTimeUtc":"8/8/2019 9:02:31 AM","SystemProperties":{"x-opt-kafka-key":{"bytes":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001"}},"Properties":{},"Body":{"bytes":"This is message number 32"}}
{"SequenceNumber":2,"Offset":"608","EnqueuedTimeUtc":"8/8/2019 9:02:31 AM","SystemProperties":{"x-opt-kafka-key":{"bytes":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001"}},"Properties":{},"Body":{"bytes":"This is message number 52"}}
.
.
.
{"SequenceNumber":8,"Offset":"896","EnqueuedTimeUtc":"8/8/2019 9:02:31 AM","SystemProperties":{"x-opt-kafka-key":{"bytes":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001"}},"Properties":{},"Body":{"bytes":"This is message number 95"}}
{"SequenceNumber":9,"Offset":"944","EnqueuedTimeUtc":"8/8/2019 9:02:31 AM","SystemProperties":{"x-opt-kafka-key":{"bytes":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001"}},"Properties":{},"Body":{"bytes":"This is message number 98"}}
{"SequenceNumber":10,"Offset":"992","EnqueuedTimeUtc":"8/8/2019 9:02:31 AM","SystemProperties":{"x-opt-kafka-key":{"bytes":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001"}},"Properties":{},"Body":{"bytes":"This is message number 100"}}

The message numbers match. The messages that were initially produced in Kafka are now stored in Avro format in Azure Blob Storage.

There are services in Azure that work on top of Event Hubs or Blob Storage. That topic is covered in the next blog post.

Streaming messages from Kafka to EventHub with MirrorMaker

Idea

The idea is to replicate messages from Apache Kafka to Azure Event Hubs using Kafka’s MirrorMaker.

Introduction

Apache Kafka has become the most popular open-source streaming and messaging tool. Many organizations have implemented it on premise or in a public cloud, and many are content with Kafka's performance and hesitant to migrate to a Kafka-like service in the cloud. One such service in Azure is Event Hubs, a "simple, secure and scalable real-time data ingestion" service.

LinkedIn developed Kafka and donated it to the community in 2011. Microsoft cannot deny the popularity Kafka has gained and therefore offers the possibility of using the Kafka API to work against Event Hubs. This allows Kafka developers to continue using Kafka APIs without any disturbance; only the configuration changes.

Architecture

MirrorMaker is run on the consumer side, Kafka, in this case. However, it is advised to run MirrorMaker on the producer side, which on this occasion would be a server in Azure.

For the sake of simplicity, 5 Kafka topics are defined: prod.test[1-5]. Kafka and MirrorMaker configurations are standard.

Consumer side: Kafka

First, a working Kafka is needed.

In the GitHub repository, cloud and local alternatives are available:

  • provisioned to AWS using Terraform. This way you can create a Kafka cluster. The README.md files should give more details about provisioning Kafka in AWS.
  • created using a Docker container. This gives you a Kafka service suitable for development and testing on your local computer. Keep in mind that Kafka needs to be started manually inside the container by executing the script /opt/startall.sh.

Configuring server.properties

If Kafka in a Docker container is used, the server.properties file is very standard:

broker.id=0
log.dirs=/var/logs/kafka-logs
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
offsets.topic.replication.factor=1
advertised.host.name=127.0.0.1

Using the above configuration, the value for the bootstrap-server parameter is localhost:9092. Alternatively, 127.0.0.1 can be used, since the advertised.host.name parameter is defined in the file.

Once either of the Kafka alternatives is up and running, test messages can be produced.

Message producer: Scala

Kafka should be up and running, and the DNS name of the Kafka server(s), or localhost, is the input parameter when initializing an instance of the class.

The Scala script is executed from the client and produces numerous messages that are randomly assigned to the topics mentioned above. The client can be any server with a Scala installation, since DNS names are used to communicate with Kafka; all you need is to be able to reach Kafka's DNS names.

The script produces messages to one of the topics at random. Ten is the default number of messages produced, but this can be adjusted, since it is an input parameter of the method.

I am generating messages using the Scala shell. Step into the folder where RandomMessage.scala is located and start Scala before executing the following commands:

:load RandomMessage.scala
val rm = new RandomMessage("localhost", "test") //second parameter is name of topic with prefix prod. and suffix [1-5]
rm.CreateMessages(10000) //10 is default parameter

Output should be something like the following:

Topic: prod.test1. Message: This is message number 9701
Topic: prod.test1. Message: This is message number 9702
Topic: prod.test2. Message: This is message number 9703
Topic: prod.test3. Message: This is message number 9704
Topic: prod.test1. Message: This is message number 9705
Topic: prod.test4. Message: This is message number 9706
Topic: prod.test4. Message: This is message number 9707
Topic: prod.test5. Message: This is message number 9708
Topic: prod.test1. Message: This is message number 9709
Topic: prod.test4. Message: This is message number 9710

This should create 5 topics in Kafka and add some messages to the topics.

As soon as the script is executed it is possible to check in Kafka if topics are created:

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

should return a list of topics available:

__consumer_offsets
prod.test0
prod.test1
prod.test2
prod.test3
prod.test4
prod.test5

Reading from one topic:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic prod.test3 --from-beginning

should return lines like these:

This is message number 6207
This is message number 6215
This is message number 6217
This is message number 6219
This is message number 6221

The topics and the messages are now “sitting” in Kafka. The next step is to get them into Azure Event Hubs.

Producer side: Event Hubs

Kafka's MirrorMaker replicates the messages from Kafka to Event Hubs. This post does not go into the details of MirrorMaker configuration; it just prepares the configuration files to produce a working example.

In this repository, Terraform is used to provision Event Hubs. If you already have an existing Event Hubs namespace, that works too; just make sure you don't have topics that match the names of the topics used in this proof of concept.

Configuration files

There are two files needed for MirrorMaker to work: one for the consumer side and one for the producer side (for a better illustration, check the graphic at the top).

Below is an example of consumer.config file for Kafka running locally:

bootstrap.servers=localhost:9092
group.id=example-mirrormaker-group
exclude.internal.topics=true
client.id=mirror_maker_consumer
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
auto.offset.reset=earliest

The last parameter fetches all existing messages from Kafka. This is good for testing purposes, because you avoid needing a live producer to Kafka to see that it works. It does, however, follow the offset rules like any other Kafka consumer.

To configure producer.config, what is needed from Event Hubs is the Connection string–primary key, which can be found in the settings of the Event Hubs service, under Shared access policies. Clicking on the policy opens the connection strings on the right side. Copy the string. Below is an example of the file.

Replace NAMESPACE with the unique namespace of your choice. Replace CONNECTION_STRING_PRIMARY_KEY with the string from Event Hubs.

bootstrap.servers=NAMESPACE.servicebus.windows.net:9093
client.id=mirror_maker_producer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="CONNECTION_STRING_PRIMARY_KEY";

Now that the configuration files are in place, the MirrorMaker can be started:

$KAFKA_HOME/bin/kafka-mirror-maker.sh --consumer.config $KAFKA_HOME/config/consumer.config --num.streams 1 --producer.config $KAFKA_HOME/config/producer.config --whitelist="prod.*"

The last parameter defines which topics should be replicated to Event Hubs. The Scala example above generated 10,000 messages to Kafka, so 10,000 messages are also expected in Event Hubs.

In Azure, it is obvious that all messages have been consumed by Event Hubs (the blue line). The chart can be interpreted as all messages being consumed within the same minute.

Note: the green colour represents messages that were captured, i.e. saved to some storage available in Azure. This is out of scope for this post.

The messages are now in Event Hubs. How long they remain available there is up to the retention parameter; in the repository above, the value is set to one day.

Next step

The messages are in the Event Hubs now. It would make sense to save them to a permanent storage so that they can be used for analysis. This is covered in this blog post.

Provision Apache Spark in AWS with Hashistack and Ansible

Provision Apache Spark in AWS with Hashistack and Ansible

Automation is the key word when it comes to using cloud services. Pay-as-you-go is the philosophy behind it.

In this post, I explain how I provision an Apache Spark cluster on Amazon. The configuration of the cluster is done prior to provisioning, using Jinja2 file templates. Once provisioning is completed, the cluster is therefore ready to use immediately.

One of the points of automation is to make data scientists more independent of data engineers: the data engineer builds the solution and the data scientist uses it without needing engineering experience.
In this case, the data scientist has to configure the cluster using the YAML file and prepare a GitHub repository.

There are two ways of using this solution:

  • A long-lived Spark cluster
    The Spark cluster serves as a solution for running various jobs. The cluster is always available.
  • One-time job execution
    The Spark cluster is provisioned for a specific job, which is executed, and then the cluster is destroyed. The data scientist is responsible for data input and data storage in the code (example).

Technologies and services

  • AWS EC2 (Centos7)
  • Terraform for provisioning the infrastructure in AWS
  • Consul for cluster’s configuration settings
  • Ansible for software installation on the cluster
  • GitHub for version control
  • Docker as test and development environment
  • PowerShell for running Docker and the provisioning
  • Visual Studio Code for software development and running PowerShell

In order to use a service like EC2 in AWS, a Virtual Private Cloud must be established. This is something I have automated using Terraform and Consul, described here. This is a "long-lived" provision, since a VPC has practically no cost.

Prerequisites

I will not go into the details of how to install all the technologies and services from the list. However, this GitHub repository does build a Docker container with Consul and the latest Terraform. Consul in the Docker container is an agent which connects to a global Consul server in Amazon. Documenting the global Consul setup is on my TO-DO list.

I suggest investing some time and creating a Consul setup connected to your own GitHub repository that stores the configuration.

Repository on GitHub

The repository can be found at this address.

Repository Structure

There are two modules used in this project: instance and provision-spark. The module instance is pure Terraform code and does the provisioning of the instances (Spark's master and workers) in AWS. The output (DNS names and IP addresses) of this module is the input for the module provision-spark, which is more complex. It is written in Terraform, Ansible and Jinja2.

Ansible Roles

Below is the structure of the Ansible part of the module.

Roles prereq and spark are applied to all instances. The prereq role takes care of the prerequisites (Java, Anaconda), and the spark role downloads and installs Spark and creates the Linux objects needed for Spark to work. The start_spark_master role applies only to the master instance, and start_spark_workers to the worker instances. The role execute_on_spark automatically executes a job on the Spark cluster (more on that later).

The path to the YAML file that executes the roles is available here.

Cluster Configuration

The cluster is configured in YAML format and the configuration is sent to the global Consul server. One configuration block serves one cluster. An example for cluster lr_iris can be found here.

Running the code

Provisioning starts in module provision-spark where the line

terraform apply -auto-approve

starts provisioning the cluster. The configuration is taken from Consul to populate the variables in Terraform. The Ansible inventory file is created by Terraform after the EC2 instances are launched and started. After the inventory file is created, Terraform executes the spark.yml playbook and the rest is in the hands of Ansible. If everything goes well, the output is similar to the following:

This is a Terraform output as defined in the output.tf file.

The Spark cluster is now ready.

View in AWS Console

The instances in the Spark cluster look like this in AWS console:

Spark as a Service

Spark services running on the master and workers are handled as services using systemctl. The services are created and started using Ansible: the Spark workers start a service called spark-worker, whose Ansible code can be seen here.
The Spark master has two services: spark-master and sparkhs (Spark History Server). The Ansible code for both services is here.

Spark Master

Checking whether the Spark master is available by using the public IP address and port 8080 should return an interface similar to this one:

Five workers were set up in the configuration file. This means we have a cluster with six instances: one is the Spark Master, the other five are the workers.

Spark History Server

The Spark History Server, just like the Spark master UI, becomes significant if a long-lived cluster is used. It helps with monitoring and debugging the jobs (applications, in Spark terminology).
The Spark History Server can be reached on port 18080 on the Spark master.

Above is an example of an application that was executed on the Spark cluster. Note the event log directory: it points to a local directory which will be removed once the cluster is destroyed. This is not an issue if we are running a long-lived cluster, but if we want to keep logs from one-time clusters, it is advisable to store the logs externally. In this case, since Amazon is used, storing them in S3 would be the best option.

Automatic Code Execution

The Spark cluster is now ready to use. Full automation is achieved when the Spark code is automatically executed from the Terraform code once the cluster is available. In the repository, one of the Ansible roles is execute_on_spark, which executes either Python or Scala code on the provisioned Spark cluster.

Which Spark code will be executed depends on the configuration in the YAML file. A path to a GitHub repository is part of the configuration and that repository is cloned to the Spark Master and executed.

The example mentioned above can be found here. It is one of the Hello Worlds of data science: logistic regression on the Iris dataset. In this case, the data scientist is responsible for the input data and for storing the results outside of the Spark cluster.

    input_file = "s3a://hdp-hive-s3/test/iris.csv"
    output_dir = "s3a://hdp-hive-s3/test/git_iris_out"

When the cluster is ready, the repository is cloned and the code is executed. Inside the code, the data scientist defines input and output. In this case, the S3 object storage is used: the job runs once, the results are saved, and the Spark cluster is of no further use.
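
Conceptually, the job the data scientist provides boils down to something like the following PySpark sketch: read from S3, do the work, write back to S3. The actual logistic regression is left out here; the paths are the ones from the configuration above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lr_iris").getOrCreate()

# Input and output live in S3, so nothing is lost when the cluster is destroyed
iris = spark.read.csv("s3a://hdp-hive-s3/test/iris.csv", header=True, inferSchema=True)

# ... model training would happen here ...

iris.write.mode("overwrite").csv("s3a://hdp-hive-s3/test/git_iris_out")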

Spark, Scala, sbt and S3

The idea behind this blog post is to write a Spark application in Scala, build the project with sbt and run the application which reads from a simple text file in S3.

If you are interested in how to access S3 files from Apache Spark with Ansible, check out this post.

A couple of Spark and Scala functionalities that can be picked up in this post:

  • how to create SparkSession
  • how to create SparkContext
  • auxiliary constructors
  • submitting Spark application with arguments
  • error handling in Spark
  • adding column to Spark DataFrame

My operating system is Windows 10 with Spark 2.4.0 installed on it. Spark has been installed using this tutorial. I have not installed Scala on my machine. I do have Java, version 8. Keep in mind that Spark does not support Java 9. It is OK to have multiple versions installed; just make sure you switch to Java 8 in the IDE.

My IDE of choice is IntelliJ IDEA Community 2019.1. It is possible to run code tests, import libraries (using sbt), package the JAR file and run the JAR from the IDE.

I have a small single-node Spark cluster in AWS (one t2.micro instance) for testing the JAR file outside of my development environment. This instance is accessed, and the project tested, from MobaXterm.

Create application

Create new Scala project

[Screenshot: create project]

Type in the name of the project and change the JDK path to Java 8 if the default points to some other version. Spark 2.4.0 uses Scala 2.11.12, so make sure the Scala version matches. This can be changed later in the sbt file.

[Screenshot: create project 2]

IntelliJ IDEA creates the project, and its structure is shown in the image below:

[Screenshot: project structure]

Spark libraries are imported using the sbt file. Copy and paste the following lines into the sbt file:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3"

In the lower right corner there will be a pop-up window asking you if you want to import the changes. Accept that.

[Screenshot: import changes]

Reading files in Spark

A rookie mistake is to create a file locally, run it in the development environment (which is your own computer), where it works as it should, and then scale up to a cluster where the file cannot be found. It is easy to forget that the file should be in the same location on all workers in the Spark cluster, not just on the instance that serves as the client!

To avoid this, external storage is introduced. In the case of this post, it is the AWS object storage, S3.

The test file I am using here is a simple one-line txt file:

a;b;c

If you plan to use a different file structure, make sure to change the schema definition in the code.

Creating Scala class

The environment, the input file and the sbt file are now ready. Let us write the example. Create a new Scala class.

[Screenshot: file new project]

Make sure to choose Object from the Kind dropdown menu.

[Screenshot: new Scala class]

Open Test.scala and copy the following content in it:

import scala.util.Failure
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.functions.lit

// primary constructor
class TextAnalysis(path : String, comment : String) {

//aux cons takes no inputs
def this() {
  this("s3a://hdp-hive-s3/test.txt", "no params")
}

//aux cons takes file path as input
def this(path : String) {
  this(path, "from s3a")
}

  //create spark session
  private val spark = SparkSession
    .builder
    .master("local")
    .appName("kaggle")
    .getOrCreate()

  //Create a SparkContext to initialize Spark
  private val sc = spark.sparkContext

  def ReadFile(): Unit = {

    val testSchema = StructType(Array(
      StructField("first", StringType, true),
      StructField("second", StringType, true),
      StructField("third", StringType, true)))

    try {

      val df = spark.read
        .format("csv")
        .option("delimiter", ";")
        .schema(testSchema)
        .load(path)

      val df1 = df.withColumn("comment", lit(comment))
      df1.show()

    } catch {
      case e: AnalysisException => {
        println(s"FILE $path NOT FOUND... EXITING...")
      }
      case unknown: Exception => {
        println("UNKNOWN EXCEPTION... EXITING...")
        println(Failure(unknown))
      }
    }
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    val sep = ";"
    val argsArray = args.mkString(sep).split(sep)

    val ta = new TextAnalysis("s3a://hdp-hive-s3/test.txt")
    ta.ReadFile()

    val ta1 = new TextAnalysis(argsArray(0), argsArray(1))
    ta1.ReadFile()

    val ta2 = new TextAnalysis()
    ta2.ReadFile()
  }
}

In the object Test, three instances of the TextAnalysis class are created to demonstrate how to run Spark code with input arguments and without.

In order to make the second instance work, the arguments need to be defined. Click Run -> Edit Configurations…

In the Program arguments textbox, write the following: "s3a://hdp-hive-s3/test.txt;with args". Click OK.

Right-click on the code and click Run "Test" from the menu. This should generate a verbose log which also includes three prints of the test table. Here is an example of one:

[Screenshot: table output example]

The column "comment" is added in the Spark code.

Package application in a JAR

Create artifact configuration

Open the Project Structure windows and choose Artifacts from the left menu: File -> Project Structure -> Artifacts.

Click on the +, choose JAR and From modules with dependencies.

Enter Module and Main Class manually or use the menus on the right. Click OK when done and click OK again to exit the window.

[Screenshot: create JAR from modules]

Build JAR

Open the menu Build -> Build Artifacts. Choose the JAR you wish to build and choose action Build.

[Screenshot: build artifact - build]

This will create a new folder called “out” where the JAR file resides.

[Screenshot: out folder]

The test.jar is over 130 MB in size, even though only about 70 lines of code were written. All dependencies were packaged into this JAR file, which can sometimes be acceptable, but in the majority of cases the dependencies are already on the server, and the same goes for this case. The JARs used to run this application are already on the standard Spark cluster under $SPARK_HOME/jars. For working with S3 files, the hadoop-aws JAR should be added to that jars folder. The JAR can be found here.

Removing the dependent JARs is done in the following way from Build -> Build Artifacts.

[Screenshot: build artifact - edit]

In the window, on the right side, remove all the JAR files under test.jar: mark them and click the "minus" icon.

[Screenshot: project structure - removed dependencies]

After saving the changes, rebuild the artifact using Build -> Build Artifacts and the Rebuild option from the menu. This should reduce the file size to 4 KB.

Testing the package on Spark single-node

If you have a working Spark cluster, you can copy the JAR file to it and test it. I have a single-node Spark cluster in AWS for this purpose, and MobaXterm to connect to the cluster. From the folder where the JAR file has been copied, run the following command:

sh $SPARK_HOME/bin/spark-submit --class Test test.jar "s3a://hdp-hive-s3/test.txt;test on Spark"

Three tables should be seen in the output among the log messages; one of them is the following:

[Screenshot: table output on Spark cluster]

This table shows how calling a Scala class with arguments from the command line works.

This wraps up the example of how Spark, Scala, sbt and S3 work together.