Friday, August 2, 2013

CEP Performance: Processing 100k to Millions of events per second using WSO2 Complex Event Processing (CEP) Server


With WSO2 CEP, you can use SQL-style queries to detect interesting patterns across many data streams. The standalone version of the CEP engine is called Siddhi, and that is what you need if you want to embed a CEP engine within a Java program. WSO2 CEP, on the other hand, runs CEP queries as a server, and you can send events to it using Thrift, Web Services, REST calls, JMS, and email.

WSO2 CEP can handle a few hundred thousand events per second over the network and a few million events per second within the JVM. We have published those numbers before. In this post, I will try to put them all together and give some context to the different numbers.

In the following tests, each event includes multiple properties, and the queries match those events against the given conditions.

Same JVM Events performance

 Setup: We used an Intel(R) Xeon(R) X3440 @ 2.53GHz with 4 cores, 8M cache, and 8GB RAM, running Debian with the 2.6.32-5-amd64 kernel. We generated events from the same JVM.

Case 1: Simple filter (from StockTick[prize >6] return  symbol, prize)





Case 2: Window (from StockTick[symbol='IBM']#win.time(0.005) return symbol, avg(prize))



Case 3: Event patterns A->B (A followed by B).

From f=FraudWarningEvent ->
p=PINChangeEvent(accountNumber=f.accountNumber) 
return accountNumber;
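
To make the "A followed by B" semantics concrete, here is a minimal Python sketch of the same idea. It is not Siddhi code and does not use any Siddhi API; the Event tuple and the function name are hypothetical, and it assumes that each fraud warning is matched at most once by the first later PIN change on the same account.

```python
from collections import namedtuple

# Hypothetical event shape for illustration only.
Event = namedtuple("Event", ["kind", "account_number"])


def detect_fraud_then_pin_change(events):
    """Yield account numbers where a PINChangeEvent follows a FraudWarningEvent."""
    warned = set()  # accounts with a pending, unmatched fraud warning
    for event in events:
        if event.kind == "FraudWarningEvent":
            warned.add(event.account_number)
        elif event.kind == "PINChangeEvent" and event.account_number in warned:
            warned.discard(event.account_number)  # assumption: one match per warning
            yield event.account_number


events = [
    Event("PINChangeEvent", "A1"),     # no earlier warning, so no match
    Event("FraudWarningEvent", "A1"),
    Event("FraudWarningEvent", "B2"),
    Event("PINChangeEvent", "B2"),     # follows the warning for B2
    Event("PINChangeEvent", "A1"),     # follows the warning for A1
]
matches = list(detect_fraud_then_pin_change(events))
print(matches)
```

The engine has to keep state (here, the set of warned accounts) between events, which is why pattern queries are typically more expensive than a simple filter.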




Performance Over the Network

Setup: We ran CEP on an Intel® Core™ i7-2630QM CPU @ 2.00GHz with 8 cores and 8GB RAM, running Ubuntu 12.04 with the 3.2.0-32-generic kernel. The three client nodes used an Intel® Core™ i3-2350M CPU @ 2.30GHz with 4 cores and 4GB RAM, running Ubuntu 12.04 with the 3.2.0-32-generic kernel.

The following results are for a simple filter, with events sent over the network using Thrift.

           


Performance For a Complex Scenario


Finally, the following is the performance for the DEBS Grand Challenge. The Grand Challenge detects the following scenarios from the events generated by a real football game.

Usecase 1: Running analysis

The first usecase measures each player’s running speed and calculates how long he spent in different speed ranges. For example, the results will tell that the player "Martin" was running fast from 27 minutes and 01 second into the game to 27 minutes and 35 seconds into the game.

Usecase 2 & 4: Ball Possession and Shots on Goal

 For the second usecase, we need to calculate the time each player controlled the ball (ball possession). A player controls the ball from the time he hits the ball until someone else hits the ball, the ball goes out of the ground, or the game is stopped. We identify a hit when the ball is within one meter of a player and its acceleration increases by more than 55 m/s².

Usecase four is to detect hits and emit events if the ball is going toward the goal.

Usecase 3: Heatmap of Activity

Usecase three divides the ground into a grid and calculates the time a player spends in each cell. However, this usecase needs updates once every second. We can solve the first part just like the first usecase, but to make sure we get an event once a second, we had to couple it with a timer.

You can find more information at http://www.orgs.ttu.edu/debs2013/index.php?goto=cfchallengedetails , and you can find the queries in this blog post.

Setup: A VM with 4 cores (@ 2.8 GHz), 4 GB RAM, an SSD, and 1 Gb Ethernet. We replayed events from the same JVM.

 

Thursday, June 6, 2013

Instant MapReduce Patterns book



I have been working on a micro book (50 pages) called Instant MapReduce Patterns, which is now available both online and in printed form. (You can also buy it from Amazon.)

It is a short book that explains 10 simple MapReduce application patterns. The book starts from ground zero by introducing MapReduce, and then goes on to MapReduce patterns.

The book aims to take the reader beyond word count (the "hello world" equivalent in the MapReduce world) and give them a concrete feel for writing MapReduce programs. There are three introductory-level recipes, and then it describes seven patterns of MapReduce programs: analytics, set operations, cross correlation, search, graph, joins, and clustering, with an example of each pattern.

I hope it will be useful. I will blog later about sample recipes.

Also, if you are looking for a more detailed treatment, our earlier book "Hadoop MapReduce Cookbook" might be worth a look.

Thursday, May 30, 2013

Introduction to Big Data

The following is the Introduction to Big Data slide deck I presented at the Colombo Java meetup this month. You can find the slides at Introduction to Big Data.



Here I am taking a detailed look at the analysis, looking beyond the basic analytics and basic figures. We do analysis for three reasons.
  1. To know what happened (Analytics)
  2. To explain what happened (Models, theories)
  3. To predict what will happen (Neural networks, Numerical models)
The slides also discuss existing predictions that we already do very well, such as weather forecasting. They also include a typical big data architecture and discuss the different parts of that architecture.

Finally, the slides go on to discuss Hadoop and CEP and provide some examples.

Monday, May 13, 2013

How to profile WSO2 Products


We use profiling to find performance bottlenecks, and we use tools like JProfiler and the YourKit profiler to profile Java programs. However, if you try to use one of those tools to profile a WSO2 product, there are a few things you should know.

I have repeated this many times, and that is the reason I am writing this down.
  1. Start the WSO2 Product
  2. Connect to the product by finding the Java process from the menu. If you are profiling from a remote machine, you need to start the product with the profiler's JVM options (e.g. with JProfiler, the option looks like -agentpath:/Applications/jprofiler7/bin/macos/libjprofilerti.jnilib=port=8849).
  3. When it connects, use instrumentation (you can use sampling only if you run the test for 3-4 hours or more).
  4. Generally, the default filters cover org.apache.*, which hides most of the useful information. To avoid that, remove all default filters, and add inclusive filters for org.wso2, org.apache, and java.util.
  5. Enable CPU or other useful data recording options, and continue with your profiling. 

Finally, if you have a memory leak, it is a good idea to get a memory dump, analyze it using Eclipse Memory Analyzer, and look at the “Leak Suspects” report.

Wednesday, May 1, 2013

Solving DEBS 2013 Grand Challenge with WSO2 CEP/Siddhi

The ACM International Conference on Distributed Event-Based Systems (DEBS) is one of the primary venues for Complex Event Processing research. Every year, it runs a grand challenge that asks the participants to solve an event-based problem.

This year we are also competing, and the following are some of our experiences while doing it.

This year's grand challenge is to process events generated in a football game and answer four queries related to the game in a streaming fashion. Both the players’ shoes and the ball had sensors, and as the game continues, they generate about 15,000 events per second. Each event includes the location (x,y,z), time stamp, velocity, and acceleration of the sensor. You can find more information from http://www.orgs.ttu.edu/debs2013/index.php?goto=cfchallengedetails. The complete dataset is 49 million events.

There are four usecases. I will explain each and tell a bit about how we implemented them. To understand the queries, you need to know the Siddhi event query language. The following is a crash course.

from <name of the event stream1>[condition on event], 
    <name of the event stream2>[condition on event], 
    <name of the event stream3>[condition on event] …
select <values to be extracted from matching events>
insert into <name of the event stream to send results>



Here the comma-separated conditions define an event sequence. That is, the query matches when the incoming events have met those conditions in the given order.

We worked with two streams: Players – events from players and BallStream – events from the ball.

Usecase 1: Running analysis 

The first usecase measures each player’s running speed and calculates how long he spent in different speed ranges. For example, the results will tell that the player "Martin" was running fast from 27 minutes and 01 second into the game to 27 minutes and 35 seconds into the game.

We implemented this usecase using CEP event sequences that detect whenever a player crosses a speed threshold, using a query like the following.

define partition player by Players.id
from s = Players[v <= 1 or v > 11],
    t = Players[v > 1 and v <= 11]+, e = Players[v <= 1 or v > 11]
select s.ts as tsStart, e.ts as tsStop, s.id as playerId,
    "trot" as intensity, t[0].v as instantSpeed,
    (e.ts - s.ts)/1000000000 as unitPeriod
insert into RunningStats partition by player

The query defines a sequence of conditions. For example, Players[v <= 1 or v > 11] matches events in the player event stream whose velocity falls in that range. Siddhi invokes the callback when those conditions are met.

Here “+” means one or more occurrences, similar to regular expressions. You can find more information about the Siddhi language in the Siddhi Language Specification. We wrote a query for each speed range, found the time stamps of the speed range changes, and calculated the final result.

Event partitions split the events by some condition before processing them. The above query partitions events by player.
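
For readers who do not know Siddhi, the following Python sketch mimics what the query above does for a single speed range. It is only an approximation under stated assumptions: the function name and the tuple layout (player id, timestamp in nanoseconds, velocity) are hypothetical, and the per-player dictionary stands in for the partition. It is not how Siddhi implements sequences internally.

```python
def trot_intervals(events, low=1, high=11):
    """Find runs of in-range speeds bracketed by out-of-range events, per player.

    events: iterable of (player_id, ts_ns, v) tuples in time order.
    """
    pending = {}  # player_id -> (start_ts, first in-range speed or None)
    results = []
    for pid, ts, v in events:
        start_ts, first_v = pending.get(pid, (None, None))
        if low < v <= high:
            # Event "t": remember only the first speed of the run (t[0].v).
            if start_ts is not None and first_v is None:
                pending[pid] = (start_ts, v)
        else:
            # Event "e" closes a run, if one is open for this player.
            if start_ts is not None and first_v is not None:
                results.append({
                    "tsStart": start_ts,
                    "tsStop": ts,
                    "playerId": pid,
                    "intensity": "trot",
                    "instantSpeed": first_v,
                    "unitPeriod": (ts - start_ts) / 1000000000,
                })
            # This out-of-range event becomes the next candidate "s".
            pending[pid] = (ts, None)
    return results


sample = [
    ("martin", 0, 0.5),            # s: out of range
    ("ann", 500000000, 5.0),       # in range, but no earlier "s" for ann
    ("martin", 1000000000, 3.0),   # t[0]
    ("ann", 1500000000, 0.2),      # s for ann
    ("martin", 2000000000, 4.0),   # t[1]
    ("ann", 2500000000, 2.0),      # t[0] for ann, run still open
    ("martin", 3000000000, 12.0),  # e: closes martin's run
]
intervals = trot_intervals(sample)
print(intervals)
```

Keeping one small state entry per player is the same idea as the partition in the query: events from different players never interfere with each other.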

Usecase 2 & 4: Ball Possession and Shots on Goal 

 For the second usecase, we need to calculate the time each player controlled the ball (ball possession). A player controls the ball from the time he hits the ball until someone else hits the ball, the ball goes out of the ground, or the game is stopped. We identify a hit when the ball is within one meter of a player and its acceleration increases by more than 55 m/s².

Usecase four is to detect hits and emit events if the ball is going toward the goal.

We implemented both using event sequences, and the following are the queries. Here, functions like debs:isGoingToGoal(..) and debs:getDistance(..) are custom Java functions that we wrote and registered with Siddhi.

 The following query detects the hits.

from Ball#window.length(1) as b join 
 Players#window.length(1)  as p unidirectional
 on debs:getDistance(b.x,b.y,b.z,
  p.x, p.y, p.z) < 1000 and b.a > 55
select p.sid, p.ts, p.x, p.y, p.z, p.a, p.pid,p.tid, 
 b.sid as ballSid  
insert into hitStream
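
The body of debs:getDistance(..) is not shown in this post. The Python sketch below is one plausible reading of the join condition, assuming that the function returns the Euclidean distance, that coordinates are in millimetres (so 1000 corresponds to the one meter mentioned above), and that events are plain dictionaries. The names get_distance and is_hit are hypothetical.

```python
import math


def get_distance(x1, y1, z1, x2, y2, z2):
    """Euclidean distance between two 3D points (assumed meaning of debs:getDistance)."""
    return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2 + (z1 - z2) ** 2)


def is_hit(ball, player, max_distance=1000, min_acceleration=55):
    """Mirror the join condition: ball close to the player and accelerating hard."""
    distance = get_distance(ball["x"], ball["y"], ball["z"],
                            player["x"], player["y"], player["z"])
    return distance < max_distance and ball["a"] > min_acceleration


ball = {"x": 0, "y": 0, "z": 0, "a": 60}
near_player = {"x": 300, "y": 400, "z": 0}    # 500 units away
far_player = {"x": 3000, "y": 4000, "z": 0}   # 5000 units away
slow_ball = {"x": 0, "y": 0, "z": 0, "a": 10}

print(is_hit(ball, near_player), is_hit(ball, far_player), is_hit(slow_ball, near_player))
```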


The following query detects the periods during which a player is in control of the ball.



from old = hitCountStream, 
 b = hitCountStream[old.pid != pid ], 
 n=hitCountStream[b.pid == pid]*, 
 e1 = hitCountStream[b.pid != pid ] 
  or e2=ballLeavingHitStream 
select b.pid as playerId, b.tid as teamId, 
 b.ts as startTs,  
 coalesce(e1.ts ,e2.ts) as endTs, 
 coalesce(e1.cnt ,e2.cnt) as counter1, 
 b.cnt as counter2 
insert into BallPossessionStream

The following query detects hits at the goal.

from s = hitStream, 
 t = Ball[debs:getDistance(s.x,s.y,s.z, 
  x, y, z) < 1000]+, 
 e = Ball[debs:isGoingToGoal(s.tid,x,y,z
  ,a,v,vx,vy,vz,s.ts,ts) == 1.0]+, 
 h = Ball[debs:isGoingToGoal(s.tid,x,y,z,
  a,v,vx,vy,vz,s.ts,ts) == 0.0] 
   or l=ballLeavingStream
select s.ts as startTime, e[0].ts, s.pid, 
 s.tid, 'on' as mode
insert into ShotOnGoalStream

Usecase 3: Heatmap of Activity 

Usecase three divides the ground into a grid and calculates the time a player spends in each cell. However, this usecase needs updates once every second. We can solve the first part just like the first usecase, but to make sure we get an event once a second, we had to couple it with a timer.

The queries are given below, and you can find the explanation in the paper. The following query detects when a player has changed cells.

from r=PlayersCell, 
 s = PlayersCell[cell!=r.cell] 
select r.pid as playerId, s.ts as time, s.x, s.y, 
 s.cell as newCell,r.cell as oldCell,  
 s.tsms as timems, 'ball' as type 
insert into r4CellChanges partition by player

The following query joins the events with a 1-second timer.

from timer1s unidirectional 
 join PlayersCell#window.unique(pid) as c 
select c.pid as playerId, timer.ts as time, c.x, 
 c.y, c.cell as newCell, c.cell as oldCell,  
 timer.tsms as timems, 'timer' as type 
insert into r4CellChanges


The following query uses the results from the above two queries and calculates the time spent in each cell.



from s=region4cellChange, 
 e=region4cellChange[s.newCell==oldCell] 
select s.playerId, e.timems as endTimems,
 e.time as ts, s.x, s.y, 
 e.time- s.time as time, 
 s.newCell as cell, e.type 
insert into r4CellStay partition by player
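
The last query pairs each cell-change or timer event with the next one for the same player and reports the time between them. A rough Python equivalent for a single player is sketched below. The function name and dictionary keys are hypothetical (the keys simply reuse the attribute names from the queries), and it assumes the events arrive in time order.

```python
def cell_stay_durations(changes):
    """Pair consecutive events where the earlier newCell equals the later oldCell.

    changes: time-ordered list of dicts with keys time, oldCell, newCell, type,
    all for one player.
    """
    stays = []
    previous = None
    for current in changes:
        if previous is not None and previous["newCell"] == current["oldCell"]:
            stays.append({
                "cell": previous["newCell"],
                "time": current["time"] - previous["time"],
                "type": current["type"],
            })
        previous = current
    return stays


changes = [
    {"time": 0, "oldCell": 1, "newCell": 2, "type": "ball"},      # player enters cell 2
    {"time": 1000, "oldCell": 2, "newCell": 2, "type": "timer"},  # timer tick, same cell
    {"time": 1400, "oldCell": 2, "newCell": 3, "type": "ball"},   # player leaves cell 2
]
stays = cell_stay_durations(changes)
print(stays)
```

Because timer events carry the same cell as both oldCell and newCell, they split a long stay into pieces without losing any time, which is what lets the results be updated every second.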

We were able to get about 50,000 events per second for usecase 3 and more than 100,000 events per second for the other usecases on a 4-core, 4 GB VMware virtual machine. Even the slowest case is about three times the required rate of 15,000 events per second, which we believe is impressive. This also shows that Siddhi EQL can naturally express such a complex scenario as queries, and that the queries provide a nice higher-level abstraction for thinking about the problem.

I hope this was interesting. Implementing the actual scenarios required more queries to calculate all the details, but the above presents the basic idea.

If you would like to try out WSO2 CEP/Siddhi, you can find it at http://wso2.com/products/complex-event-processor/.

You can find an introduction to Siddhi at http://srinathsview.blogspot.com/2011/12/siddhi-second-look-at-complex-event.html 

Friday, February 22, 2013

What is messaging and why should you care?


I drew the following diagram for a WSO2Con talk. Through the diagram, I am trying to explain “what is messaging” and “why should you care”. Here are some thoughts.



When we build systems, we often use some form of RPC, such as UNIX RPC, RMI, Web Services, or Thrift. Each has its unique traits, yet fundamentally they all let users invoke a procedure living in a remote server, and they share three properties.
  1. Request/response (a.k.a. two-way messaging)
  2. Synchronous (the client waits for the server to return)
  3. Transient (the message is not stored)
Although we often take these properties for granted, there is a flip side to each: there are also one-way interactions, asynchronous interactions, and persistent interactions.

As shown by the figure, those properties can be combined into eight possibilities, and RPC is just one of the eight. We use messaging to mean a superset of RPC interactions that gives users the flexibility to choose among the above properties. Messaging systems (e.g. JMS) support Distributed Queues (see [2]) and Publish/Subscribe (see [1]) models.
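
The count of eight comes from three independent two-way choices (2 × 2 × 2). A short Python sketch that enumerates them, with labels taken from the lists above:

```python
from itertools import product

# Each dimension has the RPC default first and its flip side second.
dimensions = [
    ("request/response", "one-way"),
    ("synchronous", "asynchronous"),
    ("transient", "persistent"),
]

combinations = list(product(*dimensions))
rpc = ("request/response", "synchronous", "transient")

for combo in combinations:
    marker = "  <- classic RPC" if combo == rpc else ""
    print(", ".join(combo) + marker)
```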

Messaging may provide all or some of the following advantages.
  1. Reliability – via transactions or persistence 
  2. Performance – enables SEDA-like, event-driven, non-blocking architectures that can provide higher throughput than blocking architectures. 
  3. Loose coupling – participants joining the interaction may be loosely coupled in one of the following three dimensions (read the paper “The many faces of publish/subscribe” [1] for more information).
    • Space – server and client do not need to know each other.
    • Time – server and client do not have to be online at the same time.
    • Synchronization – the client does not have to wait for the message to be delivered.
References
  1. Eugster, Patrick Th, et al. "The many faces of publish/subscribe." ACM Computing Surveys (CSUR) 35.2 (2003): 114-131.
  2. Scalable Persistent Message Brokering with WSO2 Message Broker slide deck

Wednesday, January 30, 2013

Our book, "Hadoop MapReduce Cookbook" is online



Thilina and I have been working on a book on Hadoop, and it is now online! You can find the book at http://www.packtpub.com/hadoop-mapreduce-cookbook/book or on Amazon at http://www.amazon.com/Hadoop-MapReduce-Cookbook-ebook/dp/B00B71KZRE/. It is available in both paperback and e-book formats. 

Hadoop is an implementation of the MapReduce pattern first introduced by Google in their seminal paper MapReduce: Simplified Data Processing on Large Clusters. It provides a programming model for users to process large datasets using many computers. 

For example, consider a few gigabytes of log files that contain the access logs for a server, and suppose you want to read those log files and count the number of hits received by each web page on the server. A user could write a program that walks through the log files and processes them. However, if the log files are large, they would need to be processed using many computers, and writing a system that does so would be a significant undertaking. 

However, toolkits like Hadoop that support the MapReduce framework let users write two functions called "map" and "reduce", and the framework takes care of the details of processing the log files. Furthermore, Hadoop handles details like communication between nodes, scheduling sub-tasks, handling failures, and debugging. 
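
To give a feel for the two functions, here is a small Python sketch of the page-hit count described above. It runs locally in a single process and does not use the Hadoop API (Hadoop jobs are normally written in Java); run_local is a hypothetical stand-in for the grouping that the framework performs between the map and reduce phases. The log format, with the page as the second whitespace-separated field, is also an assumption.

```python
from collections import defaultdict


def map_fn(log_line):
    """Emit (page, 1) for one access-log line."""
    fields = log_line.split()
    if len(fields) >= 2:  # skip malformed lines
        yield fields[1], 1


def reduce_fn(page, counts):
    """Sum all the counts emitted for one page."""
    return page, sum(counts)


def run_local(lines):
    """Group map output by key, then reduce each group (the framework's job)."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in map_fn(line):
            groups[key].append(value)
    return dict(reduce_fn(key, values) for key, values in groups.items())


logs = [
    "10.0.0.1 /index.html",
    "10.0.0.2 /about.html",
    "10.0.0.3 /index.html",
]
hits = run_local(logs)
print(hits)
```

In a real cluster, the map calls run in parallel on different pieces of the input and the grouping happens across machines, but the two user-written functions keep this simple shape.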

Log processing is only a trivial example of what can be implemented with the MapReduce paradigm. It can be, and is being, used to implement many simple and complex data processing tasks around the world. Users can extend it in many ways to handle different message formats. 

The book starts with simple introductory-level details, but goes into many MapReduce patterns such as analytics, clustering, and recommendations. Each one is explained using a recipe and accompanied by code samples. We believe it has recipes that will help beginners as well as experienced MapReduce developers. 

Some important information. 
  1. The book uses Hadoop 1.0.x releases 
  2. The table of contents is at http://www.amazon.com/Hadoop-MapReduce-Cookbook-ebook/dp/B00B71KZRE/ (click on the image)
  3. The sample chapter, which covers analytics using MapReduce, is available from https://www.packtpub.com/sites/default/files/9781849517287_Chapter_06.pdf  


This finishes about a year-long process of writing, editing, and re-editing. It took a lot of time, but it was of course a great experience. I would like to thank the editorial team for all the help and feedback. We hope the book will be useful for Hadoop developers. 

Enjoy!!