Topology
The topology file contains the main method and imports the spout created earlier. It also defines the HDFS bolt, so the code that writes to HDFS lives here.
In ./src/main/java/org/example/ create a file RandomWordsTopology.java and place the following code in it:
package org.example;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import org.example.spout.RandomWordSpout;
public class RandomWordsTopology {

    public static void main(String[] args) throws Exception {
        //Each record is written as a comma-separated line (although only one word per line is written)
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");

        //Synchronize the file system after every 1000 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        //Rotate files once they reach 127 MB (just under the 128 MB default HDFS block size)
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(127.0f, FileSizeRotationPolicy.Units.MB);

        //Files are written to the /storm-data HDFS folder and named RandomWords-*.csv
        DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-data")
                .withPrefix("RandomWords-")
                .withExtension(".csv");

        //HDFS bolt
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://namenode:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();

        //Spout emitting random words, with a parallelism of 1
        builder.setSpout("random-words-spout", new RandomWordSpout(), 1);

        //Bolt writing the words to HDFS, with a parallelism of 1 - it writes into one file at a time
        builder.setBolt("hdfs-csv-bolt", bolt, 1).shuffleGrouping("random-words-spout");

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(3);

        //Submit the topology and choose its name
        StormSubmitter.submitTopology("RandomWordsHdfsTopology", conf, builder.createTopology());
    }
}
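If you want to try things out without a Storm cluster first, one option is to run a topology in local mode. The following is only a rough sketch, not part of the tutorial's code: the class name RandomWordsLocalRunner is made up for illustration, and the HDFS bolt (omitted here) would still need to reach the NameNode at hdfs://namenode:8020 to actually write files.

package org.example;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

import org.example.spout.RandomWordSpout;

//Minimal local-mode runner (sketch): runs the spout in an in-process cluster
//for 60 seconds instead of submitting the topology with StormSubmitter.
public class RandomWordsLocalRunner {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("random-words-spout", new RandomWordSpout(), 1);
        //For a full local test, wire in the HdfsBolt exactly as in RandomWordsTopology;
        //it still needs a reachable NameNode to write anything.

        Config conf = new Config();
        conf.setDebug(true);

        //Run in-process, wait a minute, then tear everything down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("RandomWordsLocalTest", conf, builder.createTopology());
        Utils.sleep(60000);
        cluster.killTopology("RandomWordsLocalTest");
        cluster.shutdown();
    }
}

Local mode runs everything inside a single JVM, which makes it easy to confirm that the spout emits tuples before dealing with cluster submission.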
Package the project to see whether it builds successfully:
mvn package
If everything went as it should, you can submit the topology. Go to the next page.