Deep Dive into Trident – an extension of Apache Storm

Last updated on May 30 2022
Lalit Kolgaonkar

Apache Storm – Trident

 

Trident is an extension of Storm. Like Storm, Trident was also developed by Twitter. The main reason behind developing Trident was to provide a high-level abstraction on top of Storm, along with stateful stream processing and low-latency distributed querying.

Trident uses spouts and bolts, but these low-level components are auto-generated by Trident before execution. Trident has functions, filters, joins, grouping, and aggregation.

Trident processes streams as a series of batches, which are referred to as transactions. Generally, the size of these small batches will be on the order of thousands or millions of tuples, depending on the input stream. In this way, Trident is different from Storm, which performs tuple-by-tuple processing.

The batch processing concept is very similar to database transactions. Every transaction is assigned a transaction ID. A transaction is considered successful once all of its processing completes. However, a failure in processing one of the transaction's tuples will cause the entire transaction to be retried. For each batch, Trident calls beginCommit at the start of the transaction and commit at the end of it.
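These transaction boundaries surface in Trident's State interface. The following is a minimal, illustrative sketch (the class name LoggingState and the log messages are our own additions) of where a state implementation would stage and then finalize its updates, assuming the storm.trident.state.State interface −

import storm.trident.state.State;

public class LoggingState implements State {
   @Override
   public void beginCommit(Long txid) {
      // called once at the start of a batch (transaction)
      System.out.println("beginCommit for transaction " + txid);
   }

   @Override
   public void commit(Long txid) {
      // called once after all tuples in the batch have been processed
      System.out.println("commit for transaction " + txid);
   }
}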

Trident Topology

The Trident API exposes an easy option to create a Trident topology using the "TridentTopology" class. Basically, a Trident topology receives an input stream from a spout and performs an ordered sequence of operations (filter, aggregation, grouping, etc.) on the stream. Storm Tuples are replaced by Trident Tuples, and Bolts are replaced by operations. A simple Trident topology can be created as follows −

TridentTopology topology = new TridentTopology();

Trident Tuples

A trident tuple is a named list of values. The TridentTuple interface is the data model of a Trident topology; it is the basic unit of data that can be processed by a Trident topology.
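For instance, field values can be read from a TridentTuple either by position or by field name. The snippet below is a small illustrative sketch, assuming a tuple carrying the fields "a" and "b" used in the later examples −

// Read values by position or by the declared field name.
int a = tuple.getInteger(0);
int b = tuple.getIntegerByField("b");
System.out.println(tuple.getValues()); // e.g. [1, 2]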

Trident Spout

A Trident spout is similar to a Storm spout, with additional options to use the features of Trident. Actually, we can still use IRichSpout, which we have used in the Storm topology, but it will be non-transactional in nature and we will not be able to use the benefits provided by Trident.

The basic spout having all the functionality to use the features of Trident is "ITridentSpout". It supports both transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.

In addition to these generic spouts, Trident has many sample implementations of the trident spout. One of them is the FeederBatchSpout, which we can use to send a named list of trident tuples easily without worrying about batch processing, parallelism, etc.

FeederBatchSpout creation and data feeding can be done as shown below −

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));
topology.newStream("fixed-batch-spout", testSpout);
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
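Another sample spout from the same testing package is FixedBatchSpout, which replays a fixed set of tuples batch by batch. The snippet below is a small illustrative sketch rather than part of the call log example; the field name "sentence" and the sample values are our own assumptions −

FixedBatchSpout fixedSpout = new FixedBatchSpout(new Fields("sentence"), 3,
   new Values("the cow jumped over the moon"),
   new Values("the man went to the store"),
   new Values("four score and seven years ago"));
fixedSpout.setCycle(true); // keep replaying the same batches
topology.newStream("fixed-spout", fixedSpout);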

Trident Operations

Trident relies on the "Trident Operation" to process the input stream of trident tuples. The Trident API has a number of built-in operations to handle simple-to-complex stream processing. These operations range from simple validation to complex grouping and aggregation of trident tuples. Let us go through the most important and frequently used operations.

Filter

Filter is an object used to perform the task of input validation. A Trident filter gets a subset of trident tuple fields as input and returns either true or false depending on whether certain conditions are satisfied or not. If true is returned, then the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. A filter basically inherits from the BaseFilter class and implements the isKeep method. Here is a sample implementation of the filter operation −

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

A filter can be called in the topology using the "each" method. The "Fields" class can be used to specify the input (a subset of the trident tuple fields). The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFilter());

Function

Function is an object used to perform a simple operation on a single trident tuple. It takes a subset of trident tuple fields and emits zero or more new trident tuple fields.

Function basically inherits from the BaseFunction class and implements the execute method. A sample implementation is given below −

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Just like the Filter operation, the Function operation can be called in a topology using the each method. The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"));

Aggregation

Aggregation is an object used to perform aggregation operations on an input batch, partition, or stream. Trident has three types of aggregation. They are as follows −

  • aggregate − Aggregates each batch of trident tuples in isolation. During the aggregate process, the tuples are initially repartitioned using global grouping to combine all partitions of the same batch into a single partition.
  • partitionAggregate − Aggregates each partition rather than the whole batch of trident tuples. The output of the partition aggregate completely replaces the input tuple, and it contains a single field tuple.
  • persistentAggregate − Aggregates over all trident tuples across all batches and stores the result in either memory or a database.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .aggregate(new Count(), new Fields("count"));

// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .partitionAggregate(new Count(), new Fields("count"));

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

An aggregation operation can be created using the CombinerAggregator, ReducerAggregator, or generic Aggregator interface. The Count aggregator used in the above example is one of the built-in aggregators. It is implemented using CombinerAggregator. The implementation is as follows −

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }

   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }

   @Override
   public Long zero() {
      return 0L;
   }
}
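For comparison, the same count can also be written with the ReducerAggregator interface, which folds tuples into a running value one at a time. The sketch below is our own illustration and not part of the original example; the class name CountReducer is hypothetical −

public class CountReducer implements ReducerAggregator<Long> {
   @Override
   public Long init() {
      // starting value for each aggregation
      return 0L;
   }

   @Override
   public Long reduce(Long curr, TridentTuple tuple) {
      // fold one tuple into the running count
      return curr + 1;
   }
}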

Grouping

Grouping is an inbuilt operation and can be called using the groupBy method. The groupBy method repartitions the stream by doing a partitionBy on the specified fields, and then, within each partition, it groups together tuples whose group fields are equal. Normally, we use "groupBy" along with "persistentAggregate" to get a grouped aggregation. The sample code is as follows −

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the grouped count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .groupBy(new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merging and Joining

Merging and joining can be done using the "merge" and "join" methods respectively. Merging combines one or more streams. Joining is similar to merging, except that joining uses a trident tuple field from each side to check and join the two streams. Moreover, joining works at the batch level only. The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
   new Fields("key", "a", "b", "c"));

State Maintenance

Trident provides a mechanism for state maintenance. State information can be stored in the topology itself, or it can be stored in a separate database. The reason for maintaining state is that if any tuple fails during processing, the failed tuple is retried. This creates a problem while updating the state, because you are not sure whether the state for this tuple was already updated. If the tuple failed before updating the state, then retrying the tuple keeps the state consistent. However, if the tuple failed after updating the state, then retrying the same tuple will again increase the count in the database and leave the state inconsistent. One must perform the following steps to ensure a message is processed exactly once −

  • Process the tuples in small batches.
  • Assign a unique ID to each batch. If the batch is retried, it is given the same unique ID.
  • The state updates are ordered among batches. For example, the state update of the second batch will not be possible until the state update for the first batch has completed. A sketch of how these rules keep updates idempotent is shown after this list.
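The idea behind unique batch IDs can be illustrated with a small, hypothetical in-memory store. The class below is our own sketch (TransactionalCountStore is not a Trident API): because the stored value remembers the transaction ID of the last applied batch, replaying a batch with the same ID is detected and skipped, so the count is never applied twice −

public class TransactionalCountStore {
   private long count = 0;
   private Long lastTxId = null; // transaction ID of the last applied batch

   // Apply a batch's partial count exactly once.
   public void applyBatch(long txId, long batchCount) {
      if (lastTxId != null && lastTxId == txId) {
         // This batch was already applied; a retry must not change the state.
         return;
      }
      count += batchCount;
      lastTxId = txId;
   }

   public long getCount() {
      return count;
   }
}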

Distributed RPC

Distributed RPC is used to query and retrieve results from the Trident topology. Storm has an inbuilt distributed RPC server. The distributed RPC server receives the RPC request from the client and passes it to the topology. The topology processes the request and sends the result back to the distributed RPC server, which redirects it to the client. A Trident distributed RPC query executes like a normal RPC query, except that these queries are run in parallel.
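When the topology runs on a real cluster rather than in local mode, the client side of such a query might look like the following sketch, assuming a DRPC server reachable at the hypothetical host "drpc.server.location" on the default port 3772 −

// Connect to the remote DRPC server and query the "call_count"
// function exposed by the topology.
DRPCClient client = new DRPCClient("drpc.server.location", 3772);
String result = client.execute("call_count", "1234123401 - 1234123402");
System.out.println(result);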

When to Use Trident?

As in many use cases, if the requirement is to process a query exactly once, we can achieve it by writing a topology in Trident. On the other hand, it is difficult to achieve exactly-once processing with plain Storm. Hence, Trident will be useful for those use cases where exactly-once processing is required. Trident is not for all use cases, especially high-performance use cases, because it adds complexity to Storm and manages state.

Working Example of Trident

We are going to convert the call log analyzer application worked out in the previous section to the Trident framework. The Trident application will be relatively simple compared to plain Storm, thanks to its high-level API. The work previously done by Storm bolts will basically be performed by Trident's Function, Filter, Aggregate, GroupBy, Join, and Merge operations. Finally, we will start the DRPC server using the LocalDRPC class and search for some keyword using the execute method of the LocalDRPC class.

Formatting the call information

The purpose of the FormatCall class is to format the call information comprising “Caller number” and “Receiver number”. The complete program code is as follows −

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

The purpose of the CSVSplit class is to split the input string based on “comma (,)” and emit every word in the string. This function is used to parse the input argument of distributed querying. The complete code is as follows −

Coding: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

This is the main application. Initially, the application will initialize the TridentTopology and feed caller information using the FeederBatchSpout. A Trident topology stream can be created using the newStream method of the TridentTopology class. Similarly, a Trident topology DRPC stream can be created using the newDRPCStream method of the TridentTopology class. A simple DRPC server can be created using the LocalDRPC class. LocalDRPC has an execute method to search for some keyword. The complete code is given below.

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();

      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"),
            new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(),
            new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(),
            new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;

      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402",
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count", "1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count",
         "1234123401 - 1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Building and Running the Application

The complete application has three Java files. They are as follows −

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyserTrident.java

The application can be built by using the following command −

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

The application can be run by using the following command −

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserTrident

Output

Once started, the application will output the complete details about the cluster startup process, operations processing, DRPC server and client information, and finally, the cluster shutdown process. This output is displayed on the console as shown below.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

 

So, this brings us to the end of the blog. This Tecklearn 'Deep Dive into Trident – an extension of Apache Storm' blog helps you with commonly asked questions if you are looking out for a job in Apache Storm and the Big Data domain.

If you wish to learn Apache Storm and build a career in Apache Storm or the Big Data domain, then check out our interactive Apache Storm Training, which comes with 24*7 support to guide you throughout your learning period. Please find the link for course details:

https://www.tecklearn.com/course/apache-strom-training/

Apache Storm Training

About the Course

Tecklearn Apache Storm training will give you a working knowledge of the open-source computational engine, Apache Storm. You will be able to do distributed real-time data processing and come up with valuable insights. You will learn about the deployment and development of Apache Storm applications in real world for handling Big Data and implementing various analytical tools for powerful enterprise-grade solutions. Upon completion of this online training, you will hold a solid understanding and hands-on experience with Apache Storm.

Why Should you take Apache Storm Training?

  • The average pay of Apache Storm Professional stands at $90,167 P.A – ​Indeed.com​​
  • Groupon, Twitter, and many other companies use Apache Storm for business purposes like real-time analytics and micro-batch processing.
  • Apache Storm is a free and open source, distributed real-time computation system for processing fast, large streams of data

What you will Learn in this Course?

Introduction to Apache Storm

  • Apache Storm
  • Apache Storm Data Model

Architecture of Storm

  • Apache Storm Architecture
  • Hadoop distributed computing
  • Apache Storm features

Installation and Configuration

  • Pre-requisites for Installation
  • Installation and Configuration

Storm UI

  • Zookeeper
  • Storm UI

Storm Topology Patterns

Got a question for us? Please mention it in the comments section and we will get back to you.
