Streaming with Storm – simple example with HDFS bolt

Topology

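The topology file contains the main method and imports the spout class. It also defines and configures the HDFS bolt, so all the code for writing to HDFS lives here.
In ./src/main/java/org.package, create a file named RandomWordsTopology.java and place the following code in it. Note that org.package is only a placeholder: package is a reserved word in Java and cannot be part of a real package name, so substitute your own package name in the path, the package declaration and the import of RandomWordSpout.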

package org.package;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import org.package.spout.RandomWordSpout;

public class RandomWordsTopology {
    public static void main(String[] args) throws Exception {

        //Write records as comma-separated values (each record here holds a single word)
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
        //Sync the output to HDFS after every 1000 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        //Rotate files after 127MB (the Hortonworks default block size is 128MB)
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(127.0f, FileSizeRotationPolicy.Units.MB);

        DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat();
        //The files are written to this HDFS folder
        fileNameFormat.withPath("/storm-data");
        //File names start with this prefix
        fileNameFormat.withPrefix("RandomWords-");
        //File names end with this extension
        fileNameFormat.withExtension(".csv");

        //HDFS bolt
        HdfsBolt bolt =
            new HdfsBolt().withFsUrl("hdfs://namenode:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();

        //Spout that emits random words, with a parallelism of 1
        builder.setSpout("random-words-spout", new RandomWordSpout(), 1);

        //Bolt that writes the words to HDFS, with a parallelism of 1, so it writes into one file at a time
        builder.setBolt("hdfs-csv-bolt", bolt, 1).shuffleGrouping("random-words-spout");

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(3);

        //Submit the topology under the chosen name
        StormSubmitter.submitTopology("RandomWordsHdfsTopology", conf, builder.createTopology());
    }
}
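
To make the effect of the two policies more concrete, here is a minimal sketch in plain Java that models them as simple counters. It is not Storm's implementation and does not use the storm-hdfs classes: the class PolicySketch and all of its methods are hypothetical names made up for this illustration, and it assumes that a sync is triggered once the given number of tuples has been written and that a file is rotated once its size reaches the given number of bytes.

```java
import java.nio.charset.StandardCharsets;

/**
 * Simplified stand-in for the sync and rotation decisions configured in the
 * topology. Hypothetical illustration only, not part of Storm.
 */
final class PolicySketch {
    private final int syncEvery;   // tuples between syncs (1000 in the topology)
    private final long maxBytes;   // rotation threshold in bytes (127 MB in the topology)
    private int tuplesSinceSync = 0;
    private long bytesInFile = 0;
    private int syncs = 0;
    private int rotations = 0;

    PolicySketch(int syncEvery, long maxBytes) {
        this.syncEvery = syncEvery;
        this.maxBytes = maxBytes;
    }

    /** Account for one written line and update both counters. */
    void write(String line) {
        bytesInFile += line.getBytes(StandardCharsets.UTF_8).length;
        tuplesSinceSync++;
        // Assumed count-based sync: flush after every syncEvery tuples.
        if (tuplesSinceSync >= syncEvery) {
            syncs++;
            tuplesSinceSync = 0;
        }
        // Assumed size-based rotation: start a new file once the threshold is reached.
        if (bytesInFile >= maxBytes) {
            rotations++;
            bytesInFile = 0;
        }
    }

    int getSyncs() {
        return syncs;
    }

    int getRotations() {
        return rotations;
    }

    public static void main(String[] args) {
        // Tiny thresholds so the behaviour is visible after a few writes.
        PolicySketch sketch = new PolicySketch(3, 20);
        for (int i = 0; i < 7; i++) {
            sketch.write("word\n"); // 5 bytes per line
        }
        System.out.println("syncs=" + sketch.getSyncs()
                + " rotations=" + sketch.getRotations());
    }
}
```

With the values used in the topology, the same model suggests a sync after every 1000 words and a new file roughly every 127 MB, which keeps each file just under a single 128 MB HDFS block.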

Package the project to check that it builds successfully.

mvn package

If the build succeeds, you can submit the topology. Continue to the next page.
