Wednesday, December 18, 2013

Should I be Tuning GC?

Java VMs now do a pretty good job of picking a GC algorithm. Often, any tuning we do makes things worse. Several times I have seen that all you have to do to fix an issue is take away the GC tuning parameters that are already there (unfortunately, often after weeks of debugging).

How do you decide to GC or not to GC?

To decide, we use a measure called JVM throughput. JVM throughput is the percentage of time the JVM did not spend on GC (in other words, the time it spent doing useful work). If JVM throughput is greater than 90%, you should NOT be tuning GC. If it is less, then maybe you should.

OK, then how do you measure JVM throughput?
  1. Run your program with the following GC parameters. This will write the GC logs to the gc.log file in the working directory:
     -Xloggc:gc.log -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
  2. To analyse the log, download and unzip GC Viewer from
  3. Start GC Viewer by running the following command from the unzipped directory:
     $ java -jar gcviewer-1.32.jar
  4. Then open the gc.log file, and it will show the throughput.
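If you just want a quick estimate without a GUI tool, you can sum the pause times in the log yourself. The following is a minimal sketch, not a robust parser (the log format varies between JVM versions and GC flags, and the class and method names here are my own):

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// A rough JVM-throughput estimate from -Xloggc output: sum the reported
// GC pause times and divide by the total wall-clock runtime. This is a
// sketch, not a robust parser; the log format differs between JVM
// versions and GC flags.
public class GcThroughput {

    // Matches the "0.0123 secs" pause figure on a GC log line.
    private static final Pattern PAUSE = Pattern.compile("([0-9]+\\.[0-9]+) secs");

    // gcLogLines: lines read from gc.log; runtimeSecs: total wall-clock
    // time the JVM has been running. Returns throughput as a percentage.
    public static double throughputPercent(List<String> gcLogLines, double runtimeSecs) {
        double pauseTotal = 0.0;
        for (String line : gcLogLines) {
            Matcher m = PAUSE.matcher(line);
            // Take only the first pause figure per line; with
            // -XX:+PrintGCDetails a line may repeat times (e.g. [Times: ...]).
            if (m.find()) {
                pauseTotal += Double.parseDouble(;
            }
        }
        return 100.0 * (1.0 - pauseTotal / runtimeSecs);
    }
}
```

For example, 0.02 seconds of pauses over a 10-second run gives a throughput of 99.8%, well above the 90% tuning threshold.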

Also, you can use the IBM pattern modelling tool from alphaWorks.

Monday, October 7, 2013

Common usage patterns for WSO2 ESB Mediators

A few weeks back, I spent a week digging into WSO2 ESB in detail. When you get started, it is a bit tricky to figure out which parts should be used where and when. I thought it would be useful to write down some of the common patterns and best practices for using WSO2 ESB mediators.


As shown in the diagram, the ESB sits between the services and clients and provides mediation. Mediation provides one or more of the following functions.

1. Message conversion and Transport switching
2. Routing
3. Filtering (e.g. security termination)
4. Service Chaining & Aggregation

Let us look at each function and discuss some of the constructs from WSO2 ESB that you can use to implement them. WSO2 ESB provides a construct called a mediator, and most functionality is implemented as a mediator. When describing how to get the ESB to do something, I will just name the mediator at the appropriate places. You can find more information about how to use each mediator in the WSO2 ESB documentation (

You start by receiving a message. You can receive a message via a Proxy Service, an API, or the Main Sequence. After receiving the message, you can use mediators to change or reroute it as needed.

Message conversion and Transport switching 

When you do message conversion, if you are changing only a small part of the message, you can use the Enrich mediator. However, if you are mostly rebuilding the message from the ground up, you are better off with the PayloadFactory mediator, which lets you type in the message as a template and insert values at the right places. Finally, if you are doing complex transformations, like expanding a list in the source message, you should use the XSLT mediator. You may also use the FastXSLT, Smooks, or XQuery mediators when applicable.
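As an illustration, a PayloadFactory mediator configuration follows the template-plus-arguments shape sketched below (the element names, namespace, and XPath expression here are hypothetical, just to show the structure):

```xml
<payloadFactory media-type="xml">
   <format>
      <!-- the message template; $1 is replaced by the first argument -->
      <m:checkPrice xmlns:m="http://services.example.org">
         <m:code>$1</m:code>
      </m:checkPrice>
   </format>
   <args>
      <!-- value extracted from the incoming message via XPath -->
      <arg xmlns:m="http://services.example.org" expression="//m:symbol"/>
   </args>
</payloadFactory>
```

This works well when the outgoing message is mostly fixed and only a few values come from the original message.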

WSO2 ESB can receive and send messages via many transports, like HTTP, SOAP, JMS, FTP, Email, FIX, etc. You can switch a message from one transport to another by configuring the transports and using the Send mediator with the correct transport.


Filtering

Filtering stops some of the messages from proceeding further. You can use the Filter mediator, the Throttle mediator, or security-related mediators like the Entitlement, Validate, and OAuth mediators.


Routing

Routing changes the path of a message based on runtime conditions.

Sometimes this is done via endpoints, like load-balancing endpoints or fault-tolerant endpoints. You can also use the Conditional Router, Event, Router, Switch, and URLRewrite mediators. It is worth noting that if the routing sends the message out more than once, you need to use the Clone mediator to make a copy before calling the Send mediator.

Service Chaining 

Service chaining lets you combine information retrieved from other sources, like Web services, databases, etc., with the current service call. There are two ways to implement service chaining; of those, it is recommended to use the "receive" attribute of the Send mediator to receive the service response.
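A sketch of that recommended approach is below; the sequence name and endpoint URI are made up for illustration. The Send mediator forwards the message to the back-end service, and the response is delivered to the named sequence instead of the default response path:

```xml
<send receive="CreditResponseSequence">
   <endpoint>
      <!-- hypothetical back-end service to chain with -->
      <address uri="http://localhost:9763/services/CreditService"/>
   </endpoint>
</send>
```

The "CreditResponseSequence" can then enrich the original message with the response and call the next service in the chain.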

You can further use Callout Mediator, DBLookup Mediator, DBReport Mediator and EJB Mediator.

Sometimes we need to make calls on multiple threads and collect the results of those calls. For that, you can use the Aggregate, Clone, and Iterate mediators. The Clone mediator creates another execution path (e.g. a thread) for the message, and the Iterate mediator repeatedly does the same task using different inputs. You can use the Aggregate mediator to collect results from both the Clone and Iterate mediators.
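As a rough sketch (the XPath expressions, namespace, and endpoint URI are hypothetical), an Iterate mediator splitting a message and an Aggregate mediator collecting the responses could be wired like this:

```xml
<!-- split the message: one copy per //m:order element -->
<iterate xmlns:m="http://services.example.org" expression="//m:orders/m:order">
   <target>
      <sequence>
         <send>
            <endpoint>
               <address uri="http://localhost:9763/services/OrderService"/>
            </endpoint>
         </send>
      </sequence>
   </target>
</iterate>

<!-- in the response path: collect the responses back into one message -->
<aggregate>
   <completeCondition>
      <messageCount min="-1" max="-1"/>
   </completeCondition>
   <onComplete xmlns:m="http://services.example.org" expression="//m:orderResponse">
      <send/>
   </onComplete>
</aggregate>
```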

There are other mediators, like the Cache, Drop, Fault, and Log mediators, and you can find more information in the WSO2 ESB documentation.


Friday, September 13, 2013

My Upcoming WSO2Con Talk about Bigdata and WSO2 Platform

At the upcoming WSO2Con US in October in San Francisco, I will be talking about the WSO2 middleware offerings for Big Data and explaining how you can put them together to build a solution that makes sense of your data, touching on technologies for collecting, storing, and analysing data.

You will hear about WSO2 products like BAM, CEP, and Storage Server, and how you can combine them to build solutions that can collect events at a rate of a few 100k events per second and process that data both offline and in realtime.

The title of the talk is "View, Act, and React: Shaping Business Activity with Analytics, BigData Queries, and Complex Event Processing", and you can find more information from You can find some of my earlier talks at

Wednesday, August 7, 2013

Understanding Complex Event Processing (CEP) Operators with WSO2 CEP (Siddhi)

The CEP model has many sensors. A sensor can be a real sensor (e.g. a temperature sensor), some agent, or any system that supports instrumentation. Sensors send events to the CEP engine, and each event has several name-value properties.

We call the events coming from the same sensor a "stream" and give it a name. When an interesting event occurs, the sensor sends that event to the stream.

To use a stream, you first need to define it.

 define stream PizzaOrders (id string, price float, ts long, custid string)

CEP listens to one or more streams, and we can write queries telling the CEP engine to look for certain conditions. For writing queries, you can use the following constructs.
  1. Filters 
  2. Windows 
  3. Joins 
  4. Patterns and Sequences 
  5. Event tables 
  6. Partitions 
Let us see what we can do with each construct.

A filter checks a condition on a property of an event. The condition can use =, >, <, etc., and you can create complex queries by combining multiple conditions via and, or, not, etc.

The following query detects pizza orders that are small and placed too far from the store.

 select from PizzaOrders[price <= 20 and distance>1km]  
 insert into NBNOrders id, price, distance  


An event stream can have an infinite number of events. Windows are a way to select a subset of events for further processing. You can select events in many ways: events that came within a time period, the last N events, etc.

The output of a window is a set of events. You can use it for further processing (e.g. joining event streams) or to calculate aggregate functions like sum and average.

We can either have the output triggered when all events are collected, or whenever a new event is added. We call the first type batch windows and the second sliding windows.
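As a sketch in Siddhi-style syntax (the exact window names, such as timeBatch, may differ between Siddhi releases), the two types would be declared like this:

```
 from PizzaOrders#window.time(1h)
 insert into SlidingStats avg(price) as avgPrice

 from PizzaOrders#window.timeBatch(1h)
 insert into BatchStats avg(price) as avgPrice
```

The first emits an updated average as each new order arrives; the second emits one average at the end of each hour-long batch.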

For example, a window can collect all pizza orders placed in the last hour and emit the average value of the orders once every hour.

 from PizzaOrders#window.time(1h) insert into HourlyOrderStats avg(price) as avgPrice  

The Join operator joins two event streams. The idea is to match events coming from the two streams and create a new event stream.

For example, you can use the join operator to join the PizzaDelivery stream and the PizzaOrder stream and calculate the time it took to deliver each order.

 from PizzaOrder#window.time(1h) as o join PizzaDelivery as d  
   on ==  
 insert into DeliveryTime as id, d.ts - o.ts as ts  

At least one side of the join must have a window. In the above example, we have a one-hour window on PizzaOrder (because the delivery always happens after the order), so the join will store the events coming into PizzaOrder for one hour and match them against delivery events. If you have two windows, the join will store events from each stream and match them against events coming into the other stream.

Patterns and Sequences 

Patterns and sequences let us match conditions that happen over time.

For example, we can use patterns to identify returning customers using the following query. Here -> denotes the "followed by" relationship.

 from every a1 = PizzaOrders  
     -> a2 = PizzaOrders[custid == a1.custid]  
 insert into ReturningCustomers  
    a1.custid as custid, a2.ts as ts  

Patterns match even when there are other events in between the two matching conditions. Sequences are similar, but the provided conditions must exactly match the sequence of events that happened. For example, the following is the same query implemented using sequences. Note that the second line is there to skip over any non-matching events.

 from every a1 = PizzaOrders,  
      a2 = PizzaOrders[custid != a1.custid]*,  
      a3 = PizzaOrders[custid == a1.custid]  
 insert into ReturningCustomers  
    a1.custid as custid, a3.ts as ts  

Here, instead of the -> relationship, we use a regular-expression-like notation to define the sequence of conditions.

Partitions (available in upcoming 3.0 release)

Siddhi evaluates a query by matching all the events in the event streams used by that query. Partitions let us divide the events into several groups, based on some condition, before evaluating queries.

For example, let us say we need to find the time spent until the pizza left the shop and until it was delivered. We can first partition the pizza orders by order ID and then evaluate the query. This simplifies the query to a great extent.

define partition orderPartition by, PizzaDone.oid, PizzaDelivered.oid   
select from PizzaOrder as o -> PizzaDone as p -> PizzaDelivered as d 
insert into OrderTimes (p.ts - o.ts) as time2Prepare, (d.ts - p.ts) as time2Delivery 
   partition by orderPartition  

We do this for several reasons.
  1. Evaluating events separately within several partitions might be faster than matching them all together, because we only need to match events within each partition.
  2. Sometime it makes queries easier to design. For example, in the above query, partitioning let us write a query without worrying about other orders that are overlapped with the same order.
  3. Partitions let the CEP runtime distribute evaluation across multiple machines, which can help when scaling queries.
Event Tables (available in upcoming 3.0 release)

Event tables let you remember some events and use them later. You define a table just like a stream. 

 define table LatePizzaOrdersTable (custid string, ts long, price float);

Then you can add events to it, delete events from it, and join those events in the table against incoming events.

For example, let's say we want to store all late deliveries, and if a late delivery happens to the same customer twice, we want to give them a free pizza.

 from LatePizzaDeliveries insert into LatePizzaOrdersTable; 

Then we can join incoming events against the events in the event table as follows.

from LatePizzaDeliveries as l join LatePizzaOrdersTable as t 
    on l.custid=t.custid AND l.ts!=t.ts
insert into FreePizzaOrders
You can also do the same using an event stream. However, event tables can be written to disk, which is very useful for long-running usecases. For example, if we did the above using an event stream, the stored values would be lost when we restart the server, whereas values in event tables are preserved on disk.

Friday, August 2, 2013

CEP Performance: Processing 100k to Millions of events per second using WSO2 Complex Event Processing (CEP) Server

With WSO2 CEP, you can use SQL-style queries to detect interesting patterns across many data streams. We call the standalone version of the CEP engine Siddhi, and that is what you should use if you need to embed the CEP engine within a Java program. On the other hand, WSO2 CEP provides the same query support running as a server, and you can send it events using Thrift, Web Services, REST calls, JMS, and email.

WSO2 CEP can handle a few 100k events per second over the network and a few million events per second within the JVM. We have measured and published those numbers before. In this post, I will try to put them all together and give context to the different numbers.

In the following, each event includes multiple properties, and the queries match those events against given conditions. 

Performance within the same JVM

 Setup: We used an Intel(R) Xeon(R) X3440 @2.53GHz, 4 cores, 8M cache, 8GB RAM, running a Debian 2.6.32-5-amd64 kernel. We generated events from the same JVM.

Case 1: Simple filter (from StockTick[prize > 6] return symbol, prize)

Case 2: Window (from StockTick[symbol=‘IBM’]#win.time(0.005) return symbol, avg(prize))

Case 3: Events patterns A->B (A followed by B). 

From f=FraudWarningEvent ->
return accountNumber;

Performance Over the Network

Setup: We used an Intel® Core™ i7-2630QM CPU @ 2.00GHz, 8 cores, 8GB RAM, running Ubuntu 12.04 with the 3.2.0-32-generic kernel, for running CEP, and an Intel® Core™ i3-2350M CPU @ 2.30GHz, 4 cores, 4GB RAM, running Ubuntu 12.04 with the 3.2.0-32-generic kernel, for the three client nodes.

The following results are for a simple filter; we sent events over the network using Thrift. 


Performance For a Complex Scenario

Finally, the following is the performance for the DEBS grand challenge. The grand challenge detects the following scenarios from events generated in a real football game.

Usecase 1: Running analysis

The first usecase measures each player’s running speed and calculates how long he spent in different speed ranges. For example, the results will tell us that the player "Martin" was running fast from 27 minutes and 01 seconds into the game until 27 minutes and 35 seconds.

Usecase 2 & 4: Ball Possession and Shots on Goal

For the second usecase, we need to calculate the time each player controlled the ball (ball possession). A player controls the ball from the time he hits the ball until someone else hits the ball, the ball goes out of the ground, or the game is stopped. We identify hits when the ball is within one meter of a player and its acceleration increases by more than 55 ms^-2.

Usecase four is to detect hits and emit events if the ball is going toward the goal.

Usecase 3: Heatmap of Activity

Usecase three divides the ground into a grid and calculates the time a player spends in each cell. However, this usecase needs updates once every second. The first part we can solve just like the first usecase, but to make sure we get an event once a second, we had to couple it with a timer.

You can find more information from , and you can find the queries from this blog post

Setup: a VM with 4 cores (@2.8 GHz), 4 GB RAM, an SSD, and 1Gb Ethernet; we replayed events from the same JVM. 


Thursday, June 6, 2013

Instant MapReduce Patterns book

I have been working on a micro book (50 pages), Instant MapReduce Patterns, which is available now both online and in printed form. (You can also buy it from Amazon.)

It is a short book that explains 10 simple MapReduce application patterns. The book starts from ground zero, introducing MapReduce, and then goes on to MapReduce patterns.

The book aims to get readers beyond word count (the hello-world equivalent of the MapReduce world) and give them a concrete feel for writing MapReduce programs. There are three introductory-level recipes, and then it describes seven patterns of MapReduce programs: analytics, set operations, cross-correlation, search, graph, joins, and clustering, with an example for each pattern.

I hope it will be useful. I will blog later about sample recipes.

Also, if you are looking for a detailed treatment, our earlier book, "Hadoop MapReduce Cookbook", might be worth a look. 

Thursday, May 30, 2013

Introduction to Big Data

The following is the Introduction to Big Data slide deck I presented at the Colombo Java meetup this month. You can find the slides from Introduction to Big Data

Here I take a detailed look at analysis, going beyond basic analytics and basic figures. We do analysis for three reasons.
  1. To know what happened (Analytics)
  2. To  explain what happened (Models, theories)
  3. To predict what will happen (Neural networks, Numerical models )
The slides also discuss existing predictions, such as weather forecasting, that we already do very well. They also include a typical big data architecture and discuss the different parts of that architecture. 

Finally, the slides go on to discuss Hadoop and CEP and provide some examples.  

Monday, May 13, 2013

How to profile WSO2 Products

We use profiling to find performance bottlenecks, with tools like JProfiler and YourKit to profile Java programs. However, if you try to use one of those tools to profile a WSO2 product, there are a few things you should know.

I have repeated these steps many times, and that is why I am writing them down.
  1. Start the WSO2 Product
  2. Connect to the product by finding the Java process from the menu. If you are profiling a remote machine, you need to start the product with the profiler's JVM options (e.g. with JProfiler, the option looks like -agentpath:/Applications/jprofiler7/bin/macos/libjprofilerti.jnilib=port=8849).
  3. When it connects, use instrumentation (use sampling only if you run the test for 3-4 hours or more).
  4. Generally, the default filters will exclude org.apache.*, which hides the most useful information. To avoid that, remove all the default filters and add inclusive filters for org.wso2, org.apache, and java.util.
  5. Enable CPU or other useful data recording options, and continue with your profiling. 

Finally, if you have a memory leak, it is a good idea to take a memory dump, analyze it using Eclipse Memory Analyzer, and look at the “Leak Suspects Report”.

Wednesday, May 1, 2013

Solving DEBS 2013 Grand Challenge with WSO2 CEP/Siddhi

The ACM International Conference on Distributed Event-Based Systems (DEBS) is one of the primary venues for complex event processing related research. Every year, it poses a grand challenge that asks participants to solve an event-based problem.

This year we competed as well, and the following are some of our experiences.

This year's grand challenge is to process events generated in a football game and answer four queries about the game in a streaming fashion. Both the players’ shoes and the ball had sensors, and as the game continues, they generate about 15,000 events per second. Each event includes the location (x, y, z), timestamp, velocity, and acceleration of the sensor. You can find more information from The complete dataset is 49 million events.

There are four usecases. I will explain each and say a bit about how we implemented them. To understand the queries, you need to know the Siddhi event query language; the following is a crash course.

from <name of the event stream1>[condition on event], 
    <name of the event stream2>[condition on event], 
    <name of the event stream3>[condition on event] …
select <values to be extracted from matching events>
insert into <name of the event stream to send results>

Here the comma-separated conditions define an event sequence. That is, the query matches when the incoming events have matched those conditions in the given order.

We worked with two streams: Players – events from players and BallStream – events from the ball.

Usecase 1: Running analysis 

The first usecase measures each player’s running speed and calculates how long he spent in different speed ranges. For example, the results will tell us that the player "Martin" was running fast from 27 minutes and 01 seconds into the game until 27 minutes and 35 seconds.

We implemented this usecase by using CEP event sequences to detect whenever a player crossed a speed threshold, using a query like the following.

define partition player by
from s = Players[v <= 1 or v > 11],
    t = Players[v > 1 and v <= 11]+, e = Players[v <= 1 or v > 11]
select s.ts as tsStart, e.ts as tsStop, as playerId,
    'trot' as intensity, t[0].v as instantSpeed,
    (e.ts - s.ts)/1000000000 as unitPeriod
insert into RunningStats partition by player

The query defines a sequence of conditions. For example, Players[v <= 1 or v > 11] matches events in the Players stream whose velocity falls in the given range. Siddhi invokes the callback when those conditions are met.

Here “+” denotes one or more occurrences, similar to regular expressions. You can find more information about the Siddhi language in the Siddhi Language Specification. We wrote a query for each speed range, found the timestamps of speed-range changes, and calculated the final result.

Here, event partitions split the events by some condition before processing them; the above query partitions events by player.

Usecase 2 & 4: Ball Possession and Shots on Goal 

For the second usecase, we need to calculate the time each player controlled the ball (ball possession). A player controls the ball from the time he hits the ball until someone else hits the ball, the ball goes out of the ground, or the game is stopped. We identify hits when the ball is within one meter of a player and its acceleration increases by more than 55 ms^-2.

Usecase four is to detect hits and emit events if the ball is going toward the goal.

We implemented both using event sequences, and the following are the queries. Here functions like debs:isGoingToGoal(..) and debs:getDistance(..) are custom Java functions we wrote and registered with Siddhi.

The following query detects the hits.

from Ball#window.length(1) as b join 
 Players#window.length(1) as p unidirectional 
 on debs:getDistance(b.x, b.y, b.z, 
  p.x, p.y, p.z) < 1000 and b.a > 55 
select p.sid, p.ts, p.x, p.y, p.z, p.a,, p.tid, 
 b.sid as ballSid  
insert into hitStream

The following query detects the periods during which a player is in control of the ball. 

from old = hitCountStream, 
 b = hitCountStream[ != pid ], 
 n=hitCountStream[ == pid]*, 
 e1 = hitCountStream[ != pid ] 
  or e2=ballLeavingHitStream 
select as playerId, b.tid as teamId, 
 b.ts as startTs,  
 coalesce(e1.ts ,e2.ts) as endTs, 
 coalesce(e1.cnt ,e2.cnt) as counter1, 
 b.cnt as counter2 
insert into BallPossessionStream

The following query detects shots at the goal.

from s = hitStream, 
 t = Ball[debs:getDistance(s.x,s.y,s.z, 
  x, y, z) < 1000]+, 
 e = Ball[debs:isGoingToGoal(s.tid,x,y,z
  ,a,v,vx,vy,vz,s.ts,ts) == 1.0]+, 
 h = Ball[debs:isGoingToGoal(s.tid,x,y,z,
  a,v,vx,vy,vz,s.ts,ts) == 0.0] 
   or l=ballLeavingStream
select s.ts as startTime, e[0].ts,, 
 s.tid, 'on' as mode 
insert into ShotOnGoalStream

Usecase 3: Heatmap of Activity 

Usecase three divides the ground into a grid and calculates the time a player spends in each cell. However, this usecase needs updates once every second. The first part we can solve just like the first usecase, but to make sure we get an event once a second, we had to couple it with a timer.

The queries are given below, and you can find the explanation in the paper. The following query detects when a player has changed cells.

from r=PlayersCell, 
 s = PlayersCell[cell!=r.cell] 
select as playerId, s.ts as time, s.x, s.y, 
 s.cell as newCell,r.cell as oldCell,  
 s.tsms as timems, 'ball' as type 
insert into r4CellChanges partition by player
Following query joins the events with a 1-second timer.
from timer1s unidirectional 
 join PlayersCell#window.unique(pid) as c 
select as playerId, timer.ts as time, c.x, 
 c.y, c.cell as newCell, c.cell as oldCell,  
 timer.tsms as timems, 'timer' as type 
insert into r4CellChanges

The following query uses the results from the above two queries and calculates the time spent in each cell. 

from s=region4cellChange, 
select s.playerId, e.timems as endTimems,
 e.time as ts, s.x, s.y, 
 e.time- s.time as time, 
 s.newCell as cell, e.type 
insert into r4CellStay partition by player

We were able to get about 50,000 events per second for usecase 3, and more than 100,000 events per second for the other usecases, on a 4-core, 4GB VMware virtual machine. This is about three times the required rate of 15,000 events per second, which we believe is impressive. It also shows that Siddhi EQL can naturally fit such a complex scenario into queries, providing a nice higher-level abstraction for thinking about the problem.

Hope this was interesting. Implementing the actual scenarios required more queries to calculate all the details, but the above presents the basic idea.

If you would like to try out WSO2 CEP/Siddhi, you can find it from

You can find an introduction to Siddhi from 

Friday, February 22, 2013

What is messaging and why should you care?

I drew the following diagram for my wso2con talk. Through the diagram, I am trying to explain "what is messaging" and "why should you care". Here are some thoughts.

When we build systems, we often use some form of RPC: UNIX RPC, RMI, Web Services, Thrift, etc. Yes, each has its unique traits, yet fundamentally they all let users invoke a procedure living in a remote server, and all share three properties.
  1. Request/response (a.k.a. two way messaging)
  2. Synchronous (the client waits for the server to return)
  3. Transient (the message is not stored)
Although we often take these properties for granted, there is a flip side to each: there are also one-way interactions, asynchronous interactions, and persistent interactions.

As shown in the figure, those properties can be combined into eight possibilities, and RPC is just one of the eight. We use "messaging" to mean the superset of RPC interactions that gives users the flexibility of choosing the above properties. Messaging systems (e.g. JMS) support the Distributed Queue (see [2]) and Publish/Subscribe (see [1]) models.
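To make the queue model concrete, here is a minimal in-JVM sketch using a plain Java BlockingQueue. This is not a real messaging product (no broker, no network, no persistence; the class name and message are made up); it only illustrates one-way, asynchronous delivery, where the producer returns immediately and the consumer picks the message up when ready:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// A toy illustration of the distributed-queue messaging model: the
// producer hands a message off and moves on (one-way, asynchronous);
// the consumer takes it later. A real broker (e.g. a JMS provider)
// would add persistence, transactions, and network delivery.
public class TinyQueueDemo {

    public static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Producer: fire-and-forget; it does not wait for a response.
        queue.offer("order-42");

        // Consumer: could run on another thread or much later; here we
        // simply poll the message back in the same thread.
        return queue.poll();
    }
}
```

Replacing the in-memory queue with a persistent broker is what buys the loose coupling in space, time, and synchronization discussed below.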

Messaging may provide all or some of the following advantages.
  1. Reliability – via transactions or persistence 
  2. Performance – enables SEDA-like event-driven, non-blocking architectures that can provide higher throughput than blocking architectures. 
  3. Loose coupling – participants joining the interaction may be loosely coupled in one of the following three dimensions (read the paper “The Many Faces of Publish/Subscribe” for more information).
    • Space – server and client do not need to know each other.
    • Time – server and client do not have to be online at the same time.
    • Synchronization –client does not have to wait for the message to be delivered.
  1. Eugster, Patrick Th, et al. "The many faces of publish/subscribe." ACM Computing Surveys (CSUR) 35.2 (2003): 114-131.
  2. Scalable Persistent Message Brokering with WSO2 Message Broker slide deck

Wednesday, January 30, 2013

Our book, "Hadoop MapReduce Cookbook" is online

Thilina and I have been working on a book on Hadoop, and it is now online! You can find the book from or from Amazon at It is available both in paperback and in e-book format. 

Hadoop is an implementation of the MapReduce pattern first introduced by Google in their seminal paper "MapReduce: Simplified Data Processing on Large Clusters". It provides a programming model for processing large datasets using many computers. 

For example, consider a few gigabytes of log files that contain access logs for a server, where you want to count the number of hits received by each web page. It is possible to write a program that walks through the log files and processes them. However, if the log files are large, you would need to process them using many computers, and writing a system that processes such log files across many computers would be a significant undertaking. 

However, toolkits like Hadoop that support the MapReduce framework let users write two functions, called "map" and "reduce", and the framework takes care of the details of processing the log files. Furthermore, Hadoop handles details like communication between nodes, scheduling sub-tasks, and handling failures. 
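To illustrate the shape of those two functions, the following is a toy, single-machine imitation of the MapReduce flow in plain Java collections (not the Hadoop API; the class and method names are my own): map emits a (word, 1) pair per word, the framework groups the pairs by key, and reduce sums each group. Hadoop runs the same three phases distributed across many machines.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A single-machine sketch of the MapReduce word-count flow using plain
// Java collections. Not the Hadoop API; it only shows the map -> group
// by key -> reduce structure that Hadoop distributes across a cluster.
public class WordCountSketch {

    public static Map<String, Integer> wordCount(List<String> lines) {
        // Map phase: emit a (word, 1) pair for every word in every line.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines)
            for (String word : line.split("\\s+"))
                if (!word.isEmpty())
                    pairs.add(Map.entry(word, 1));

        // Shuffle phase: group the emitted values by key (the word).
        Map<String, List<Integer>> groups = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs)
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());

        // Reduce phase: sum the counts within each group.
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet())
            counts.put(g.getKey(), g.getValue().stream().mapToInt(Integer::intValue).sum());
        return counts;
    }
}
```

For the input "to be or not to be", this yields to=2, be=2, or=1, not=1; the value of the framework is that the grouping and scheduling work happens for you at scale.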

Log processing is only a trivial example of what can be implemented with the MapReduce paradigm. It can be, and is being, used to implement many simple and complex data processing tasks around the world, and users can extend it in many ways to handle different data formats. 

The book starts with simple introductory details but goes into many MapReduce patterns, like analytics, clustering, and recommendations. Each is explained as a recipe accompanied by code samples. We believe it has recipes that will help beginners as well as experienced MapReduce developers. 

Some important information. 
  1. The book uses Hadoop 1.0.x releases. 
  2. The table of contents is at (click on the image)
  3. The sample chapter, about analytics using MapReduce, is available from  

This finishes an almost year-long process of writing, editing, and re-editing. It took a lot of time, but it was of course a great experience. I would like to thank the editorial team for all the help and feedback. We hope the book will be useful for Hadoop developers.