Thursday, May 30, 2013

Introduction to Big Data

Following is the Introduction to Big Data slide deck I presented at the Colombo Java meetup this month. You can find the slides at Introduction to Big Data.



Here I take a detailed look at data analysis, going beyond basic analytics and basic figures. We do analysis for three reasons.
  1. To know what happened (Analytics)
  2. To explain what happened (Models, theories)
  3. To predict what will happen (Neural networks, numerical models)
The slides also discuss existing prediction successes, such as weather forecasting, where we are already doing very well. They also include a typical big data architecture and discuss the different parts of that architecture.

Finally, the slides go on to discuss Hadoop and CEP and provide some examples.

Monday, May 13, 2013

How to profile WSO2 Products


We use profiling to find performance bottlenecks, with tools like JProfiler and YourKit to profile Java programs. However, if you try to use one of those tools to profile a WSO2 product, there are a few things you should know.

I have explained this many times, which is why I am writing it down.
  1. Start the WSO2 Product
  2. Connect to the product by finding the Java process from the profiler's menu. If you are profiling a remote machine, you need to start the product with the profiler's JVM options (e.g. with JProfiler, the option looks like -agentpath:/Applications/jprofiler7/bin/macos/libjprofilerti.jnilib=port=8849).
  3. When it connects, use instrumentation (use sampling only if you run the test for 3-4 hours or more).
  4. Generally, the default filters exclude org.apache.*, which removes most of the useful information. To avoid that, remove all the default filters and add inclusive filters for org.wso2, org.apache, and java.util.
  5. Enable CPU or other useful data recording options, and continue with your profiling. 

Finally, if you have a memory leak, it is a good idea to get a memory dump, analyze it using Eclipse Memory Analyzer, and look at the "Leak Suspects Report".

Wednesday, May 1, 2013

Solving DEBS 2013 Grand Challenge with WSO2 CEP/Siddhi

The ACM International Conference on Distributed Event-Based Systems (DEBS) is one of the primary venues for Complex Event Processing research. Every year, it poses a grand challenge that asks participants to solve an event-based problem.

This year we are competing as well, and following are some of our experiences while doing so.

This year's grand challenge is to process events generated in a football game and answer four queries about the game in streaming fashion. Both the players' shoes and the ball carry sensors, and as the game continues, they generate about 15,000 events per second. Each event includes the location (x, y, z), time stamp, velocity, and acceleration of the sensor. You can find more information at http://www.orgs.ttu.edu/debs2013/index.php?goto=cfchallengedetails. The complete dataset is 49 million events.

There are four usecases. I will explain each and say a bit about how we implemented them. To understand the queries, you need to know the Siddhi Event Query Language. Following is a crash course.

from <name of the event stream1>[condition on event], 
    <name of the event stream2>[condition on event], 
    <name of the event stream3>[condition on event] …
select <values to be extracted from matching events>
insert into <name of the event stream to send results>



Here the comma-separated conditions define an event sequence. That is, the query matches when the incoming events have satisfied those conditions in the given order.

We worked with two streams: Players, which carries events from the players, and Ball, which carries events from the ball.

Usecase 1: Running analysis 

The first usecase measures each player's running speed and calculates how long he spent in different speed ranges. For example, the results will tell that the player "Martin" was running fast from 27 minutes and 01 second of the game to 27 minutes and 35 seconds.

We implemented this usecase using CEP event sequences to detect whenever a player crossed a speed threshold, with a query like the following.

define partition player by Players.id
from s = Players[v <= 1 or v > 11], 
    t = Players[v > 1 and v <= 11]+, e = Players[v <= 1 or v > 11]
select s.ts as tsStart, e.ts as tsStop, s.id as playerId, 
    "trot" as intensity, t[0].v as instantSpeed, 
    (e.ts - s.ts)/1000000000 as unitPeriod
insert into RunningStats partition by player

The query defines a sequence of conditions. For example, Players[v <= 1 or v > 11] matches events in the Players stream whose velocity falls in the given range. Siddhi invokes the callback when those conditions are met.

Here "+" denotes one or more occurrences, similar to regular expressions. You can find more information about the Siddhi language in the Siddhi Language Specification. We wrote a query for each speed range, found the time stamps of speed range changes, and calculated the final result.

Here event partitions split the events by some condition before processing them. The above query partitions events by player.
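As a rough illustration of what the query computes (this is illustrative Java, not part of the actual submission), the speed band and the unit-period arithmetic can be sketched as follows. The boundaries 1 and 11 come from the query; treating everything outside that band as "other" is an assumption made here.

```java
// Minimal sketch of the speed-range classification implied by the query
// above. Timestamps in the dataset are in nanoseconds, hence the division
// by 1,000,000,000 to get seconds.
public class SpeedRanges {
    static String classify(double v) {
        if (v > 1 && v <= 11) {
            return "trot";   // matched by Players[v > 1 and v <= 11]
        }
        return "other";      // matched by Players[v <= 1 or v > 11]
    }

    // unitPeriod in the query: (e.ts - s.ts)/1000000000
    static long unitPeriodSeconds(long startTs, long stopTs) {
        return (stopTs - startTs) / 1000000000L;
    }
}
```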

Usecase 2 & 4: Ball Possession and Shots on Goal 

For the second usecase, we need to calculate the time each player controlled the ball (ball possession). A player controls the ball from the time he hits it until someone else hits it, the ball goes out of the ground, or the game is stopped. We identify a hit when the ball is within one meter of a player and its acceleration increases by more than 55 m/s².

Usecase four is to detect hits and emit events if the ball is heading toward the goal.

We implemented both using event sequences, and following are the queries. Here functions like debs:isGoingToGoal(..) and debs:getDistance(..) are custom Java functions we wrote and registered with Siddhi.

The following query detects the hits.

from Ball#window.length(1) as b join 
 Players#window.length(1)  as p unidirectional
 on debs:getDistance(b.x,b.y,b.z,
  p.x, p.y, p.z) < 1000 and b.a > 55
select p.sid, p.ts, p.x, p.y, p.z, p.a, p.pid,p.tid, 
 b.sid as ballSid  
insert into hitStream
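The implementation of debs:getDistance is not shown here, but since the query compares its result against 1000 (sensor coordinates are in millimeters, so "< 1000" means "within one meter"), a plausible sketch is plain Euclidean distance. The signature below is an assumption for illustration.

```java
// Sketch of a Euclidean distance helper in the spirit of debs:getDistance.
// Coordinates are in millimeters, matching the query's "< 1000" test.
public class Distance {
    static double getDistance(double x1, double y1, double z1,
                              double x2, double y2, double z2) {
        double dx = x1 - x2;
        double dy = y1 - y2;
        double dz = z1 - z2;
        return Math.sqrt(dx * dx + dy * dy + dz * dz);
    }
}
```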


The following query detects the period during which a player is in control of the ball.



from old = hitCountStream, 
 b = hitCountStream[old.pid != pid ], 
 n=hitCountStream[b.pid == pid]*, 
 e1 = hitCountStream[b.pid != pid ] 
  or e2=ballLeavingHitStream 
select b.pid as playerId, b.tid as teamId, 
 b.ts as startTs,  
 coalesce(e1.ts ,e2.ts) as endTs, 
 coalesce(e1.cnt ,e2.cnt) as counter1, 
 b.cnt as counter2 
insert into BallPossessionStream

The following query detects shots at the goal.

from s = hitStream, 
 t = Ball[debs:getDistance(s.x,s.y,s.z, 
  x, y, z) < 1000]+, 
 e = Ball[debs:isGoingToGoal(s.tid,x,y,z
  ,a,v,vx,vy,vz,s.ts,ts) == 1.0]+, 
 h = Ball[debs:isGoingToGoal(s.tid,x,y,z,
  a,v,vx,vy,vz,s.ts,ts) == 0.0] 
   or l=ballLeavingStream
select s.ts as startTime, e[0].ts, s.pid, 
 s.tid, 'on' as mode 
insert into ShotOnGoalStream
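debs:isGoingToGoal is also one of our custom functions; its real logic is not reproduced here. A heavily simplified, hypothetical sketch of the idea, ignoring the z axis, gravity, and friction, and taking the goal-line position and post coordinates as made-up parameters, could look like this:

```java
// Hypothetical, simplified "going to goal" test: extrapolate the ball's
// velocity as a straight line and check whether it crosses the goal mouth.
// The signature and the goal geometry parameters are assumptions.
public class GoalCheck {
    static boolean isGoingToGoal(double x, double y, double vx, double vy,
                                 double goalLineY, double postLeftX,
                                 double postRightX) {
        if (vy == 0) {
            return false;                    // never reaches the goal line
        }
        double t = (goalLineY - y) / vy;     // time until the goal line
        if (t <= 0) {
            return false;                    // moving away from the line
        }
        double xAtGoal = x + vx * t;         // x coordinate at the crossing
        return xAtGoal >= postLeftX && xAtGoal <= postRightX;
    }
}
```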

Usecase 3: Heatmap of Activity 

Usecase three divides the ground into a grid and calculates the time a player spends in each cell. However, this usecase needs updates once every second. The first part we can solve just like the first usecase, but to make sure we emit an event once a second, we had to couple it with a timer.
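The grid mapping itself, which produces the PlayersCell events used below, is straightforward. A sketch, where the field bounds and grid dimensions are hypothetical parameters rather than the actual challenge values, might be:

```java
// Sketch of mapping an (x, y) position to a grid cell id, as needed to
// produce PlayersCell events. Field bounds and grid size are assumptions.
public class CellMapper {
    static int toCell(double x, double y,
                      double minX, double maxX, double minY, double maxY,
                      int cols, int rows) {
        // Scale into [0, cols) and [0, rows), clamping positions that fall
        // slightly outside the field onto the border cells.
        int col = (int) Math.min(cols - 1,
                Math.max(0, (x - minX) / (maxX - minX) * cols));
        int row = (int) Math.min(rows - 1,
                Math.max(0, (y - minY) / (maxY - minY) * rows));
        return row * cols + col;             // single cell id, row-major
    }
}
```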

The queries are given below; you can find the full explanation in the paper. The following query detects when a player changes cell.

from r=PlayersCell, 
 s = PlayersCell[cell!=r.cell] 
select r.pid as playerId, s.ts as time, s.x, s.y, 
 s.cell as newCell,r.cell as oldCell,  
 s.tsms as timems, 'ball' as type 
insert into r4CellChanges partition by player

The following query joins the events with a 1-second timer.

from timer1s unidirectional 
 join PlayersCell#window.unique(pid) as c 
select c.pid as playerId, timer.ts as time, c.x, 
 c.y, c.cell as newCell, c.cell as oldCell,  
 timer.tsms as timems, 'timer' as type 
insert into r4CellChanges


The following query uses the results from the above two queries and calculates the time spent in each cell.



from s=region4cellChange, 
 e=region4cellChange[s.newCell==oldCell] 
select s.playerId, e.timems as endTimems,
 e.time as ts, s.x, s.y, 
 e.time- s.time as time, 
 s.newCell as cell, e.type 
insert into r4CellStay partition by player

We were able to handle about 50,000 events per second for usecase 3 and more than 100,000 events per second for the other usecases on a 4-core, 4 GB VMware virtual machine. This is about three times the required rate of 15,000 events per second, which we believe is impressive. It also shows that Siddhi EQL can naturally express such a complex scenario as queries, providing a nice higher-level abstraction for thinking about the problem.

Hope this was interesting. Implementing the actual scenarios required more queries to calculate all the details, but the above presents the basic idea.

If you would like to try out WSO2 CEP/Siddhi, you can find it at http://wso2.com/products/complex-event-processor/.

You can find an introduction to Siddhi at http://srinathsview.blogspot.com/2011/12/siddhi-second-look-at-complex-event.html