Thursday, July 24, 2014

Handling Large Scale CEP Use Cases with WSO2 CEP

I have explained this topic many times over the last few days, so I decided to write it down. I wrote down my earlier thoughts on the topic in the post How to scale Complex Event Processing? This post covers how to do those things with WSO2 CEP and what will be added in the upcoming WSO2 CEP 4.0 release.
I will also refine the classification a bit more in this post. As I mentioned in the earlier post, scale has two dimensions: queries and data streams. A given scenario may have a lot of streams, a lot of queries, complex queries, very large streams (high event rate), or any combination of those. Hence we have four parameters, and the following table summarises some of the useful cases.


| Size of Stream | Number of Streams | Size of Queries | Number of Queries | How to handle? |
|----------------|-------------------|-----------------|-------------------|----------------|
| Small | Small | Small | Small | 1 CEP, or 2 for HA. |
| Large | Small | Small | Small | Stream needs to be partitioned |
| Small | Large | Small | Large | Front routing layers and back end processing layers. Run N copies of queries as needed |
| Large | X | X | X | Stream needs to be partitioned |
| X | X | Large | X | Functional decomposition + Pipeline or a combination of both |


Do you need to scale?


WSO2 CEP can handle about 100k-300k events/sec. At the upper end, that is about 26 billion events per day. For example, if you are a telecom provider with a customer base of 1B (billion) users (the whole world has only about 7B people), then each customer would have to make 26 calls per day to generate that many events.
So there aren't that many use cases that need more than this event rate. Some examples that do would be monitoring all the emails in the world, some serious IP traffic monitoring, and having 100M Internet of Things (IoT) devices that each send an event once every second.
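The arithmetic behind those numbers can be checked with a few lines of Python. This is only a back-of-the-envelope sketch: the function name `events_per_day` is made up for illustration, and the rates are the ones quoted above.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400 seconds in a day


def events_per_day(events_per_sec):
    """Convert a sustained event rate into a daily event count."""
    return events_per_sec * SECONDS_PER_DAY


low = events_per_day(100_000)   # lower end of the quoted single-node range
high = events_per_day(300_000)  # upper end of the quoted single-node range

# Events per customer per day for a provider with 1 billion customers,
# taking the upper end of the range.
per_customer = high / 1_000_000_000

# 100M IoT devices sending one event per second each, expressed as a
# multiple of the upper single-node rate.
iot_rate = 100_000_000
multiple_of_single_node = iot_rate / 300_000

print(low, high, per_customer, round(multiple_of_single_node))
```

The upper end works out to 25.92 billion events per day, which is where the "about 26 billion" figure comes from, while the IoT example is a few hundred times beyond what the quoted single-node rate covers.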
Let's assume we have a real case that needs scale. Then it will fall into one of the following classes. (These are more refined versions of the categories I discussed in the earlier post.)
  1. Large numbers of small queries and small streams 
  2. Large Streams 
  3. Complex Queries

Large number of small queries and small streams



As shown in the picture, we need to distribute the queries (possibly as multiple copies) across many machines, and then place a routing layer in front that directs events to the machines hosting the queries that need those events. That routing layer can be a set of CEP nodes. We can also implement it using a pub/sub infrastructure like Kafka. This model works well and scales.
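To make the routing idea concrete, here is a minimal Python sketch of such a layer. It is not WSO2 CEP or Kafka code: the `Router` class, its methods, and the node and stream names are all hypothetical, and a real deployment would send the events over the network instead of returning a list.

```python
from collections import defaultdict


class Router:
    """Toy routing layer: remembers which nodes host queries on which stream."""

    def __init__(self):
        # stream name -> set of node ids hosting queries that read that stream
        self.subscriptions = defaultdict(set)

    def deploy(self, node, stream):
        """Record that `node` runs at least one query consuming `stream`."""
        self.subscriptions[stream].add(node)

    def route(self, stream, event):
        """Return (node, event) pairs for every node that needs this event."""
        nodes = self.subscriptions.get(stream, ())
        return [(node, event) for node in sorted(nodes)]


router = Router()
router.deploy("node-1", "Temperature")
router.deploy("node-2", "Temperature")
router.deploy("node-2", "Login")

print(router.route("Login", {"user": "a"}))
print(router.route("Temperature", {"value": 21}))
```

Events on streams that no query reads are dropped at the routing layer, so the back end nodes only see the traffic they actually need.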

Large Streams (high event rate)


As shown in the picture, we need a way to partition large streams such that the processing can run independently within each partition. This is just like the MapReduce model, which requires you to figure out a way to partition the data (this tutorial explains MapReduce details). 
To support this scenario, the Siddhi language lets you define partitions. A sample query would look like the following. 

define partition on Player.sid {
    from Player#window(30s) select avg(v) as v insert into AvgSpeedByPlayer;
}

Queries defined within a partition are executed separately for each partition. We did something similar for the first scenario of the DEBS 2014 Grand Challenge solution. From the next release, WSO2 CEP 4.0.0, onwards, WSO2 CEP can run different partitions on different nodes. (With WSO2 CEP 3.0, you need to do this manually via a routing layer.) If we cannot partition the data, then we need a magic solution, as described in the next section.
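What the partitioned query does can be mimicked in plain Python. This is a rough sketch, not how Siddhi implements partitions: `node_for` and `PartitionedAvg` are hypothetical names, the hash-based node assignment is just one possible routing choice, and events are assumed to arrive in timestamp order for each `sid`.

```python
import zlib
from collections import defaultdict, deque


def node_for(sid, num_nodes):
    """Pick a node for a partition key using a stable hash (an assumption,
    one of many ways a routing layer could assign partitions)."""
    return zlib.crc32(str(sid).encode("utf-8")) % num_nodes


class PartitionedAvg:
    """Keeps an independent 30-second sliding window per partition key."""

    def __init__(self, window_secs=30):
        self.window_secs = window_secs
        self.windows = defaultdict(deque)  # sid -> deque of (timestamp, v)

    def on_event(self, sid, ts, v):
        """Add one event and return the current average of v for this sid."""
        win = self.windows[sid]
        win.append((ts, v))
        # Expire events that are window_secs old or older.
        while win[0][0] <= ts - self.window_secs:
            win.popleft()
        return sum(value for _, value in win) / len(win)


p = PartitionedAvg()
print(p.on_event("a", 0, 10))    # only one event for "a" so far
print(p.on_event("a", 10, 20))   # two events inside the window for "a"
print(p.on_event("b", 10, 100))  # "b" has its own window
print(p.on_event("a", 35, 30))   # the event at ts=0 has expired
```

Because no state is shared between keys, all events with the same `sid` can be sent to the node chosen by `node_for`, and each node can run its copy of the query without talking to the others.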

Large/Complex Queries (does not fit within one node)


The best example of a complex query is the second scenario of the DEBS 2014 Grand Challenge, which includes finding the median over a 24-hour window that holds about 700 million events! 
The best chance of solving this class of problems is to set up a pipeline, as I explained in my earlier post. If that does not work, we need to decompose the query into many small sub-queries. Talking to a parallel programming expert (a serious MPI person) might help you: although the domains are different, the same ideas work here. This is the domain of experts and very smart solutions.
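As one illustration of what decomposing a query into sub-queries can look like, the Python sketch below splits a median computation into per-node histograms that a final step merges. This is not the DEBS 2014 solution and not a WSO2 CEP feature; the function names and the fixed `bucket_width` are assumptions, and the answer is only accurate to within one bucket.

```python
from collections import Counter


def local_histogram(values, bucket_width):
    """Sub-query run on each node: count events per fixed-width bucket."""
    return Counter(int(v // bucket_width) for v in values)


def merged_median_bucket(histograms, bucket_width):
    """Final step: merge the per-node counts and return the lower bound of
    the bucket that contains the (lower) median."""
    total = Counter()
    for hist in histograms:
        total.update(hist)
    n = sum(total.values())
    if n == 0:
        raise ValueError("no events in window")
    target = (n + 1) // 2  # 1-based rank of the lower median
    seen = 0
    for bucket in sorted(total):
        seen += total[bucket]
        if seen >= target:
            return bucket * bucket_width


node_a = local_histogram([1, 2, 3], bucket_width=5)
node_b = local_histogram([10, 20, 30, 40], bucket_width=5)
print(merged_median_bucket([node_a, node_b], bucket_width=5))
```

Each node only ships a small table of counts instead of its raw events, which is what makes this kind of decomposition attractive when the window itself does not fit on one machine.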
The most elegant answers come in the form of distributed operators (e.g. distributed joins; see http://highlyscalable.wordpress.com/2013/08/20/in-stream-big-data-processing/). There are a lot of papers in SIGMOD and VLDB describing algorithms for some of these use cases, but they work only for specific cases. We will eventually implement some of those, but not this year. Given a problem, there is often a way to distribute the CEP processing, but frameworks will not help you.
If you want to do #1 and #2 with WSO2 CEP 3.0, you need to set it up yourself. It is not very hard. (Drop me a note if you need details.) However, WSO2 CEP 4.0, which will come out in 2014 Q4, will let you define those scenarios using the Siddhi Query Language, with annotations on how many resources (nodes) to use. WSO2 CEP will then create the queries, deploy them on top of a Storm cluster that runs a Siddhi engine in each of its bolts, and run them automatically.
Hopefully, this post clarifies the picture. If you have any thoughts or need clarification, please drop us a note.