Wednesday, February 25, 2015

WSO2 Demo Videos from O'reilly Strata 2015 Booth

We just came back from O’reilly Strata. It was great to see most of the Big Data world gathered at a one place. 

WSO2 have had a booth, and following are demos we showed in the booth. 

Demo 1: Realtime Analytics for a Football Game played with Sensors

This is shows a realtime analytics done using a dataset created by playing football game with sensors in the ball and the boots of the player. You can find more information from the earlier post.  

Demo 2: GIS Queries using Public Transport for London Data Feeds 

TFL (Transport for London) provides several public data feeds about London public transport. We used those feeds within WSO2 CEP's Geo Dashboard to implement "Speed Alerts", "Proximity Alerts", and Geo Fencing.

Please see this slide deck for more information. 

Introduction to Large Scale Data Analysis with WSO2 Analytics Platform

Slide deck for the talk I did at Indiana University, Bloomington. It walks though WSO2 Big data offering providing example queries.

Tuesday, February 24, 2015

Why We need SQL like Query Language for Realtime Streaming Analytics?

I was at O'reilly Strata in last week and certainly interest for realtime analytics was at it’s top.

Realtime analytics, or what people call Realtime Analytics, has two favours.  
  1. Realtime Streaming Analytics ( static queries given once that do not change, they process data as they come in without storing. CEP, Apache Strom, Apache Samza etc., are examples of this. 
  2. Realtime Interactive/Ad-hoc Analytics (user issue ad-hoc dynamic queries and system responds). Druid, SAP Hana, VolotDB, MemSQL, Apache Drill are examples of this. 
In this post, I am focusing on Realtime Streaming Analytics. (Ad-hoc analytics uses a SQL like query language anyway.)

Still when thinking about Realtime Analytics, people think only counting usecases. However, that is the tip of the iceberg. Due to the time dimension of the data inherent in realtime usecases, there are lot more you can do. Lets us look at few common patterns. 
  1. Simple counting (e.g. failure count)
  2. Counting with Windows ( e.g. failure count every hour)
  3. Preprocessing: filtering, transformations (e.g. data cleanup)
  4. Alerts , thresholds (e.g. Alarm on high temperature)
  5. Data Correlation, Detect missing events, detecting erroneous data (e.g. detecting failed sensors)
  6. Joining event streams (e.g. detect a hit on soccer ball)
  7. Merge with data in a database, collect, update data conditionally
  8. Detecting Event Sequence Patterns (e.g. small transaction followed by large transaction)
  9. Tracking - follow some related entity’s state in space, time etc. (e.g. location of airline baggage, vehicle, tracking wild life)
  10. Detect trends – Rise, turn, fall, Outliers, Complex trends like triple bottom etc., (e.g. algorithmic trading, SLA, load balancing)
  11. Learning a Model (e.g. Predictive maintenance)
  12. Predicting next value and corrective actions (e.g. automated car)

Why we need SQL like query language for Realtime Streaming  Analytics?

Each of above has come up in use cases, and we have implemented them using SQL like CEP query languages. Knowing the internal of implementing the CEP core concepts like sliding windows, temporal query patterns, I do not think every Streaming use case developer should rewrite those. Algorithms are not trivial, and those are very hard to get right! 

Instead, we need higher levels of abstractions. We should implement those once and for all, and reuse them. Best lesson we can learn from Hive and Hadoop, which does exactly that for batch analytics. I have explained Big Data with Hive many time, most gets it right away. Hive has become the major programming API most Big Data use cases.

Following is list of reasons for SQL like query language. 
  1. Realtime analytics are hard. Every developer do not want to hand implement sliding windows and temporal event patterns, etc.  
  2. Easy to follow and learn for people who knows SQL, which is pretty much everybody 
  3. SQL like languages are Expressive, short, sweet and fast!!
  4. SQL like languages define core operations that covers 90% of problems
  5. They experts dig in when they like!
  6. Realtime analytics Runtimes can better optimize the executions with SQL like model. Most optimisations are already studied, and there is lot you can just borrow from database optimisations. 
Finally what are such languages? There are lot defined in world of Complex Event processing (e.g. WSO2 Siddhi, Esper, Tibco StreamBase,IBM Infoshpere Streams etc. SQL stream has fully ANSI SQL comment version of it. Last week I did a talk on Strata discussing this problem in detail and how CEP could match the bill. You could find the slide deck from below.

Tuesday, December 16, 2014

ICTER 2014 Invited Talk: Large Scale Data Processing in the Real World: from Analytics to Predictions

Last week I did an invited talk at ICTER 2015 conference in colombo, discussing "Big Data", and following are the slides.



Large scale data processing analyses and makes sense of large amounts of data. Although the field itself is not new, it is finding many usecases under the theme "Bigdata" where Google itself, IBM Watson, and Google's Driverless car are some of success stories. Spanning many fields, Large scale data processing brings together technologies like Distributed Systems, Machine Learning, Statistics, and Internet of Things together. It is a multi-billion-dollar industry including use cases like targeted advertising, fraud detection, product recommendations, and market surveys. With new technologies like Internet of Things (IoT), these use cases are expanding to scenarios like Smart Cities, Smart health, and Smart Agriculture. Some usecases like Urban Planning can be slow, which is done in batch mode, while others like stock markets need results within Milliseconds, which are done in streaming fashion. There are different technologies for each case: MapReduce for batch processing and Complex Event Processing and Stream Processing for real-time usecases. Furthermore, the type of analysis range from basic statistics like mean to complicated prediction models based on machine Learning. In this talk, we will discuss data processing landscape: concepts, usecases, technologies and open questions while drawing examples from real world scenarios.

Tuesday, December 2, 2014

Short Introduction to Realtime Analytics with Big Data: What, Why, How?

What and Why of Realtime Analytics?

I am sure you have heard enough about Big Data, the idea of “processing data and extracting actionable insights from data”. Most Big Data applications use batch processing technologies like Hadoop or Spark, which will need us to store data in a disk and later process them.
Batch processing often takes few minutes to generate an output, and with large datasets, it can take hours. However, there are lot of use cases where it is much useful to know results faster.

For example, think about traffic data collected from counting vehicles at each traffic light. We can use Hadoop or Spark to analyze this data. Among useful insights can be “traffic hotspots”, “traffic trends over time” etc. It is interesting to know after a one hour, there was traffic in “US-101”. On the other hand, It is much more useful to know there is traffic now, so one could avoid it.

There are lot and lot of use cases like this where outcome is important and there is a chance that we can act to fix if there is a problem. Following are few of them.
  1. Algorithmic Trading
  2. Smart Patient Care
  3. Monitoring a production line
  4. Supply chain optimisations
  5. Intrusion, Surveillance and Fraud Detection
  6. Most Smart Device Applications : Smart Car, Home ..  
  7. Smart Grid
  8. Vehicle and Wildlife tracking
  9. Sport analytics
  10. Context aware promotions and advertising

Realtime analytics let you analyze data as they come in and make important decisions within milliseconds to few seconds.

How to do Realtime Analytics?

OK, great how can we do real time analytics?

Lets start with an example. Let us say you want to know how many visitors and in your site and be notified if there are more than 10000 visitors came in within last 30 minutes. However, you want to know that the condition has met right away.

Whenever visitor do something in your site, it sends events that looks like following. Ignore the syntax for now, but read what it means.

define stream SiteVistors(ts long, email string, url string)

Try 1: People first tried to do this by optimizing Hadoop having lot of processing nodes. With lot of machine and tuning you can bring down Hadoop job execution time to few seconds. This, however, is like trying to do your water supply using buckets instead of pipes. Chances are that it will break when 1) you change your query a bit, 2) when data has grown, or 3) when two batch jobs run at the same time. Also not to mention you will be using about 10X hardware than you will need.

