How Apache Storm is used in Twitter

Last updated on May 30 2022
Lalit Kolgaonkar

Apache Storm in Twitter

In this blog, we'll discuss a real-time application of Apache Storm and see how Storm can process Twitter's real-time stream of tweets.

Twitter

Twitter is an online social networking service that provides a platform for sending and receiving user tweets. Registered users can read and post tweets, but unregistered users can only read them. Hashtags are used to categorize tweets by keyword: a # is prepended to the relevant keyword, as in #food. Let us now take a real-time scenario: finding the most used hashtags per topic.
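To make the hashtag convention concrete, here is a minimal sketch that pulls hashtags out of plain tweet text with a regular expression. It is only an illustration. The class name HashtagExtractor is hypothetical, and the pattern (a # followed by letters, digits or underscores) is a simplifying assumption, not Twitter's exact hashtag rules. The topology below relies on the hashtag entities that twitter4j has already parsed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
public class HashtagExtractor {
    // Simplified, assumed rule: '#' followed by one or more ASCII letters, digits or '_'.
    // Twitter's real hashtag rules are more involved.
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    // Returns the hashtags found in the text, without the leading '#', in order of appearance.
    public static List<String> extract(String text) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(text);
        while (m.find()) {
            tags.add(m.group(1));
        }
        return tags;
    }

    public static void main(String[] args) {
        // prints [SundayRoast, food, cooking]
        System.out.println(extract("Sunday lunch #SundayRoast #food #cooking"));
    }
}
```

By default, Java's \w matches only ASCII word characters. As a result, non-Latin hashtags and edge cases such as a # inside a URL are not handled the way Twitter handles them, which is one reason to prefer the entities Twitter already provides.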

Spout Creation

The purpose of the spout is to get the tweets submitted by people as soon as possible. Twitter provides the “Twitter Streaming API”, a web service-based tool for retrieving the tweets submitted by people in real time. The Twitter Streaming API can be accessed from any programming language.

twitter4j is an open-source, unofficial Java library that provides a Java-based module for easily accessing the Twitter Streaming API. twitter4j provides a listener-based framework for receiving tweets. To access the Twitter Streaming API, we need to sign up for a Twitter developer account, which provides the following OAuth authentication details:

  • Consumer Key
  • Consumer Secret
  • Access Token
  • Access Token Secret

Storm provides a Twitter spout, TwitterSampleSpout, in its starter kit. We will use it to retrieve the tweets. The spout needs the OAuth authentication details and, optionally, one or more keywords. It emits real-time tweets that match the keywords, and it falls back to Twitter's sample stream when no keywords are given. The complete program code is given below.

Coding: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    LinkedBlockingQueue<Status> queue = null;
    TwitterStream _twitterStream;

    String consumerKey;
    String consumerSecret;
    String accessToken;
    String accessTokenSecret;
    String[] keyWords;

    public TwitterSampleSpout(String consumerKey, String consumerSecret,
            String accessToken, String accessTokenSecret, String[] keyWords) {
        this.consumerKey = consumerKey;
        this.consumerSecret = consumerSecret;
        this.accessToken = accessToken;
        this.accessTokenSecret = accessTokenSecret;
        this.keyWords = keyWords;
    }

    public TwitterSampleSpout() {
        // No credentials or keywords are set; open() treats missing keywords as "sample"
    }

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        // Bounded buffer between the twitter4j listener thread and nextTuple()
        queue = new LinkedBlockingQueue<Status>(1000);
        _collector = collector;

        StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
                // offer() never blocks: the tweet is dropped if the queue is full
                queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}

            @Override
            public void onTrackLimitationNotice(int i) {}

            @Override
            public void onScrubGeo(long l, long l1) {}

            @Override
            public void onException(Exception ex) {}

            @Override
            public void onStallWarning(StallWarning arg0) {}
        };

        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true)
                .setOAuthConsumerKey(consumerKey)
                .setOAuthConsumerSecret(consumerSecret)
                .setOAuthAccessToken(accessToken)
                .setOAuthAccessTokenSecret(accessTokenSecret);

        _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
        _twitterStream.addListener(listener);

        if (keyWords == null || keyWords.length == 0) {
            // No keywords: read Twitter's random sample stream
            _twitterStream.sample();
        } else {
            // Keywords given: only receive tweets that track them
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
        }
    }

    @Override
    public void nextTuple() {
        Status ret = queue.poll();
        if (ret == null) {
            // Nothing buffered yet; back off briefly instead of spinning
            Utils.sleep(50);
        } else {
            _collector.emit(new Values(ret));
        }
    }

    @Override
    public void close() {
        _twitterStream.shutdown();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        // One stream connection is enough, so never run more than one spout task
        ret.setMaxTaskParallelism(1);
        return ret;
    }

    @Override
    public void ack(Object id) {}

    @Override
    public void fail(Object id) {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}
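The spout bridges two threads. twitter4j calls onStatus() on its own listener thread, Storm calls nextTuple() on the spout's thread, and the LinkedBlockingQueue sits between them. The sketch below imitates that hand-off without Storm or twitter4j, using plain strings in place of Status objects. TweetBuffer and its methods are hypothetical names. The capacity of 3 is chosen only to make the full-queue case visible; the spout uses 1000.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper mirroring the spout's hand-off: a listener thread
// offers tweets, and the spout thread polls them in nextTuple().
public class TweetBuffer {
    private final LinkedBlockingQueue<String> queue;
    private final AtomicInteger dropped = new AtomicInteger();

    public TweetBuffer(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Producer side, like onStatus(): offer() returns false instead of blocking when full.
    public void onTweet(String tweet) {
        if (!queue.offer(tweet)) {
            dropped.incrementAndGet();
        }
    }

    // Consumer side, like nextTuple(): returns null when nothing is buffered.
    public String next() {
        return queue.poll();
    }

    public int droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) throws InterruptedException {
        TweetBuffer buffer = new TweetBuffer(3);
        Thread listener = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                buffer.onTweet("tweet-" + i);
            }
        });
        listener.start();
        listener.join(); // wait so the printed result is deterministic

        List<String> emitted = new ArrayList<>();
        String t;
        while ((t = buffer.next()) != null) {
            emitted.add(t);
        }
        // prints emitted=[tweet-1, tweet-2, tweet-3] dropped=2
        System.out.println("emitted=" + emitted + " dropped=" + buffer.droppedCount());
    }
}
```

When the queue is full, offer() returns false instead of blocking. As a result, the spout as written silently drops any tweets that arrive while its buffer is full. Counting the drops, as this sketch does, is one way to notice that.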

Hashtag Reader Bolt

The tweets emitted by the spout are forwarded to HashtagReaderBolt, which processes each tweet and emits every hashtag it contains. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j, which returns the hashtags of the tweet as an array of HashtagEntity objects. The complete program code is as follows:

Coding: HashtagReaderBolt.java

import java.util.Map;

import twitter4j.HashtagEntity;
import twitter4j.Status;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HashtagReaderBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Status tweet = (Status) tuple.getValueByField("tweet");
        // Emit one tuple per hashtag found in the tweet
        for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
            System.out.println("Hashtag: " + hashtag.getText());
            this.collector.emit(new Values(hashtag.getText()));
        }
        // Acknowledge the input tuple, as the counter bolt does
        this.collector.ack(tuple);
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Hashtag Counter Bolt

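The emitted hashtags are forwarded to HashtagCounterBolt. This bolt keeps a count for every hashtag it receives in an in-memory Java Map and prints the totals in its cleanup() method. Storm guarantees that cleanup() is called only when a topology is shut down in local mode, which is how this example runs. The complete program code is given below.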

Coding: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String key = tuple.getString(0);
        // Start at 1 for a new hashtag, otherwise increment the stored count
        if (!counterMap.containsKey(key)) {
            counterMap.put(key, 1);
        } else {
            Integer c = counterMap.get(key) + 1;
            counterMap.put(key, c);
        }
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        // Print the final counts when the bolt shuts down
        for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
            System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
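The counter bolt keeps its counts in a local Map, so a count is complete only if every occurrence of a hashtag reaches the same task. Grouping on the "hashtag" field is what provides that. The sketch below models the idea in plain Java. FieldsGroupingDemo, taskFor and route are hypothetical names. Routing by hash code modulo the number of tasks is an assumed, simplified model of how Storm picks a task, not Storm's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of fieldsGrouping (not Storm's code):
// a hashtag is routed to the task given by its hash code modulo the task count.
public class FieldsGroupingDemo {

    static int taskFor(String hashtag, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(hashtag.hashCode(), numTasks);
    }

    // Each task keeps its own map, just as each HashtagCounterBolt instance does.
    static List<Map<String, Integer>> route(List<String> hashtags, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<String, Integer>());
        }
        for (String tag : hashtags) {
            Map<String, Integer> counts = tasks.get(taskFor(tag, numTasks));
            Integer c = counts.get(tag);
            counts.put(tag, c == null ? 1 : c + 1);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("food", "cook", "food", "android", "food");
        List<Map<String, Integer>> tasks = route(stream, 3);
        // Every occurrence of "food" lands in one task's map, so its count is complete there
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + ": " + tasks.get(i));
        }
    }
}
```

In the topology as written, setBolt is called without a parallelism hint, so there is a single counter task and the grouping does not yet change anything. It starts to matter once the counter bolt is given a parallelism hint greater than 1. With shuffle grouping instead, one hashtag's count could end up split across several tasks.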

Submitting a Topology

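Submitting the topology is the job of the main application. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The reader bolt receives tweets through shuffle grouping, and the counter bolt receives hashtags through fields grouping on the "hashtag" field. The following program code shows how to submit the topology to a local cluster.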

Coding: TwitterHashtagStorm.java

import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TwitterHashtagStorm {
    public static void main(String[] args) throws Exception {
        // The first four arguments are the OAuth credentials
        String consumerKey = args[0];
        String consumerSecret = args[1];
        String accessToken = args[2];
        String accessTokenSecret = args[3];

        // All remaining arguments are keywords to track
        String[] keyWords = Arrays.copyOfRange(args, 4, args.length);

        Config config = new Config();
        config.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
                consumerSecret, accessToken, accessTokenSecret, keyWords));

        // Tweets are distributed randomly across reader bolt tasks
        builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
                .shuffleGrouping("twitter-spout");

        // Tuples with the same "hashtag" value always go to the same counter task
        builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
                .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

        // Run in-process for 10 seconds, then shut the local cluster down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TwitterHashtagStorm", config,
                builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

Building and Running the Application

The complete application consists of four Java source files:

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

You can compile the application using the following command:

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java

Run the application using the following command:

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> <keyword1> <keyword2> … <keywordN>
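The arguments are positional. The first four are the OAuth credentials, and everything after them is treated as a keyword; TwitterHashtagStorm.main splits them this way with Arrays.copyOfRange. The sketch below isolates that split and adds a length check that the original main does not have. ArgsDemo and keywords are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical helper mirroring how TwitterHashtagStorm.main splits its arguments.
public class ArgsDemo {

    static String[] keywords(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "usage: TwitterHashtagStorm <consumerkey> <consumersecret> "
                    + "<accesstoken> <accesstokensecret> [keyword ...]");
        }
        // Everything after the four credentials; empty when no keywords are given
        return Arrays.copyOfRange(args, 4, args.length);
    }

    public static void main(String[] args) {
        String[] withKeywords = {"ck", "cs", "at", "ats", "food", "cooking"};
        System.out.println(Arrays.toString(keywords(withKeywords))); // prints [food, cooking]
        System.out.println(keywords(new String[] {"ck", "cs", "at", "ats"}).length); // prints 0
    }
}
```

If only the four credentials are passed, the keyword array is empty and the spout switches to Twitter's sample stream.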

Output

The application prints each hashtag it has seen together with its count. The output should be similar to the following:

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1
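The counts are printed in HashMap iteration order, so the most used hashtag is not necessarily at the top. To answer the question posed at the start, which hashtag is used most, the counts can be sorted by value. This is a hedged sketch, not part of the original application. TopHashtags and top are hypothetical names, and breaking ties alphabetically is an assumption. In the real topology, the input map would be the counterMap inside HashtagCounterBolt.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: rank hashtag counts, highest first.
public class TopHashtags {

    // Returns up to n entries ordered by count (descending); ties are broken alphabetically.
    static List<Map.Entry<String, Integer>> top(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        // A few of the counts from the sample output above
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("foodie", 1);
        counts.put("food", 2);
        counts.put("cook", 1);
        counts.put("android", 1);
        // prints "food : 2" then "android : 1"
        for (Map.Entry<String, Integer> e : top(counts, 2)) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }
}
```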

 

This brings us to the end of the blog. We hope this Tecklearn blog, ‘How Apache Storm is used in Twitter’, helps you with commonly asked questions if you are looking for a job in the Apache Storm and Big Data domain.

If you wish to learn Apache Storm and build a career in the Apache Storm or Big Data domain, check out our interactive Apache Storm Training, which comes with 24*7 support to guide you throughout your learning period. Please find the link for course details:

https://www.tecklearn.com/course/apache-strom-training/

Apache Storm Training

About the Course

Tecklearn Apache Storm training will give you a working knowledge of Apache Storm, the open-source computational engine. You will be able to do distributed real-time data processing and derive valuable insights. You will learn about the deployment and development of Apache Storm applications in the real world for handling Big Data and implementing various analytical tools for powerful enterprise-grade solutions. Upon completion of this online training, you will have a solid understanding of, and hands-on experience with, Apache Storm.

Why Should you take Apache Storm Training?

  • The average pay of an Apache Storm professional stands at $90,167 P.A. (Indeed.com)
  • Groupon, Twitter, and many other companies use Apache Storm for business purposes such as real-time analytics and micro-batch processing.
  • Apache Storm is a free and open-source distributed real-time computation system for processing fast, large streams of data.

What will you learn in this course?

Introduction to Apache Storm

  • Apache Storm
  • Apache Storm Data Model

Architecture of Storm

  • Apache Storm Architecture
  • Hadoop distributed computing
  • Apache Storm features

Installation and Configuration

  • Pre-requisites for Installation
  • Installation and Configuration

Storm UI

  • Zookeeper
  • Storm UI

Storm Topology Patterns
