What is CEP?
Complex event processing (CEP) systems query events on the fly without storing them.
- For an introduction and definition of CEP, please refer to the CEP Blog
and Wikipedia.
- If you need truly comprehensive coverage of CEP, read the paper "Processing
flows of information: from data stream to complex event processing" [1] (or the
slide deck).
In CEP, we think in terms of event streams. An event stream is a logical sequence of events that become
available over time. For example, a stock event stream consists of events that
notify changes to a stock price. Users provide
queries to the CEP engine, which implements the CEP logic, and the CEP engine
matches those queries against events coming through the event streams.
CEP differs from other paradigms, such as event processing and
filtering, in its support for temporal queries that reason in terms of
temporal concepts like "time windows" and "before and after" relationships. For
example, a typical CEP query will say:
"If the IBM stock value increases by
more than 10% within an hour, please notify me".
Such a CEP query has a few
defining characteristics.
- CEP queries generally keep running, and keep
emitting events whenever incoming events match the condition given in the query.
- A CEP query operates on the fly and stores only a minimal
number of events.
- CEP engines generally respond to conditions
within milliseconds.
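To make the example query above concrete, here is a minimal Python sketch of a continuously running query that raises a notification when a price rises by more than 10% within an hour. It is not how any particular CEP engine implements windows; the class name `PriceRiseQuery`, the event fields, and the choice to compare against the lowest price in the window are all assumptions made for illustration.

```python
from collections import deque


class PriceRiseQuery:
    """Hypothetical continuous query: notify when the price of `symbol`
    rises by more than `threshold` within `window_seconds`."""

    def __init__(self, symbol, threshold=0.10, window_seconds=3600):
        self.symbol = symbol
        self.threshold = threshold
        self.window_seconds = window_seconds
        # Only (timestamp, price) pairs still inside the time window are kept.
        self.window = deque()

    def on_event(self, timestamp, symbol, price):
        if symbol != self.symbol:
            return None
        # Evict events that have fallen out of the time window.
        while self.window and timestamp - self.window[0][0] > self.window_seconds:
            self.window.popleft()
        lowest = min((p for _, p in self.window), default=None)
        self.window.append((timestamp, price))
        if lowest is not None and price > lowest * (1 + self.threshold):
            return {"symbol": symbol, "from": lowest, "to": price, "at": timestamp}
        return None


# Usage: timestamps are in seconds; the query keeps running across events.
query = PriceRiseQuery("IBM")
events = [
    (0, "IBM", 100.0),
    (600, "MSFT", 30.0),
    (1200, "IBM", 105.0),
    (1800, "IBM", 111.0),
    (7200, "IBM", 112.0),
]
alerts = []
for event in events:
    alert = query.on_event(*event)
    if alert is not None:
        alerts.append(alert)
```

Note that the query never stores the whole stream: once an event is older than the window it is dropped, which is what keeps the working set small.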
What is Scaling CEP?
There are many CEP engine implementations (see
CEP Players list 2012).
However, most CEP engines run in a single large box, scaling up vertically.
Horizontally scaling CEP engines is still an open problem. The remainder of this post
discusses what horizontally scaling a CEP engine means and some of the potential
answers.
We use the term scaling to describe the ability of a CEP system to handle larger or more complex queries by
adding more resources. Scaling CEP has several dimensions.
- Handling a large number of queries
- Queries that need a large working memory
- Handling a complex query that might not fit
within a single machine
- Handling a large number of events
How to Scale CEP?
Let us consider each dimension separately.
Handling a Large Number of Queries
This is the easiest of the four, since we can use a shared-nothing
architecture. Here we can run multiple CEP engine instances (each
instance runs on a single host), each running a subset of the queries.
- A trivial implementation will send all events
(streams) to all the CEP engines.
- More optimized implementations can use a
publish/subscribe message broker network (like Narada Broker). Here each CEP
engine should analyze its deployed queries and subscribe to the required event
streams in the broker network. Generally, we map each event stream to a topic
in the publish/subscribe system.
- A third option is to delegate the event
distribution to a stream processing system (e.g. Apache S4 or Storm). For
instance, links [4] and [5] describe such a
scenario that runs Esper within Storm.
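The publish/subscribe option above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `Broker` is an in-memory stand-in for a real broker network, `Engine` is a hypothetical CEP engine instance, and queries are reduced to a stream name plus a predicate. None of these names come from Narada Broker, Esper, or Storm.

```python
from collections import defaultdict


class Broker:
    """In-memory stand-in for a publish/subscribe broker:
    one topic per event stream."""

    def __init__(self):
        self.subscribers = defaultdict(list)  # topic -> subscribed engines

    def subscribe(self, topic, engine):
        if engine not in self.subscribers[topic]:
            self.subscribers[topic].append(engine)

    def publish(self, topic, event):
        for engine in self.subscribers[topic]:
            engine.on_event(topic, event)


class Engine:
    """Hypothetical CEP engine instance holding a subset of the queries."""

    def __init__(self, name, broker):
        self.name = name
        self.broker = broker
        self.queries = []  # (stream, predicate) pairs
        self.received = 0
        self.matches = []

    def deploy(self, stream, predicate):
        # Deploying a query subscribes the engine to the stream it needs.
        self.queries.append((stream, predicate))
        self.broker.subscribe(stream, self)

    def on_event(self, stream, event):
        self.received += 1
        for query_stream, predicate in self.queries:
            if query_stream == stream and predicate(event):
                self.matches.append(event)


# Usage: each engine only sees the streams its own queries refer to.
broker = Broker()
engine1 = Engine("engine-1", broker)
engine2 = Engine("engine-2", broker)
engine1.deploy("StockStream", lambda e: e["symbol"] == "IBM" and e["price"] > 70)
engine2.deploy("WeatherStream", lambda e: e["temp"] > 35)

broker.publish("StockStream", {"symbol": "IBM", "price": 75})
broker.publish("StockStream", {"symbol": "MSFT", "price": 30})
broker.publish("WeatherStream", {"temp": 20})
```

Compared with the trivial implementation, the second engine never receives the stock events at all, so adding engines for unrelated queries does not multiply the traffic.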
Queries that need a large working memory
For instance, a long-running complex query that needs to
maintain a large window, and all the events in that window, would need a large working
memory. A potential answer to this problem is to use a distributed cache to store
the working memory. Reference
[6] describes such a scenario.
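As a rough illustration of the idea (not the design described in [6]), the sketch below keeps the events of a time window in an external key-value store while the engine itself holds only two counters. The class `CacheBackedWindow` is hypothetical, and a plain Python dict stands in for the distributed cache client; a real cache would add network latency and serialization that this sketch ignores.

```python
class CacheBackedWindow:
    """Hypothetical time window whose events live in an external cache.
    `cache` is any object with dict-style get, set and delete."""

    def __init__(self, cache, window_seconds):
        self.cache = cache
        self.window_seconds = window_seconds
        self.oldest = 0  # sequence number of the oldest retained event
        self.next = 0    # sequence number for the next incoming event

    def add(self, timestamp, event):
        self.cache[self.next] = (timestamp, event)
        self.next += 1
        # Evict events that are older than the window allows.
        while (self.oldest < self.next
               and timestamp - self.cache[self.oldest][0] > self.window_seconds):
            del self.cache[self.oldest]
            self.oldest += 1

    def events(self):
        return [self.cache[i][1] for i in range(self.oldest, self.next)]


# Usage: a dict plays the role of the distributed cache.
cache = {}
window = CacheBackedWindow(cache, window_seconds=30)
window.add(0, "a")
window.add(10, "b")
window.add(35, "c")  # "a" is now more than 30 seconds old and is evicted
current = window.events()
```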
Handling a large number of events and handling a complex
query that might not fit within a single machine
We will handle both scenarios together, as they are two sides of the same coin. In both cases, we have trouble fitting a single query into a single host such that it can support the given event rate.
To handle such scenarios, we have to distribute the query across many computers.
We can do this by breaking the query into several steps in a pipeline, where each step matches events against some conditions and republishes the matching events to steps further down the pipeline. Then we can deploy different steps of the pipeline on different machines.
For example, let's consider the following query. This query matches if there are two events
within 30 seconds from IBM stock that both have a price greater than 70 and show a price
increase of more than 10%.
select a1.symbol, a1.price, a2.price from every
a1=StockStream[price > 70 and symbol='IBM'] ->
a2=StockStream[price > 70 and symbol='IBM']
[a2.price > 1.1*a1.price][within.time=30]
As shown by the figure, we can break the query into three
nodes, and each node will have to republish the matching events to the next
node (Option 1).
Figure: CEP Query as a Pipeline
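The three-node split (Option 1) can be sketched in Python as follows. How the figure assigns conditions to nodes is not recoverable here, so the split into a symbol filter, a price filter, and a pattern matcher is an assumption, as are all function and class names. The matcher is also a simplification of the "every a1 -> a2" pattern: it pairs each event with every earlier event from the last 30 seconds that it exceeds by more than 10%.

```python
from collections import deque


def symbol_filter(event):
    """Node 1 (stateless): keep only IBM events."""
    return event["symbol"] == "IBM"


def price_filter(event):
    """Node 2 (stateless): keep only events priced above 70."""
    return event["price"] > 70


class RiseMatcher:
    """Node 3 (stateful): pair each event with earlier events from the last
    `within` seconds whose price it exceeds by more than 10%."""

    def __init__(self, within=30, factor=1.1):
        self.within = within
        self.factor = factor
        self.pending = deque()  # earlier events still inside the time bound

    def process(self, event):
        while self.pending and event["time"] - self.pending[0]["time"] > self.within:
            self.pending.popleft()
        matches = [(a1, event) for a1 in self.pending
                   if event["price"] > self.factor * a1["price"]]
        self.pending.append(event)
        return matches


def run_pipeline(events):
    matcher = RiseMatcher()
    results = []
    for event in events:
        # In a distributed deployment each hop below would be a network
        # publish from one machine to the next.
        if symbol_filter(event) and price_filter(event):
            results.extend(matcher.process(event))
    return results


events = [
    {"time": 0, "symbol": "IBM", "price": 80.0},
    {"time": 5, "symbol": "MSFT", "price": 95.0},
    {"time": 10, "symbol": "IBM", "price": 60.0},
    {"time": 20, "symbol": "IBM", "price": 90.0},
    {"time": 60, "symbol": "IBM", "price": 120.0},
]
matches = run_pipeline(events)
```

Only the events at time 0 and time 20 form a match; the event at time 60 arrives after both earlier events have left the 30-second bound.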
However, queries often have other properties that allow
further optimization. For example, although the last step, matching the price
increase, is stateful, the other two steps are stateless. Stateful operations
remember information after processing an event, so that earlier events affect
the processing of later events, while stateless operations depend only on the
event being processed.
Therefore, we can run multiple instances in place of
each stateless node using a shared-nothing architecture. For example, we
can break the query into five nodes, as shown by the bottom part of the picture (Option 2).
Another favorable fact is that CEP processing generally
happens through filtering, where the number of events shrinks as we progress through
the pipeline. Therefore, pushing stateless, filter-like operations (e.g. matching against symbol = "IBM") to the first
parts of the pipeline and scaling them in a shared-nothing manner should allow us
to scale the system to much higher event rates. For example, let's say that
the StockQuote event stream generates 100,000 events per second, but only 5%
of them are about IBM. Then only 5,000 events per second will make it past the first
filter, which we can handle much more easily than 100,000.
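The effect of replicated stateless filters can be checked with a small simulation. The sketch below is an illustration only: the `FilterReplica` class, the round-robin dispatch, and the synthetic stream in which every 20th event is an IBM event (5%) are all assumptions, and one second of traffic is modeled as a simple loop.

```python
from itertools import cycle


class FilterReplica:
    """Hypothetical stateless filter instance: forwards only IBM events."""

    def __init__(self, downstream):
        self.downstream = downstream
        self.received = 0

    def on_event(self, event):
        self.received += 1
        if event["symbol"] == "IBM":
            self.downstream.append(event)


def simulate(total_events=100_000, replicas=3):
    forwarded = []  # stand-in for the input queue of the stateful node
    filters = [FilterReplica(forwarded) for _ in range(replicas)]
    # Because the filter is stateless, any replica can take any event,
    # so plain round-robin dispatch is enough.
    dispatch = cycle(filters)
    for i in range(total_events):
        symbol = "IBM" if i % 20 == 0 else "OTHER"  # 5% of events are IBM
        next(dispatch).on_event({"symbol": symbol, "price": 100.0})
    return filters, forwarded


filters, forwarded = simulate()
load_per_replica = [f.received for f in filters]
```

Each replica handles about a third of the 100,000 events, while the single stateful node behind them only has to process the 5,000 that survive the filter.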
However, it is worth noting that the above method only works
with some queries. For example, if we have a query that consists of a single stateful
operation, like a window-based pattern, we cannot use this method.
Unfortunately, there is no framework that can do this out of
the box (let me know if I am wrong). So if you want to do this, you will have
to code it yourself. If you choose to do that, using a pub/sub network or a stream
processing system might remove much of the complexity.
Please share your thoughts!
- [1] Alessandro Margara and Gianpaolo Cugola. 2011. Processing flows of information: from data stream to complex event processing. In Proceedings of the 5th ACM international conference on Distributed event-based system (DEBS '11).
- [2] http://www.thetibcoblog.com/2009/08/21/cep-versus-esp-an-essay-or-maybe-a-rant/
- [3] http://www.slideshare.net/TimBassCEP/mythbusters-event-stream-processing-v-complex-event-processing-presentation
- [4] Run Esper with Storm - http://stackoverflow.com/questions/9164785/how-to-scale-out-with-esper
- [5] http://tomdzk.wordpress.com/2011/09/28/storm-esper/
- [6] Distributed Cache to scale CEP - http://magmasystems.blogspot.com/2008/02/cep-engines-and-object-caches.html