For example, to do our use case, we would need to run Hadoop on last 30 minutes of data to count the number of visits, and likely we will have to run it back to back  starting another run once a run has completed.

Try 2: Google had the same problem, and they solved it with Dremel (which later made available as “Big Query”). Dremel let you issue queries over a large set of data and get responses within few seconds by breaking up and processing data using several machines in parallel. If you want this technology, Apache Drill is an opensource implementation of the idea.

Try 3: You can do this faster via In-Memory computing. Idea is to have lot of memory, load pr keep all the data to memory (do compressions and cool algorithms like Sketching when possible), and process the data. Since data is in memory, it will be much faster. For more information, please checkout a white paper and the slide deck I have done about the topic.

However, all above three are ways to make batch processing faster. Batch processing would just collect data for a period of time, and only try to process data when all the data has been received. Basically, we sit idle for first 30 minutes just collecting data and try to do it as fast as possible when 30 minutes has passed.

From that perspective, it is a bad idea to use batch processing to do this. There is much better way to do this. Idea is to process data as they come in, and that way once we have all the data, we can produce the results right away.


Such technology (called Stream Processing) has been around for more than 10 years and used in use cases like Stock trading. Main idea is to create a graph of processing nodes (each can be stateful) and data get processed as they flow through the graph. (e.g. IBM InfoStreams, Tibco Stream Base).

