Wednesday, August 7, 2013

Understanding Complex Event Processing (CEP) Operators with WSO2 CEP (Siddhi)

A CEP model has many sensors. A sensor can be a real sensor (e.g. a temperature sensor), an agent, or a system that supports instrumentation. Each sensor sends events to the CEP engine, and each event carries several name-value properties.

We call the events coming from the same sensor a “stream” and give it a name. When an interesting event occurs, the sensor sends that event to the stream.

To use a stream, you first need to define it.

 define stream PizzaOrders (id string, price float, ts long, custid string)

The CEP engine listens to one or more streams, and we can write queries telling it to look for certain conditions. To write queries, you can use the following constructs.
  1. Filters 
  2. Windows 
  3. Joins 
  4. Patterns and Sequences 
  5. Event tables 
  6. Partitions 
Let us see what we can do with each construct.

Filters

A filter checks a condition on a property of an event. The condition can use =, >, <, etc., and you can build complex queries by combining multiple conditions with and, or, not, etc.

The following query detects pizza orders that are small and placed too far from the store (it assumes that the events also carry a distance property).

 select from PizzaOrders[price <= 20 and distance>1km]  
 insert into NBNOrders id, price, distance  
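
To illustrate what the filter does, here is a minimal sketch in plain Python rather than Siddhi. Events are modelled as dictionaries, and the function name `nbn_orders` and its parameters are made up for this example; the distance property is assumed to be in kilometres.

```python
def nbn_orders(orders, max_price=20.0, min_km=1.0):
    """Keep small orders placed far from the store and project three fields."""
    return [
        {"id": o["id"], "price": o["price"], "distance": o["distance"]}
        for o in orders
        # same condition as the query: price <= 20 and distance > 1 km
        if o["price"] <= max_price and o["distance"] > min_km
    ]

orders = [
    {"id": "o1", "price": 15.0, "distance": 3.5, "custid": "c1"},
    {"id": "o2", "price": 45.0, "distance": 4.0, "custid": "c2"},
    {"id": "o3", "price": 12.0, "distance": 0.4, "custid": "c3"},
]
print(nbn_orders(orders))
```

Only the first order passes both conditions; the other two fail on price and on distance respectively.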


Windows

An event stream can have an infinite number of events. Windows are a way to select a subset of events for further processing. You can select events in many ways: events that arrived within a time period, the last N events, etc.

The output from a window is a set of events. You can use it for further processing (e.g. joining event streams) or to calculate aggregate functions like sum and average.

We can have the output triggered either when all events have been collected or whenever a new event is added. We call the first type batch windows and the second type sliding windows.

For example, a window can collect all pizza orders placed in the last hour and emit the average value of the orders once every hour.

 from PizzaOrders#window.time( 1h ) insert into HourlyOrderStats avg(price) as avgPrice
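
The difference between batch and sliding windows can be sketched in a few lines of plain Python. This is an illustration only, not Siddhi code: it uses length-based windows instead of time-based ones to stay short, and the names `sliding_avg` and `batch_avg` are made up for this example.

```python
from collections import deque

def sliding_avg(prices, n):
    """Emit the average of the last n prices every time a new price arrives."""
    window = deque(maxlen=n)  # the oldest price is dropped automatically
    out = []
    for p in prices:
        window.append(p)
        out.append(sum(window) / len(window))
    return out

def batch_avg(prices, n):
    """Emit one average per completed batch of n prices, then start over."""
    batch, out = [], []
    for p in prices:
        batch.append(p)
        if len(batch) == n:
            out.append(sum(batch) / n)
            batch = []
    return out

prices = [10.0, 20.0, 30.0, 40.0]
print(sliding_avg(prices, 2))  # one output per incoming event
print(batch_avg(prices, 2))    # one output per completed batch
```

The sliding version produces an output for every incoming event, while the batch version stays silent until a batch is complete.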

Joins

The join operator joins two event streams. The idea is to match events coming from two streams and create a new event stream.

For example, you can use the join operator to join the PizzaDelivery stream and the PizzaOrder stream and calculate the time it took to deliver each order.

 from PizzaOrder#window.time(1h) as o join PizzaDelivery as d
   on ==
 insert into DeliveryTime as id, d.ts-o.ts as ts

At least one side of the join must have a window. In the example above, we can use a one-hour window on PizzaOrder (because delivery always happens after the order): the join stores the events arriving on PizzaOrder for one hour and matches them against delivery events. If both sides have windows, the join stores the events of each stream and matches them against events arriving on the other stream.
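
The following plain-Python sketch shows the idea of a join with a window on one side. It is not Siddhi code, and it assumes that both streams carry an order id and a millisecond timestamp; the function name `join_deliveries` and the tuple layout are made up for this example.

```python
def join_deliveries(events, window_ms):
    """Match each delivery against orders kept for window_ms milliseconds.

    events: list of (stream, order_id, ts) tuples in arrival order.
    Returns (order_id, delivery_time) pairs.
    """
    orders = {}  # order id -> order timestamp (the windowed side)
    out = []
    for stream, oid, ts in events:
        # expire orders that have fallen out of the time window
        orders = {k: v for k, v in orders.items() if ts - v <= window_ms}
        if stream == "order":
            orders[oid] = ts
        elif stream == "delivery" and oid in orders:
            out.append((oid, ts - orders[oid]))
    return out

HOUR = 60 * 60 * 1000
events = [
    ("order", "o1", 0),
    ("order", "o2", 1000),
    ("delivery", "o1", 1_500_000),    # 25 minutes after the order
    ("delivery", "o2", HOUR + 2000),  # o2 has already left the window
]
print(join_deliveries(events, HOUR))
```

The second delivery finds no partner because its order expired from the window, which is why the window length has to cover the longest delivery time you care about.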

Patterns and Sequences 

Patterns and sequences let us match conditions that happen over time.

For example, we can use patterns to identify returning customers using the following query. Here -> denotes the "followed by" relationship.

 from every a1 = PizzaOrder
     -> a2 = PizzaOrder[custid=a1.custid]
 insert into ReturningCustomers
    a1.custid as custid, a2.ts as ts

Patterns match even when other events occur between the two matching conditions. Sequences are similar, but the specified event sequence must exactly match the events that actually occurred. For example, the following is the same query implemented as a sequence. Note that the second line is there to ignore any non-matching events.

 from every a1 = PizzaOrder,
      a2 = PizzaOrder[custid=a1.custid]
 insert into ReturningCustomers
    a1.custid as custid, a2.ts as ts

Here, instead of the -> relationship, we use a regular-expression-like notation to define the sequence of conditions.
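
To show how the two differ, here is a simplified plain-Python sketch. It is not Siddhi code and does not claim to reproduce Siddhi's exact matching rules: the "pattern" version allows unrelated events in between, while the "sequence" version requires the second order to be the very next event. The function names and the (custid, ts) tuple layout are made up for this example.

```python
def pattern_matches(orders):
    """every a1 -> a2[custid == a1.custid]; other events may lie in between."""
    out = []
    for i, (cust, _) in enumerate(orders):
        for cust2, ts2 in orders[i + 1:]:
            if cust2 == cust:
                out.append((cust, ts2))
                break  # each a1 is completed by the first later match
    return out

def sequence_matches(orders):
    """every a1, a2[custid == a1.custid]; a2 must directly follow a1."""
    out = []
    for (cust, _), (cust2, ts2) in zip(orders, orders[1:]):
        if cust2 == cust:
            out.append((cust, ts2))
    return out

orders = [("alice", 1), ("bob", 2), ("alice", 3), ("alice", 4)]
print(pattern_matches(orders))   # alice matches twice
print(sequence_matches(orders))  # only the back-to-back alice orders match
```

With this input, the pattern version reports alice as a returning customer at ts 3 and ts 4, whereas the strict sequence version reports only ts 4, because bob's order sits between the first two alice orders.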

Partitions (available in upcoming 3.0 release)

Siddhi evaluates a query by matching all the events in the event streams used by that query. Partitions let us divide events into several groups based on some condition before evaluating queries.

For example, let us say we need to find the time spent until a pizza leaves the shop and the time until it is delivered. We can first partition pizza orders by order ID and then evaluate the query. This simplifies the query to a great extent.

define partition orderPartition by, PizzaDone.oid, PizzaDelivered.oid
select from PizzaOrder as o -> PizzaDone as p -> PizzaDelivered as d
insert into OrderTimes (p.ts-o.ts) as time2Prepare, (d.ts-p.ts) as time2Delivery
   partition by orderPartition

We do this for several reasons.
  1. Evaluating events separately within several partitions might be faster than matching them all together, because with partitions we match events only within each partition.
  2. Sometimes it makes queries easier to design. For example, in the above query, partitioning lets us write the query without worrying about other orders that overlap with the same order.
  3. Partitions let the CEP runtime distribute evaluation across multiple machines, which can help when scaling queries.
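
The idea of partitioning before matching can be sketched in plain Python as follows. This is an illustration only, not Siddhi code; the function name `order_times`, the event kinds, and the tuple layout are assumptions made for this example.

```python
from collections import defaultdict

def order_times(events):
    """Group events by order id, then evaluate order -> done -> delivered
    inside each group.

    events: (kind, oid, ts) tuples, kind in {"order", "done", "delivered"}.
    Returns {oid: (time_to_prepare, time_to_deliver)}.
    """
    partitions = defaultdict(dict)
    for kind, oid, ts in events:
        partitions[oid][kind] = ts  # each order id gets its own partition
    out = {}
    for oid, e in partitions.items():
        # only report orders for which all three steps have been seen
        if all(k in e for k in ("order", "done", "delivered")):
            out[oid] = (e["done"] - e["order"], e["delivered"] - e["done"])
    return out

events = [
    ("order", "a", 0), ("order", "b", 5),
    ("done", "a", 10), ("done", "b", 12),
    ("delivered", "b", 30), ("delivered", "a", 40),
]
print(order_times(events))
```

Because each order id is handled in its own group, the interleaving of orders a and b in the input does not complicate the per-order logic.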

Event Tables (available in upcoming 3.0 release)

Event tables let you remember some events and use them later. You can define a table just like a stream. 

 define table LatePizzaOrdersTable (ordered string, ts long, price float);

Then you can add events to it, delete events from it, and join those events in the table against incoming events.

For example, let us say we need to store all late deliveries, and if a late delivery happens to the same customer twice, we want to give them a free pizza.

 from LatePizzaDeliveries insert into LatePizzaOrdersTable; 

Then we can join events from event table with incoming events as follows.

from LatePizzaDeliveries as l join LatePizzaOrdersTable as t 
    on l.custid=t.custid AND l.ts!=t.ts
insert into FreePizzaOrders

You can also do the same using an event stream. However, event tables can be written to disk, which makes them very useful for long-running use cases. For example, if we do the above using an event stream, the stored values will be lost when we restart the server, whereas the values in an event table are preserved on disk.
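
As a rough analogy in plain Python, the sketch below uses the standard sqlite3 module as a stand-in for an event table. It does not show how WSO2 CEP stores event tables; the function name `free_pizza_customers`, the table layout, and the (custid, ts) tuple format are assumptions made for this example.

```python
import sqlite3

def free_pizza_customers(late_deliveries, db_path=":memory:"):
    """Store late deliveries in a table and report repeat customers.

    late_deliveries: (custid, ts) tuples in arrival order.
    Pass a file path as db_path to keep the table across restarts.
    """
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS late (custid TEXT, ts INTEGER)")
    winners = []
    for custid, ts in late_deliveries:
        # join the incoming event against the rows already in the table
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM late WHERE custid = ? AND ts != ?",
            (custid, ts),
        ).fetchone()
        if count > 0:
            winners.append(custid)
        conn.execute("INSERT INTO late VALUES (?, ?)", (custid, ts))
    conn.commit()
    conn.close()
    return winners

print(free_pizza_customers([("c1", 1), ("c2", 2), ("c1", 3)]))
```

With the default in-memory database the table disappears when the function returns; pointing `db_path` at a file mirrors the point above about tables surviving a restart.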

Friday, August 2, 2013

CEP Performance: Processing 100k to Millions of events per second using WSO2 Complex Event Processing (CEP) Server

With WSO2 CEP, you can use SQL-style queries to detect interesting patterns across many data streams. We call the standalone version of the CEP engine Siddhi, and that is what you need if you want to embed a CEP engine within a Java program. WSO2 CEP, on the other hand, provides CEP query support running as a server, and you can send events to it using Thrift, Web Services, REST calls, JMS, and email.

WSO2 CEP can handle a few hundred thousand events per second over the network and a few million events per second within the JVM. We have measured and published those numbers before. In this post, I will try to put them all together and give some context for the different numbers.

In the following, each event includes multiple properties, and the queries match those events against the given conditions.

Same JVM Events performance

Setup: We used an Intel(R) Xeon(R) X3440 @ 2.53GHz with 4 cores, 8M cache, and 8GB RAM, running Debian with the 2.6.32-5-amd64 kernel. We generated events from the same JVM.

Case 1: Simple filter (from StockTick[prize >6] return symbol, prize)

Case 2: Window (from StockTick[symbol='IBM']#win.time(0.005) return symbol, avg(prize))

Case 3: Events patterns A->B (A followed by B). 

From f=FraudWarningEvent ->
return accountNumber;

Performance Over the Network

Setup: To run CEP, we used an Intel® Core™ i7-2630QM CPU @ 2.00GHz with 8 cores and 8GB RAM, running Ubuntu 12.04 with the 3.2.0-32-generic kernel. For the three client nodes, we used Intel® Core™ i3-2350M CPUs @ 2.30GHz with 4 cores and 4GB RAM, running Ubuntu 12.04 with the 3.2.0-32-generic kernel.

The following results are for a simple filter; we sent events over the network using Thrift.


Performance For a Complex Scenario

Finally, the following is the performance for the DEBS grand challenge. The grand challenge detects the following scenarios from the events generated during a real football game.

Usecase 1: Running analysis

The first usecase measures each player’s running speed and calculates how long he spent in different speed ranges. For example, the results will tell us that the player "Martin" was running fast from 27 minutes and 01 second into the game until 27 minutes and 35 seconds into the game.

Usecase 2 & 4: Ball Possession and Shots on Goal

For the second use case, we need to calculate the time each player controlled the ball (ball possession). A player controls the ball from the time he hits the ball until someone else hits the ball, the ball goes out of the ground, or the game is stopped. We identify a hit when the ball is within one meter of a player and its acceleration increases by more than 55 m/s².

Usecase four is to detect hits and emit events if the ball is heading toward the goal.

Usecase 3: Heatmap of Activity

Usecase three divides the ground into a grid and calculates the time a player spends in each cell. However, this usecase needs updates once every second. We can solve the first part just like the first usecase, but to make sure we get an event once a second, we had to couple it with a timer.

You can find more information from , and you can find the queries from this blog post

Setup: VM with 4 cores (@2.8 GHz), 4 GB RAM, SSD HDD, and 1GB Ethernet, and we replayed events from the same JVM.