How to implement Mobile Call log Analyzer using Apache Storm

Last updated on May 30 2022
Lalit Kolgaonkar

Apache Storm – Working Example

We have gone through the core technical details of Apache Storm, and now it is time to code some simple scenarios.

Scenario – Mobile Call Log Analyzer

A mobile call and its duration are given as input to Apache Storm. Storm then processes the records, groups the calls between the same caller and receiver, and computes the total number of calls for each pair.

Spout Creation

A spout is a component used for data generation. Basically, a spout implements the IRichSpout interface. The IRichSpout interface has the following important methods −

  • open − Provides the spout with an environment to execute. The executors will run this method to initialize the spout.
  • nextTuple − Emits the generated data through the collector.
  • close − This method is called when a spout is going to shut down.
  • declareOutputFields − Declares the output schema of the tuple.
  • ack − Acknowledges that a specific tuple has been processed.
  • fail − Signals that a specific tuple has not been fully processed, so that it can be reprocessed.

open

The signature of the open method is as follows −

open(Map conf, TopologyContext context, SpoutOutputCollector collector)

  • conf − Provides Storm configuration for this spout.
  • context − Provides complete information about the spout's place within the topology, its task id, and its input and output information.
  • collector − Enables us to emit the tuple that will be processed by the bolts.

nextTuple

The signature of the nextTuple method is as follows −

nextTuple()

 

nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. So the first line of nextTuple checks whether processing has finished. If so, it should sleep for at least one millisecond to reduce load on the processor before returning.
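The idea of releasing the thread when there is nothing to emit can be tried without Storm. The following is only a minimal sketch in plain Java: IdleAwareSource, its queue, and its emitted() helper are hypothetical names introduced for illustration and are not part of the Storm API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a spout; it does not use the Storm API.
class IdleAwareSource {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();

    IdleAwareSource(List<String> records) {
        pending.addAll(records);
    }

    // Mirrors the shape of nextTuple(): emit one record if one is available,
    // otherwise sleep briefly and return so the calling loop can move on.
    void nextTuple() {
        if (pending.isEmpty()) {
            try {
                Thread.sleep(1); // one millisecond, as described above
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return;
        }
        emitted.add(pending.poll());
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        IdleAwareSource source = new IdleAwareSource(Arrays.asList("a", "b"));
        for (int i = 0; i < 5; i++) {
            source.nextTuple(); // the last three calls find no work and only sleep
        }
        System.out.println(source.emitted()); // prints [a, b]
    }
}
```

In a real spout the emit would go through the SpoutOutputCollector instead of a local list; the early return after the short sleep is the part this sketch is meant to show.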

close

The signature of the close method is as follows −

close()

declareOutputFields

The signature of the declareOutputFields method is as follows −

declareOutputFields(OutputFieldsDeclarer declarer)

declarer − It is used to declare output stream ids, output fields, etc.

This method is used to specify the output schema of the tuple.

ack

The signature of the ack method is as follows −

ack(Object msgId)

This method acknowledges that a specific tuple has been processed.

fail

The signature of the fail method is as follows −

fail(Object msgId)

This method informs the spout that a specific tuple has not been fully processed. Typically, the spout then emits the tuple again so that it can be reprocessed.
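One common way to make replay possible is to remember each emitted record under its message id until it is acknowledged. The sketch below illustrates that bookkeeping in plain Java under stated assumptions: PendingTracker and all of its methods are hypothetical names, and nothing here calls the Storm API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical bookkeeping class that mimics ack(msgId) and fail(msgId).
class PendingTracker {
    private final Map<Object, String> inFlight = new HashMap<>();
    private final Queue<String> replayQueue = new ArrayDeque<>();

    // Remember a record under its message id at the moment it is emitted.
    void emitted(Object msgId, String record) {
        inFlight.put(msgId, record);
    }

    // Fully processed: the record can be forgotten.
    void ack(Object msgId) {
        inFlight.remove(msgId);
    }

    // Not fully processed: queue the record so it can be emitted again.
    void fail(Object msgId) {
        String record = inFlight.remove(msgId);
        if (record != null) {
            replayQueue.add(record);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }

    // Returns the next record waiting for replay, or null if there is none.
    String nextReplay() {
        return replayQueue.poll();
    }

    public static void main(String[] args) {
        PendingTracker tracker = new PendingTracker();
        tracker.emitted(1, "1234123401,1234123402,30");
        tracker.emitted(2, "1234123403,1234123404,12");
        tracker.ack(1);
        tracker.fail(2);
        System.out.println(tracker.nextReplay()); // prints 1234123403,1234123404,12
    }
}
```

Note that the spout in this tutorial emits tuples without a message id, so its ack and fail methods are left empty.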

FakeCallLogReaderSpout

In our scenario, we need to collect the call log details. Each call log entry contains −

  • caller number
  • receiver number
  • duration

Since we do not have real-time call log information, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.

Coding − FakeCallLogReaderSpout.java

import java.util.*;

// Import Storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Import spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

// Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
public class FakeCallLogReaderSpout implements IRichSpout {
   // Collector which passes tuples to the bolts
   private SpoutOutputCollector collector;
   private boolean completed = false;

   // Topology context, which contains topology data
   private TopologyContext context;

   // Random generator for the fake call data
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if (this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         // Emit at most 100 tuples per call and at most 1000 tuples overall
         Integer localIdx = 0;
         while (localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            // Pick again until the receiver differs from the caller
            while (fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   // Override all the remaining interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
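The record generation used in nextTuple can also be exercised on its own, without a Storm installation. This is a rough sketch under stated assumptions: FakeCallGenerator, its seed parameter (added only so that runs are repeatable), and the String[] return shape are hypothetical choices for illustration, not part of the spout above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical Storm-free version of the fake call log generation.
class FakeCallGenerator {
    private static final List<String> NUMBERS = Arrays.asList(
            "1234123401", "1234123402", "1234123403", "1234123404");

    private final Random random;

    // The seed is an assumption made for repeatable runs.
    FakeCallGenerator(long seed) {
        this.random = new Random(seed);
    }

    // Returns {caller, receiver, duration}; caller and receiver always differ.
    String[] next() {
        String from = NUMBERS.get(random.nextInt(NUMBERS.size()));
        String to = NUMBERS.get(random.nextInt(NUMBERS.size()));
        while (from.equals(to)) {
            to = NUMBERS.get(random.nextInt(NUMBERS.size()));
        }
        int duration = random.nextInt(60); // 0 to 59, as in the spout
        return new String[] {from, to, String.valueOf(duration)};
    }

    public static void main(String[] args) {
        FakeCallGenerator generator = new FakeCallGenerator(42L);
        for (int i = 0; i < 3; i++) {
            System.out.println(String.join(", ", generator.next()));
        }
    }
}
```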

Bolt Creation

A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.

IRichBolt interface has the following methods −

  • prepare − Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.
  • execute − Processes a single tuple of input.
  • cleanup − Called when a bolt is going to shut down.
  • declareOutputFields − Declares the output schema of the tuple.

prepare

The signature of the prepare method is as follows −

prepare(Map conf, TopologyContext context, OutputCollector collector)

  • conf − Provides Storm configuration for this bolt.
  • context − Provides complete information about the bolt's place within the topology, its task id, input and output information, etc.
  • collector − Enables us to emit the processed tuple.

execute

The signature of the execute method is as follows −

execute(Tuple tuple)

Here tuple is the input tuple to be processed.

The execute method processes a single tuple at a time. The tuple data can be accessed with the getValue method of the Tuple class. It is not necessary to process the input tuple immediately. Multiple tuples can be processed and output as a single output tuple. The processed tuple can be emitted by using the OutputCollector class.

cleanup

The signature of the cleanup method is as follows −

cleanup()

declareOutputFields

The signature of the declareOutputFields method is as follows −

declareOutputFields(OutputFieldsDeclarer declarer)

Here the parameter declarer is used to declare output stream ids, output fields, etc.

This method is used to specify the output schema of the tuple.

Call log Creator Bolt

The call log creator bolt receives the call log tuple, which has the caller number, receiver number, and call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The format of the new value is “Caller number - Receiver number”, and it is emitted as a new field named “call”. The complete code is given below.

Coding − CallLogCreatorBolt.java

// Import util packages
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

// Import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

// Create a class CallLogCreatorBolt which implements the IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   // Collector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);

      // Combine caller and receiver into a single "call" value
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call log Counter Bolt

The call log counter bolt receives the call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple, creates a new entry in the dictionary for every new “call” value, and sets its value to 1. For an entry that already exists in the dictionary, it just increments the value. In simple terms, this bolt saves the call and its count in the dictionary object. Instead of saving the call and its count in the dictionary, we can also save them to a data source. The complete program code is as follows −

Coding − CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   // Maps each "call" value to the number of times it has been seen
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);

      if (!counterMap.containsKey(call)) {
         counterMap.put(call, 1);
      } else {
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      // Print the call and its count when the bolt shuts down
      for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
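The counting step of CallLogCounterBolt can be tried outside Storm as well. The sketch below is an illustration only: CallCounter, record, and countFor are hypothetical names, and it swaps the containsKey/put pattern for Map.merge from the standard library, which does the same insert-or-increment in one call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Storm-free version of the per-call counting logic.
class CallCounter {
    private final Map<String, Integer> counterMap = new HashMap<>();

    // Inserts 1 for a new call value, or adds 1 to the existing count.
    void record(String call) {
        counterMap.merge(call, 1, Integer::sum);
    }

    // Returns the count for a call value, or 0 if it was never recorded.
    int countFor(String call) {
        return counterMap.getOrDefault(call, 0);
    }

    public static void main(String[] args) {
        CallCounter counter = new CallCounter();
        counter.record("1234123401 - 1234123402");
        counter.record("1234123401 - 1234123402");
        counter.record("1234123403 - 1234123404");
        System.out.println(counter.countFor("1234123401 - 1234123402")); // prints 2
    }
}
```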

Creating Topology

A Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple methods to create complex topologies: setSpout sets a spout, setBolt sets a bolt, and createTopology builds the topology. Use the following code snippet to create a topology −

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

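The shuffleGrouping and fieldsGrouping methods set the stream grouping for the spout and bolts. Shuffle grouping distributes tuples randomly across the tasks of the receiving bolt, while fields grouping sends all tuples with the same value of the named field (here, “call”) to the same task.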
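Fields grouping matters for the counter bolt because every tuple with the same “call” value must reach the same task for the counts to be correct. The following sketch only illustrates that property with a simple hash-and-modulo rule; it is an assumption made for illustration, not Storm's actual partitioning code, and FieldsGroupingSketch and taskFor are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of routing by key; not Storm's implementation.
class FieldsGroupingSketch {
    // Picks a task index in [0, numTasks); equal keys always get the same index.
    static int taskFor(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        List<String> calls = Arrays.asList(
                "1234123401 - 1234123402",
                "1234123403 - 1234123404",
                "1234123401 - 1234123402");
        for (String call : calls) {
            // The first and third lines show the same task index.
            System.out.println(call + " -> task " + taskFor(call, 4));
        }
    }
}
```

Math.floorMod is used instead of the % operator so that a negative hash code still yields a valid index.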

Local Cluster

For development purposes, we can create a local cluster using a “LocalCluster” object and then submit the topology using the “submitTopology” method of the “LocalCluster” class. One of the arguments for “submitTopology” is an instance of the “Config” class. The “Config” class is used to set configuration options before submitting the topology. These configuration options are merged with the cluster configuration at run time and sent to all tasks (spouts and bolts) through the open and prepare methods. Once the topology is submitted to the cluster, we wait 10 seconds for the cluster to process the submitted topology and then shut down the cluster using the “shutdown” method of “LocalCluster”. The complete program code is as follows −

Coding − LogAnalyserStorm.java

import backtype.storm.tuple.Fields;

// Import Storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

// Create main class LogAnalyserStorm, which submits the topology
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception {
      // Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);

      // Build the topology: spout -> creator bolt -> counter bolt
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

      // Stop the topology
      cluster.shutdown();
   }
}

Building and Running the Application

The complete application has four Java source files. They are −

  • FakeCallLogReaderSpout.java
  • CallLogCreatorBolt.java
  • CallLogCounterBolt.java
  • LogAnalyserStorm.java

The application can be built using the following command −

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

The application can be run using the following command −

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm

Output

Once the application is started, it will output the complete details about the cluster startup process, spout and bolt processing, and finally, the cluster shutdown process. In “CallLogCounterBolt”, we have printed the call and its count details. This information will be displayed on the console as follows −

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Non-JVM languages

Storm topologies are defined through Thrift interfaces, which makes it easy to submit topologies from any language. Storm supports Ruby, Python, and many other languages. Let’s take a look at the Python binding.

Python Binding

Python is a general-purpose, interpreted, interactive, object-oriented, high-level programming language. Storm supports implementing topology components in Python. The Python binding supports emitting, anchoring, acking, and logging operations.

As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes with JSON messages over stdin/stdout. First, take a sample bolt, WordCount, that uses the Python binding.

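// ShellBolt (backtype.storm.task.ShellBolt) runs the given command as a sub-process
public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      super("python", "splitword.py");
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}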

Here the class WordCount implements the IRichBolt interface and delegates its processing to the Python script named in the super constructor call, “splitword.py”. Now create the Python implementation, “splitword.py”.

import storm

class WordCountBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

WordCountBolt().run()

This is a sample Python implementation that splits a given sentence into words and emits each word. Similarly, you can bind with other supported languages as well.

This brings us to the end of the blog. This Tecklearn post, ‘How to implement Mobile Call log Analyzer using Apache Storm’, covers topics that are commonly asked about if you are looking for a job in the Apache Storm and Big Data domain.