Fast forward to now, we have two classes of technologies to do this now: Stream Processing (e.g. Apache Storm) and Complex Event Processing (e.g. WSO2 CEP, Esper).

Think of Apache Storm as Hadoop for Streaming data. Idea is you write code for processing nodes called Bolts and wire them up to a graph called topology. Storm will keep this topology running. In our example, several processing nodes will track sum of visits for a given window of time and one master node can receive sums from other nodes and check sum of those sums for a condition ( for more info about code, see Word Count with Storm).


Great, then why Complex Event Processing? It is best explained through an analogy. You may have heard about Hive, which is a SQL on top of Hadoop (MapReduce). With Hadoop, you can write java code and get something done, but with Hive you can write an SQL query to get the most of the same things done. Latter is simpler and lot of people understand SQL.

Think of Complex Event Processing (CEP) as SQL on top of Storm. (Well technically there are deep differences, if you want to get to it, see 1 and 2). However, with over the time both technologies has shared more and more features. If you are a programmer, CEP would look like SQL on top of Storm. For example, see SQLStream samples and also WSO2 CEP 4.0 version, which will run your CEP query on top of Storm.)

For example our example on top of CEP will be look like following.

from SiteVistors#window.timeBatch[30m]
select email, sum(url) as sum
having sum > 10000

Here #window.timeBatch[30m] says collect data in 30 minute window and process the query. If you want processing to be done in parallel with many machines, the query will look like following.

//define the partition
define partition SiteVistorsParition;

//process data within partition
from ParitionedSiteVistors#window.timeBatch[30m]
select email, sum(url) as sum
insert into SiteVistorsSums;
using partition SiteVistorsParition;

//sum up the sums and check
from SiteVistorsSums#window.timeBatch[1s]
select sum(sum) as fsum
having fsum > 10000

Just like Hive, CEP technologies has lot of operators that you can directly use like Filters, Joins, Windows, and Event Patterns. See my earlier post for more details.


So we discussed what is Realtime analytics, Why we need it, and How to do it. Real Time analytics has lot of use cases that are very hard to implement with MapReduce style batch processing, and trying to make such use cases faster using MapReduce often eat up resources.

Having said that technologies like Apache Drill and Isolutions like SAP Hana have their own use case, which is interactive ad-hoc analytics. For Stream processing to work, you must know queries a priori.  If you want to do ad-hoc queries, you need to use technologies like Apache Drill. So there are three types of use cases and you need use different solutions for each.

  1. Batch Processing - MapReduce, Sprak
  2. Real time analytics when queries are known a priori - Stream Processing
  3. Interactive Ad-hoc queries - Apache Drill, Hazecast, SAP Hana

Following Picture Summaries different tools and requirements.

Y axis is amount of data (in size or as number of events), and X axis is time taken to produce the results. It outlines when each technology is useful.

Thursday, July 24, 2014

Handling Large Scale CEP Usecase with WSO2 CEP

I have been explaining the topic too many times in last few days and decided to write this down. I had written down my thoughts on the topic earlier on the post How to scale Complex Event Processing? This posts covers how to do those on WSO2 CEP and what will be added in upcoming WSO2 CEP 4.0 release. 
Also I will refine the classification also a bit more with this post. As I mentioned in the earlier post, scale has two dimensions: Queries and data streams. Given scenario may have lot of streams, lot of queries, complex queries, very large streams (event rate), or any combination those. Hence we have four parameters and the following table summarises some of useful cases.

Size of Stream
Number of Stream
Size of Queries
Number of Queries
How to handle?
Small Small Small Small 1 CEP or 2 for HA.
Large Small Small Small Stream needs to be partitioned
Small Large Small Large Front routing layers and back end processing layers. Run N copies of queries as needed
Large X X X Stream needs to be partitioned
X X Large X Functional decomposition + Pipeline or a combination of both

Do you need to scale?

WSO2 CEP can handle about 100k-300k events/sec. That is about 26 Billion events per day.  For example, if you are a Telecom provider, and if you have a customer base of 1B (Billion) users (whole world has only 6B), then each customer has to take 26 calls per day. 
So there isn't that many use cases that need more than this event rate. Some of the positive examples would be monitoring all emails in the world, some serious IP traffic monitoring, and having 100M Internet of things (IoT) devices that sends an event once every second etc. 
Lets assume we have a real case that needs scale. Then it will fall into one of the following classes. (These are more refined versions of categorised I discussed in the earlier post)
  1. Large numbers of small queries and small streams 
  2. Large Streams 
  3. Complex Queries

Large number of small queries and small streams

As shown by the picture, we need to place queries (may be multiple copies) distributed across many machines, and then place a routing layer that directs events to those machines having queries that need those events. That routing layer can be a set of CEP nodes. We can also implement that using a pub/sub infrastructure like Kafka. This model works well and scales.

Large Streams (high event rate)

As shown in the picture, we need a way to partition large streams such that the processing can run independently within each partition. This is just like MapReduce model which needs you to figure out a way to partition the data (this tutorial explains MapReduce details). 
To support this sceanrio, Siddhi language let you define partitions. A sample query would looks like following. 

define partition on Palyer.sid{
    from Player#window(30s)select avg(v)as v insert into AvgSpeedByPlayer;

Queries defined within the partitions will be executed separately.  We did something similar for the first scenario of the DEBS 2014 Grand challenge solution. From the next WSO2 CEP 4.0.0  release onwards, WSO2 CEP can run different partitions on different nodes. (With WSO2 CEP 3.0, you need to do this manually via a routing layer.) If we cannot partition the data, then we need a magic solution as described in next section.

Large/Complex Queries (does not fit within one node)

Best example of a complex query is the second scenario of the DEBS 2014 Grand Challenge that includes finding median over 24 hour window that involves about 700 million events within the window! 
Best chance to solve this class of problems is to setup a pipeline as I explained in my earlier post. If that does not work, we need decompose the query into many small sub queries. Talking to parallel programming expert (serious MPI guy might help you, although domains are different, same ideas work here.) This is the domain of experts and very smart solutions.
Most elegant answers comes in the form of Distributed Operators (e.g. Distributed Joins see There are lot of papers on SIGMOD and VLDB describing algorithms for some of the use cases. But they work on some specific cases only. We will eventually implement some of those, but not in this year. Given a problem, often there is a way to distribute the CEP processing, but frameworks would not help you.
If you want to do #1 and #2 with WSO2 CEP 3.0, you need to set it up yourself. It is not very hard.  (If you want to do it, drop me a note if you need details). However, WSO2 CEP 4.0 that will come out in 2014 Q4 will let you define those scenarios using the Siddhi Query Language with annotations on how many resources (nodes) to use. Then WSO2 CEP will create queries, deploy them on top a Storm cluster that runs a Siddhi engine on each of it's bolt, and run it automatically.
Hopefully, this post clarifies the picture. If you have any thoughts or need clarification, please drop us a note.

Friday, June 20, 2014

Glimpse of the future: Overlaying realtime analytics on Football Broadcasts

At WSO2con Europe (, which concluded Wednesday, we did a WSO2 CEP Demo, which as very well received.  We used events generated from a real football game, calculated a bunch of analytics using WSO2 CEP (, annotated the game with information, and run it side by side with the real game’s video.

The original dataset and video was provided as part of 2013 DEBS grand challenge by ACM Distributed Event based Systems conference. 

Each payer had sensors in his shoes, goalie had two more in his gloves, and ball also had a sensor. Each sensor emits events in 60Hz where a event had sensorID, time stamp, x,y,z locations, velocity and acceleration vectors.

The left hand panel visualizes the game on 2D in sync with game running on right hand side and other panels show analytics like, successful vs. failed passes, ball possession, shots on goal, running speed of players, etc. Furthermore, we wrote queries to detect offsides and annotate them on 2D panel.  Slide deck at says how we (Dilini, Mifan,  Suho, myself and others) did this.

I will write more details soon, and if you want to know more or get the code, please let me know.

Now we have technology to augment your sport viewing experience. In few years, how we watch a game will be much different.