Wednesday, October 31, 2012

Do you need to match a few 100k events per second? Siddhi Open Source CEP Engine and WSO2 CEP Server 2.0 Released

I have written before about Siddhi, a new open source Java Complex Event Processing (CEP) engine, available under the Apache License, that we have been working on. At last it has arrived, and we have our first production-grade release of Siddhi.

Siddhi has now become the runtime for the WSO2 CEP server, and we have just released WSO2 CEP 2.0 with Siddhi 2.0 bundled in.

WSO2 CEP adds network support to Siddhi, so you can now use it over Thrift, HTTP, SOAP, and JMS. Siddhi/WSO2 CEP 2.0 has several cool new features, but let me briefly list a few things I believe are key.

 1) Very fast (can do 1-2.5 million events/sec as a jar library)
 2) Fast across the network too (about 0.25 million events/sec over Thrift calls)
 3) New Siddhi query language (supports Filters, Windows, Patterns, Sequences, and Joins). You can find the specification at http://docs.wso2.org/wiki/display/CEP200/Siddhi+Language+Specification
 4) Supports Hazelcast based highly available deployments and large working memories
 5) Supports periodic snapshots of the CEP state so it can recover the state in case of a failure
 6) Integrates closely with WSO2 BAM. Basically, you can set up a system where you collect data from many event sources and send them to both BAM and CEP using the same Thrift transport, so that the former can do batch-style Hadoop/Hive based processing and the latter can do real-time processing.

 You can download the server from http://wso2.com/products/complex-event-processor.

 If you are looking for the Siddhi jar to use with your Java code, it is in the .zip file. Alternatively, you can get the jar from http://dist.wso2.org/maven2/org/wso2/siddhi/siddhi-distribution/1.0.0-wso2v2/.
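
To give a feel for embedding it, here is a minimal sketch of using Siddhi as a library from Java. This assumes the Siddhi 2.x embedding API; treat the package, class, and method names as approximate and check the documentation linked below for the exact API.

    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.query.output.callback.QueryCallback;
    import org.wso2.siddhi.core.stream.input.InputHandler;

    public class SiddhiFilterExample {
        public static void main(String[] args) throws InterruptedException {
            SiddhiManager siddhiManager = new SiddhiManager();

            // Define an input stream and a simple filter query.
            siddhiManager.defineStream("define stream StockStream (symbol string, price float)");
            String queryId = siddhiManager.addQuery(
                    "from StockStream[price > 70] insert into HighPriceStream;");

            // Print every event that matches the filter.
            siddhiManager.addCallback(queryId, new QueryCallback() {
                @Override
                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                    for (Event event : inEvents) {
                        System.out.println("Matched: " + event);
                    }
                }
            });

            // Push a few events into the engine.
            InputHandler input = siddhiManager.getInputHandler("StockStream");
            input.send(new Object[]{"IBM", 75.6f});
            input.send(new Object[]{"WSO2", 57.6f}); // filtered out
            siddhiManager.shutdown();
        }
    }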

 We have moved the code and mailing list to WSO2.

 You can find the code from https://svn.wso2.org/repos/wso2/carbon/platform/branches/4.0.0/dependencies/commons/siddhi/1.0.0-wso2v3/.

 You can find documents and samples from http://docs.wso2.org/wiki/display/CEP200/Complex+Event+Processor+Documentation.

 If you have any questions, you can ask them at http://stackoverflow.com/questions/tagged/wso2/.

 I will soon write more about the architecture and how some of the Siddhi/CEP features are implemented.













Wednesday, October 3, 2012

Offline Profiling with JProfiler


We use offline profiling when we want to profile in headless mode (without a UI). Another option is to profile by connecting remotely using the JProfiler GUI, which works well. However, sometimes even that option is not available, for example when the network is slow or only allows SSH connections.

Following are quick instructions for offline profiling. For more details, refer to the "JProfiler Manual".
  1. Create a JProfiler session and configure the triggers. Make sure you add a "save snapshot" trigger with a timer (say, every 5-10 minutes), else you will not get anything. You can also give the target snapshot file location when you configure the trigger. 
  2. Copy your local .jprofiler7/config.xml and the JProfiler binaries (or agent code) to the remote machine. 
  3. Add the following before the java command (a full example command is shown after these steps).
    -agentpath:JPROF_LOCATION/bin/macos/libjprofilerti.jnilib=offline,id=SESSION_ID,config=/Users/srinath/.jprofiler7/config.xml 
    Here, replace the configuration file and JProfiler locations with your machine's values. SESSION_ID is the session ID of the session you created with the JProfiler UI. The ~/.jprofiler7/config.xml file has settings for all sessions created by JProfiler, and the agent will pick up the right one. If you are profiling on a remote machine, copy your local config to the remote host. 
  4. Start and run the program, and it will print the following:
    JProfiler> Using JVMTI
    JProfiler> JVMTI version 1.1 detected.
    JProfiler> 64-bit library
    JProfiler> Offline profiling mode.
    JProfiler> Using config file /Users/srinath/.jprofiler7/config.xml (id: 194)
    JProfiler> Listening on port: 8849.
    JProfiler> Instrumenting native methods.
    JProfiler> Can retransform classes.
    JProfiler> Can retransform any class.
    JProfiler> Native library initialized
    JProfiler> VM initialized
    JProfiler> Using dynamic instrumentation
    JProfiler> Time measurement: elapsed time
    JProfiler> CPU profiling enabled
    JProfiler> Hotspot compiler enabled

  5. Open the snapshot file and analyze it using the JProfiler UI. 
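
Putting step 3 together, the full launch command ends up looking something like the following (the paths, session ID, and jar name here are placeholders; adjust them to your machine):

    java -agentpath:/Applications/jprofiler7/bin/macos/libjprofilerti.jnilib=offline,id=101,config=/Users/srinath/.jprofiler7/config.xml -jar myserver.jar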



Tuesday, July 3, 2012

Slides for Cloud Hands-on Tutorial in IEEE Cloud 2012

Following are the slides I used for the tutorial. As the sample, we deployed a Web Service and a Web Application. When deployed, users access the web application, which calls the Web Service and shows the results.

The hands-on session deployed the service and webapp in three setups.
  1. In the local machine
  2. In the Amazon EC2 (IaaS)
  3. In the WSO2 Stratos (PaaS)
The script for the demo can be found below. Please note it is only a rough script. 




Abstract

When cloud computing was introduced a few years ago, it promised many advantages, including self-service, elasticity, pay-as-you-go pricing, improved accessibility to computation resources, and full deployment automation. A few years down the line, we can now see many ideas, architectures, and systems shaping up, providing more clarity and understanding of the cloud computing landscape. Among them, there are many cloud computing systems that enable users to build systems in the cloud with minimal effort. Understanding those systems can provide great insight into both the “cloud promise” and the “cloud reality”. This tutorial will provide a brief introduction to the cloud and discuss, hands-on, how to use some of the existing cloud computing systems. We will demonstrate how to develop and run a SOA application on an Infrastructure as a Service (IaaS) platform as well as on a Platform as a Service (PaaS) platform.

Script 


As a demo, we will deploy a Web Service and a Web Application. When deployed, users access the web application, which calls the Web Service and shows the results. You can download the sample code from http://people.apache.org/~hemapani/dist/clouddemo/HelloStratos.zip. Download and unzip the distribution.
  1. The webapp folder has the Web app, and you can build the HelloStratosWebapp.war file by running the ant command from the webapp folder.
  2. The services folder has the Web service, and you can build the HelloStratos-1.0.aar file by running mvn clean install

Local Hello Stratos Demo

  1. Edit webapp/src/main/resources/index.jsp to do the service call to 127.0.0.1. Rebuild the webapp by running ant from the webapp directory.
  2. Download WSO2 AS
  3. Run the following commands to install WSO2 AS
    unzip wso2as-4.1.2.zip
    cd /Users/srinath/playground/cloud-2012-tutorial/wso2as-4.1.2
    cp /Users/srinath/playground/cloud-2012-tutorial/HelloStratos/webapp/target/HelloStratosWebapp.war repository/deployment/server/webapps/
    cp /Users/srinath/playground/cloud-2012-tutorial/HelloStratos/service/target/HelloStratos-1.0.aar repository/deployment/server/axis2services/
    
    
    
  4. Start WSO2 AS via the following commands.
    cd bin/
    ./wso2server.sh
    
    
    
  5. Go to the admin console and log in via https://127.0.0.1:9443/carbon/admin/login.jsp; the username and password are admin/admin.
  6. Try out the service. 
  7. Show soap traces.
  8. Show Web app.
  9. Try out the Web app 

EC2 Demo

  1. Create AWS account
  2. Go to management console
  3. Create an instance (base AMI ami-2ab91843, or the AMI with everything set up: ami-5805a731). Create a key pair in the process and save it.
  4. Connect to the instance (srinath-2012.pem is the key pair in this case)

  5. ssh -i ~/.ssh/ec2/srinath-2012.pem ubuntu@ec2-23-22-125-63.compute-1.amazonaws.com

  6. Create a folder ieee-clouddemo
  7. Download WSO2 AS and install it via the following commands

  8. wget http://people.apache.org/~hemapani/dist/wso2as-4.1.2.zip
    sudo apt-get install unzip  
    
    Unzip the WSO2 AS distribution
    
    wget http://download.java.net/jdk6/6u34/promoted/b03/binaries/jdk-6u34-ea-bin-b03-linux-amd64-20_jun_2012.bin (direct link http://jdk6.java.net/download.html)
    
    ./jdk-6u34-ea-bin-b03-linux-amd64-20_jun_2012.bin
    
    export JAVA_HOME=/home/ubuntu/jdk1.6.0_34
    

  9. Edit the host name in carbon.xml
  10. Access the admin console via https://ec2-107-20-54-230.compute-1.amazonaws.com:9443/carbon/admin/login.jsp
  11. Upload the service via the admin console
  12. Try out the service
  13. Download the HelloStratosWebapp.war from
  14. Upload the war file
  15. Try out the war file
  16. Save the AMI

Stratos Demo

  1. Create an account at https://stratoslive.wso2.com/
  2. Log in to the Application Server in Stratos: https://stratoslive.wso2.com/t/ieeecloud1.org/carbon/ (replace ieeecloud1.org with your tenant)
  3. Show the services and compare with the standalone version
  4. Upload the service to Stratos
  5. Try it out
  6. Log in to the Windows VM and upload the Web app
  7. Try the demo
  8. Enable security by uncommenting the commented parts in web.xml
  9. Re-upload the services
  10. Try the Web app; it will ask for credentials. Log in with admin/admin.

Wednesday, June 6, 2012

A few thoughts on debugging

If you are a developer who works on distributed systems, there is one thing you learn well: how to debug, and how to avoid having to debug. Following are some of my thoughts and some things I generally do.

Debugging a Local Java Program

  1. If you write your program well, you will generally have a stack trace when you have a problem. (This does not apply well to performance problems and memory leaks; I will write a separate note about those.)
  2. Look at the trace and go to where the error happened. Try to figure out what happened. The best way to do this is by walking through the logic again.
  3. If that did not work, copy and paste your stack trace into Google. About 80% of the time, you will find the answer there. Pay special attention to JIRA bug reports for the projects you are using and online forums like stackoverflow.com.
  4. If that did not work, you will have to debug. You can debug by running the code from your IDE (e.g. Eclipse) or by connecting to a server through a remote debugger. Walking through the logic using the debugger generally tells you what happened. For example, there are articles that explain how to debug with Eclipse and how to connect to a remote server.
  5. If none of these worked, now it is the time to go and ask someone for help. There are some bugs that are very hard for the author of the code to see. However, you should go for help with the problem recreated, a debugger attached, and ready to let your helper step through the execution.
  6. If you have trouble with a specific tool, you can ask for help at user lists, forums, or general developer forums like stackoverflow.com.

Debugging a Distributed System

  1. Debugging distributed systems is hard, so the best approach is not to have to debug them. You can almost get there by writing unit tests and testing what you write in small steps. My preferred approach is to make one path work end to end, and then do small changes while testing each change.
  2. A distributed system will have multiple processes (JVMs), so it is tricky to debug. If it is at all possible, find a way to run your whole distributed system within the same process (JVM). This will need some imagination from your end, but it will save you a lot of trouble later.
  3. If you are debugging a distributed system, it is often useful to capture the messages that are sent and received. You can do this via tools like TCP Monitor, SOAP Monitor, or Wireshark.
  4. It is doubly important to log all exceptions that can happen in your code. Otherwise, you will have no idea whether the system worked or not. 
  5. I often append the timestamp and the name of the process or host to each log line. One way to do this is by writing a Log4j appender (see the sketch after this list). The timestamp and process or host name let me merge-sort all the logs into one file and read through the execution of the system in one pass.
  6. It is likely that your distributed system processes a lot of messages, so it is very hard to read and understand the log. One way out of this is to trace only every 1000th message. I do this by keeping a message count and using:
    if (messageCount % 1000 == 0) {
       log.info(….);
    }
    
  7. If you are running a complex system that has more than five nodes, you should invest in some mechanism to collect the logs (using something like Flume) and automate their processing to find stack traces etc.
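
To illustrate items 5 and 6, here is a minimal Log4j 1.x sketch that stamps every log line with the host name through the MDC and logs only every 1000th message. The class and key names are placeholder choices; pair it with a pattern like "%d{ISO8601} %X{host} %p %m%n" in log4j.properties.

    import java.net.InetAddress;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.log4j.Logger;
    import org.apache.log4j.MDC;

    public class SampledLogging {
        private static final Logger log = Logger.getLogger(SampledLogging.class);
        private static final AtomicLong messageCount = new AtomicLong();

        public static void main(String[] args) throws Exception {
            // Make the host name available to the pattern layout as %X{host}.
            MDC.put("host", InetAddress.getLocalHost().getHostName());

            for (int i = 0; i < 10000; i++) {
                processMessage("message-" + i);
            }
        }

        static void processMessage(String message) {
            // Log only every 1000th message to keep the logs readable.
            if (messageCount.incrementAndGet() % 1000 == 0) {
                log.info("Processed " + messageCount.get() + " messages, last: " + message);
            }
        }
    }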

Tuesday, June 5, 2012

Scaling WSO2 Stratos

WSO2 Stratos is a Platform as a Service (PaaS) that offers middleware technologies like Web Services, Workflows, Messaging etc as a Service.

PaaS environments bring together many users and could potentially attract a large number of users. Therefore, scaling up is a major consideration for a PaaS. This post explains our experiences and some thoughts on scaling Stratos.

Problem

Stratos is multi-tenanted; in other words, there are many tenants. Each tenant generally represents an organization and is isolated from other tenants: each tenant has its own users, resources, and permissions. Stratos supports multiple PaaS services. Each PaaS service is actually a WSO2 product (e.g. AS, BPS, ESB etc.) offered as a service. Using those services, tenants may deploy their own Web Services, mediation logic, workflows, gadgets etc.
  1. The WSO2 Stratos runtime provides servers, each of which can support multiple tenants and provides a PaaS service. For example, there are multi-tenanted AS, BPS, ESB etc.
  2. Stratos can provision (add/remove) resources (computing nodes) on demand as needed.
  3. The problem is to build a system that scales up and down without end users realizing it.

Answers

The following describes a series of solutions, where each solution adds a new feature to solve a specific problem. It explains the rationale and thought process behind the final design.

Solution 1
WSO2 Stratos consists of a multi-tenanted server of each type (e.g. ESB, BPS etc.). Users first talk to an Identity Server (IS), get an SSO (single sign-on) token, and log in to any server. We store all tenant data in a registry. Each server loads all tenants at startup and can serve any tenant when it receives a request.

Solution 2
Solution 1 does not scale at all. So we started running multiple instances of each server and put a load balancer (LB) in front. The LB distributes the requests across the different servers. When the load on the server instances is high, the LB starts new server instances, and when the load is low, it shuts down some instances. We call this auto-scaling.

Solution 3
When Stratos had several hundred tenants, with many tenants having tens of services, it took a long time to load all tenants at startup; startup took 15-30 minutes. Furthermore, most tenants stay inactive most of the time. However, since each node has to hold all tenants, Stratos spends resources on inactive tenants as well.

To avoid the above problems, solution 3 added lazy loading. All information about tenants is stored in a central registry. Tenants are loaded into memory only when they are needed, and get unloaded when they have been idle for more than a given timeout. You can find more information about lazy loading in Azeez's blog entry “Lazy Loading Deployment Artifacts in a PaaS Deployment”.

Solution 4
When tenants have several artifacts, loading them takes time. So if a tenant that has not yet been loaded is accessed in solution 3, the first request or two to that tenant will time out.

Solution 4 added a ghost deployer to solve the above problem. The ghost deployer does not load all information about a tenant, but just the metadata; the actual artifacts are loaded on demand. As a result, loading a tenant is a much lighter operation in solution 4, which avoids requests timing out while the tenant loads. You can also find more information about this in Azeez's blog entry “Lazy Loading Deployment Artifacts in a PaaS Deployment”.

Solution 5
However, in solution 4, the single LB does not scale to handle a large number of requests. So we replaced the LB with multiple LBs that hold the same metadata in all nodes; therefore, all LBs take the same decision when they receive a request. We can use this model to scale Stratos by placing a hardware load balancer in front, or by setting up DNS round robin, to distribute requests among the LBs.

To synchronize the metadata across all LBs, we could use group communication. But that is M-to-M (all-to-all) communication, which is heavy. Instead, LBs in Stratos are designed to send updates as batches to a single decision service, and the decision service takes the auto-scaling decisions. We provide high availability by running a replica of the decision service and keeping it up to date via state replication through group communication.

Solution 6
However, in solution 5, LB instances are not aware of tenants. Due to lazy loading, all requests will work even though LBs route messages arbitrarily. However, this might lead to a scenario where a single node has to load too many tenants.

To avoid this, in solution 6, LBs are aware of tenants, and only a subset of tenants is allocated to each LB. You can find more information in Sajeewa's blog entry, WSO2 Tenant Aware Load balancer.

Solution 7
The upcoming Stratos release will follow solution 6. However, the next potential problem is that the registry, which holds the configurations and resources of all tenants, may not scale to handle a large number of tenants. Hence, the registry will need to be partitioned across multiple servers.

Thursday, May 31, 2012

Getting Captcha to do useful work



Getting CAPTCHA to digitize books. The solution is simple and elegant: show two words, one known, while the other is the one to digitize. Users do not know which is which. If the answer to the control (the known word) is correct, we can trust the second answer with high probability.

This is done by many sites, and they have digitized about 2.5 million books per year this way. See the above TED talk by Luis von Ahn for details.

They are also trying to use language learners to translate the web: the system shows learners samples to translate and combines the translations to create the final version.

This, in my opinion, is a truly creative solution.

Maybe we can use the same idea to tag images. For example, show a picture and ask how many elephants are in it. It is not very easy for a program to answer such random questions about pictures. Now play the same trick with and without controls, as above. (It will need some thinking to get right, but still.)

Friday, May 25, 2012

Debugging Hadoop Task tracker, Job tracker, Data Node or Name Node

Hadoop's conf/hadoop-env.sh has the following environment variables: 

  1. HADOOP_NAMENODE_OPTS
  2. HADOOP_SECONDARYNAMENODE_OPTS
  3. HADOOP_DATANODE_OPTS
  4. HADOOP_BALANCER_OPTS
  5. HADOOP_JOBTRACKER_OPTS
  6. HADOOP_TASKTRACKER_OPTS


You can use them to start the remote debugger so that you can connect to and debug any of the above servers. Unfortunately, Hadoop tasks are started in separate JVMs by the task tracker, so you cannot use this method to debug your map or reduce functions, as they run in those separate JVMs. 

To debug the task tracker, do the following steps. 

1. Edit conf/hadoop-env.sh to have the following

export HADOOP_TASKTRACKER_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=5000,server=y,suspend=n"

2. Start Hadoop (bin/start-dfs.sh and bin/start-mapred.sh)
3. If you set suspend=y in the options above, the task tracker will block waiting for the debug connection; with suspend=n (as shown) it starts right away and you can attach at any time
4. Connect to the server using Eclipse "Remote Java Application" in the Debug configurations and add the break points
5. Run a map reduce Job 
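
The map and reduce task JVMs themselves can be debugged as well: one common approach (a sketch, assuming Hadoop 1.x property names) is to pass the same JDWP flags to the child JVMs through mapred-site.xml. While doing this, limit the node to one task slot so that concurrent tasks do not fight over the debug port.

    <!-- mapred-site.xml -->
    <property>
      <name>mapred.child.java.opts</name>
      <value>-Xdebug -Xrunjdwp:transport=dt_socket,address=5001,server=y,suspend=y</value>
    </property>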

Friday, May 18, 2012

How to scale Complex Event Processing (CEP) Systems?

What is CEP?

Complex event processing (CEP) systems query events on the fly without storing them.
  • For an introduction to and a definition of CEP, please refer to the CEP Blog and Wikipedia. 
  • If you need really comprehensive coverage of CEP, read the paper "Processing flows of information: from data stream to complex event processing" [1] (or the slide deck). 
In CEP, we think in terms of event streams. An event stream is a logical sequence of events that become available over time. For example, a stock event stream consists of events that notify changes to a stock price. Users provide queries to the CEP engine, which implements the CEP logic, and the CEP engine matches those queries against the events coming through the event streams.

CEP differs from other paradigms like event processing, filtering, etc. by its support for temporal queries that reason in terms of temporal concepts like time windows and before/after relationships. For example, a typical CEP query will say:

"If the IBM stock value increased by more than 10% within an hour, please notify me."

Such a CEP query has a few defining characteristics.
  1. CEP queries generally keep running, and keep emitting events when incoming events match the condition given in the query.
  2. A CEP query operates on the fly and stores only a minimal number of events.
  3. CEP engines generally respond to conditions within the milliseconds range.

What is Scaling CEP?

There are many CEP engine implementations (see the CEP Players list 2012). However, CEP engines mostly run on a single large box, scaling vertically. Scaling CEP engines horizontally (across machines) is still an open problem. The remainder of this post discusses what scaling a CEP engine horizontally means and some of the potential answers.

We use the term scaling to describe the ability of a CEP system to handle larger or more complex queries by adding more resources. Scaling CEP has several dimensions.
  1. Handling a large number of queries
  2. Handling queries that need a large working memory
  3. Handling a complex query that might not fit within a single machine
  4. Handling a large number of events

How to Scale CEP?

Let us consider each dimension separately.

Handling a Large Number of Queries

This is the easiest of the four, since we can use a shared-nothing architecture. Here we can run multiple CEP engine instances (each instance running on a single host), each running a subset of the queries.
  1. A trivial implementation will send all events (streams) to all the CEP engines
  2. More optimized implementations can use a publish/subscribe message broker network (like Narada Broker). Here each CEP engine should analyze the deployed queries and subscribe to the required event streams in the broker network. Generally, we match each event stream to a topic in the publish/subscribe system.
  3. A third option is to delegate the event distribution to a stream processing system (e.g. Apache S4 or Storm). For instance, links [4] and [5] describe such a scenario running Esper within Storm.

Queries that need large working memory

For instance, a long-running complex query that needs to maintain a large window, and all the events in that window, would need a large working memory. A potential answer to this problem is to use a distributed cache to store the working memory. Reference [6] describes such a scenario.

Handling a large number of events and handling a complex query that might not fit within a single machine

We will handle both scenarios together, as they are two sides of the same coin. In both cases, we have trouble fitting a single query into a single host such that it can support the given event rate.
To handle such scenarios, we have to distribute the query across many computers.

We can do this by breaking the query into several steps in a pipeline, where each step matches events against some conditions and republishes the matching events to steps further down the pipeline. Then we can deploy different steps of the pipeline on different machines.

For example, let's consider the following query. This query matches if there are two events within 30 seconds from IBM stocks, each having a price greater than 70, where the price has increased by more than 10%.

select a1.symbol, a1.price, a2.price from every 
    a1=StockStream[price > 70 and symbol == 'IBM'] -> 
    a2=StockStream[price > 70 and symbol == 'IBM']
        [a2.price > 1.1*a1.price][within.time=30]


As shown by the figure, we can break the query into three nodes, where each node republishes the matching events to the next node (Option 1).

CEP Query as a Pipeline

However, queries often have other properties that allow further optimization. For example, although the last step, matching the price increase, is stateful, the other two steps are stateless. Stateful operations remember information after processing an event, so that earlier events affect the processing of later events, while stateless operations depend only on the event being processed.

Therefore, we can place multiple instances of those stateless steps using a shared-nothing architecture. For example, we can break the query into five nodes, as shown by the bottom part of the picture (Option 2).

Another favorable fact is that CEP processing generally happens through filtering, where the number of events reduces as we progress through the pipeline. Therefore, pushing stateless, filter-like operations (e.g. matching against symbol == 'IBM') to the first parts of the pipeline and scaling them in a shared-nothing manner should allow us to scale the system to much higher event rates. For example, let's say the StockQuote event stream generates 100,000 events per second, but only 5% of them are about IBM. Then only 5,000 events will make it past the first filter, which we can handle much more easily than 100k events.

However, it is worth noting that the above method only works with some queries. For example, if we have a query that consists of a single stateful operation, like a window-based pattern, we cannot use this method.
Unfortunately, there is no framework that can do this out of the box (let me know if I am wrong). So if you want to do this, you will have to code it yourself. If you choose to do that, using a pub/sub network or a stream processing system might remove most of the complexity.

Please share your thoughts!
  1. Alessandro Margara and Gianpaolo Cugola. 2011. Processing flows of information: from data stream to complex event processing. In Proceedings of the 5th ACM International Conference on Distributed Event-Based Systems (DEBS '11).
  2. http://www.thetibcoblog.com/2009/08/21/cep-versus-esp-an-essay-or-maybe-a-rant/
  3. http://www.slideshare.net/TimBassCEP/mythbusters-event-stream-processing-v-complex-event-processing-presentation
  4. Run Esper with Storm - http://stackoverflow.com/questions/9164785/how-to-scale-out-with-esper
  5. http://tomdzk.wordpress.com/2011/09/28/storm-esper/
  6. Distributed cache to scale CEP - http://magmasystems.blogspot.com/2008/02/cep-engines-and-object-caches.html

Thursday, May 3, 2012

How to measure the Performance of a Server?

I have repeated the following too many times in the last few years and decided to write it up. If I have missed something, please add a comment.

Understanding Server Performance

Characteristic Performance Graphs of a Server

The above graphs capture the characteristic behavior of a server. As shown by the graphs, server performance is gauged by measuring latency and throughput against concurrency.
  • Latency measures the end-to-end processing time. In a messaging environment, teams determine latency by measuring the time between sending a request and receiving the response. Latency is measured from the client machine and includes the network overhead as well.
  • Throughput measures the number of messages that a server processes during a specific time interval (e.g. per second). Throughput is calculated by measuring the time taken to process a set of messages and then using the following equation.

Throughput = number of completed requests / time to complete the requests

It is worth noting that these two values are often loosely related; however, we cannot directly derive one measurement from the other.

As shown by the figure, a server has an initial range where throughput increases at a roughly linear rate and latency either remains constant or grows linearly. As concurrency increases, the approximately linear relationship decays and system performance rapidly degrades. Performance tuning attempts to modify the relationship between concurrency and throughput and/or latency, and to maintain a linear relationship for as long as possible.

For more details about latency and throughput, read the following online resources:
  1. Understanding Latency versus Throughput
  2. Latency vs Throughput
Unlike static server capacity measurements (e.g. CPU processing speed, memory size), performance is a dynamic measurement. Latency and throughput are strongly influenced by concurrency and work unit size. Larger work unit sizes usually negatively influence latency and throughput. Concurrency is the number of aggregate work units (e.g. messages, business processes, transformations, or rules) processed in parallel (e.g. per second). Higher concurrency values tend to increase latency (wait time) and decrease throughput (units processed).

To visualize server performance across the range of possible workloads, we draw a graph of latency or throughput against concurrency or work unit size as shown by the above graph.

Doing the Performance Test

Your goal in running a performance test is to draw a graph like the one above. To do that, you have to run the performance test multiple times with different concurrency levels, and measure latency and throughput for each run.

Following are some of the common steps and a checklist.

Workload and client Setup

  1. Each concurrent client simulates a different user. Each runs in a separate thread and runs a series of operations (requests) against the server.
  2. The first step is finding a workload. If there is a well-known benchmark for the particular server you are using, use that. If not, create a benchmark by simulating the real user operations as closely as possible.
  3. Messages generated by the test must not be identical. Otherwise, caching might come into play and provide too-optimistic results. The best method is to capture and replay a real workload. If that is not possible, generate a randomized workload. Use a known data set whenever it makes sense.
  4. We measure latency and throughput from the client. For each test run, we need to measure the following.
    • The end-to-end time taken by each operation. Latency is the AVERAGE of all end-to-end latencies.
    • For each client, the test-start time, the test-end time, and the number of completed messages. Throughput is the SUM of the throughput measured at each client.
  5. To measure the time, if you are in Java, you can use System.nanoTime() or System.currentTimeMillis(); in other programming languages you should use the equivalent methods.
  6. For each test run, it is best to take readings for about 10,000 messages. For example, with concurrency 10, each client should send at least 1000 messages. Even if there are many clients, each client should send at least 200 messages.
  7. There are many tools that can run the performance test, e.g. JMeter, LoadUI, javabench, and ab. Use them when applicable; a minimal hand-rolled client sketch follows this list.
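
If you do roll your own client, the following is a minimal sketch of the measurement logic described above; sendRequest() is a hypothetical stand-in for the real server operation.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PerfTestClient {
        static final int CLIENTS = 10;            // concurrency
        static final int REQUESTS_PER_CLIENT = 1000;

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(CLIENTS);
            List<Future<double[]>> results = new ArrayList<Future<double[]>>();
            for (int i = 0; i < CLIENTS; i++) {
                results.add(pool.submit(new Callable<double[]>() {
                    public double[] call() {
                        return runClient();
                    }
                }));
            }
            double latencySum = 0, throughputSum = 0;
            for (Future<double[]> result : results) {
                double[] r = result.get();
                latencySum += r[0];     // mean latency of one client (ms)
                throughputSum += r[1];  // throughput of one client (req/sec)
            }
            // Latency is the AVERAGE across clients; throughput is the SUM.
            System.out.printf("latency = %.2f ms, throughput = %.2f req/sec%n",
                    latencySum / CLIENTS, throughputSum);
            pool.shutdown();
        }

        static double[] runClient() {
            long testStart = System.nanoTime();
            long totalLatency = 0;
            for (int i = 0; i < REQUESTS_PER_CLIENT; i++) {
                long start = System.nanoTime();
                sendRequest();  // placeholder: invoke the real server operation here
                totalLatency += System.nanoTime() - start;
            }
            double testSeconds = (System.nanoTime() - testStart) / 1e9;
            double meanLatencyMs = (totalLatency / 1e6) / REQUESTS_PER_CLIENT;
            return new double[]{meanLatencyMs, REQUESTS_PER_CLIENT / testSeconds};
        }

        static void sendRequest() { /* invoke the server operation */ }
    }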

Experimental Setup

  1. You may need to tune the server for best performance with settings like enough heap memory, open file limits, etc.
  2. Do not run both the client and the server on the same machine (they interfere with each other, and the results are affected).
  3. You need at least a 1Gbit network to avoid interference from the network.
  4. Generally, you should not run more than 200 clients from the same machine. For some cases, you might need multiple machines to run the clients.
  5. You have to note down and report the environment (memory, CPU, number of cores, and operating system of each machine) with the results. It is a good practice to measure the CPU usage and memory while the test is running. You can use JConsole (if it is Java), and on a Linux machine you can run the “watch cat /proc/loadavg” command to track the load average. CPU usage is a very unreliable metric, as it changes very fast; however, load average is a very reliable metric.

Running the test

  1. Make sure you restart the server between every two test runs.
  2. When you start the server, first send it a few hundred requests to warm up the server before starting the real test.
  3. Automate as much as possible. Ideally, running one command should run the test, collect the results, verify the results, and print the summary/graphs.
  4. Make sure nothing else is running on the machines at the same time.
  5. After a test run has finished, check the logs and results to make sure the operations were really successful.

Verifying your results

  1. You must catch and print the errors both at the server and the client. If there are too many errors, your results might be useless. It is also a good idea to verify the results at the client side.
  2. Often performance tests are the first time you stress your system, and more often than not you will run into errors. Leave time to fix them.
  3. You might want to use a profiler to detect any obvious performance bottlenecks before you actually run the tests. (Java profilers: JProfiler, YourKit)

Analyze your results and write it up

  1. Data cleanup (optional) – it is a common practice to remove outliers. The general method is to either remove anything that is more than 3× the standard deviation away from the mean, or to remove the 1% of the data furthest from the mean.
  2. Draw the graphs. Make sure you have a title, captions for both the X and Y axes with units, and a legend if you have more than one dataset in the same graph (you can use Excel, OpenOffice, or gnuplot). The rule of thumb is that the reader needs to be able to understand the graph without reading the text.
  3. Optionally, draw 90% or 95% confidence intervals (error bars).
  4. Try to interpret what the results mean:
    • Generalize
    • Understand the trends and explain them
    • Look for any odd results and explain them
    • Make sure to have a conclusion



Wednesday, May 2, 2012

Scaling Distributed Queues: A Short Survey

The following is a part of the related-work survey from the paper "Andes: a highly scalable persistent messaging system". However, the following says nothing about Andes itself, only how different distributed queue implementations work. I will write about Andes later.

What is a Distributed Queue?

A distributed queue is a FIFO data structure that is accessed by entities in a distributed environment. A distributed queue works as follows.
  1. There are two types of users (publishers and subscribers)
  2. A user creates a queue (or queues may be created on demand)
  3. Subscribers subscribe to a queue
  4. Publishers send (publish) messages to the queue
  5. Each published message is delivered to one of the subscribers of the queue
Distributed queues provide strict or best-effort support for in-order delivery, where subscribers receive messages in the same order they were published. (It is very hard to enforce this across all subscribers; therefore, implementations often enforce it within each subscriber. For example, if messages m1, m2, ..., m100 are published in order, each subscriber will see a subset of the messages in ascending order, but there is no guarantee about the global order seen across subscribers.)

What does Scaling Distributed Queues mean?

Scaling is handling a larger workload by adding more resources. The workload can be increased in many ways, and we call those the different dimensions of scale. There are three main dimensions.
  1. Scaling to handle a large number of queues
  2. Scaling to handle a queue that has a large workload
  3. Scaling to handle large messages

Distributed Queue Implementations

There are many distributed queue implementations in JMS brokers like ActiveMQ, HornetQ, etc. The focus of our discussion is how they can scale up.

There are four choices.
  1. Master-Slave topology – a queue is assigned to a master node, and all changes to the queue are also replicated to a slave node. If the master fails, the slave can take over (e.g. Qpid, ActiveMQ, RabbitMQ).
  2. Queue distribution – queues are created and live in a single node, and all nodes know about all the queues in the system. When a node receives a request for a queue that is not available on the current node, it routes the request to the node that has the queue (e.g. RabbitMQ).
  3. Cluster connections – clients may define cluster connections giving a list of broker nodes, and messages are distributed across those nodes based on a defined policy (e.g. a fault tolerance policy or a load balancing policy). It also supports message redistribution, which means that if a broker does not have any subscriptions, the messages received by that broker are rerouted to other brokers that have subscriptions. It is worth noting that the server side (the brokers) plays a minor role in this setup (e.g. HornetQ).
  4. Broker networks – the brokers are arranged in a topology, and messages are propagated through the topology until they reach a subscriber. Usually, this uses a consumer priority mode, where brokers that are close to the point of origin are more likely to receive the messages. The challenge is how to load balance those messages (e.g. ActiveMQ).
Replication in distributed queues is inefficient, as delivering messages in order requires state to be replicated immediately.

In cluster connections and broker networks, in-order message delivery is a best-effort guarantee only. If a subscriber has failed or a subscription has been deleted, the broker nodes are forced to either drop the messages or redistribute them, out of order, to the other brokers in the network.

None of the above modes handles scaling for large messages.

Summary

Topology | Pros | Cons | Supporting Systems
Master-Slave | Supports HA | No scalability | Qpid, ActiveMQ, RabbitMQ
Queue Distribution | Scales to a large number of queues | Does not scale for a large number of messages in one queue | RabbitMQ
Cluster Connections | Supports HA | Might not support in-order delivery; logic runs on the client side and takes local decisions | HornetQ
Broker/Queue Networks | Load balancing and distribution | Fair load balancing is hard | ActiveMQ

Authentication and Authorization Choices in WSO2 Platform

The following diagram came out of a chat with Prabath. It shows most of the public APIs of the WSO2 Identity Server and the typical design and deployment choices when implementing authentication and authorization with the WSO2 platform.

Authentication and Authorization Choices in WSO2 Platform


Each server in the WSO2 platform is built using the Carbon platform. We use the term “Carbon server” to denote any Carbon-based server like the ESB, AS, or BPS.

The techniques explained here are applicable across most of the WSO2 products. In the following figure, the circles with branching paths show the different options.

As shown by the figure, a Carbon server may receive two types of messages: messages with credentials (like passwords) and messages with tokens. When a server receives a message with credentials, the server first authenticates the request and optionally authorizes the action. When the server receives a message with tokens, there is generally no authentication step; the token is directly validated against permissions, and the request is either granted or denied.

Authentication

Authentication needs a user store that holds the information about users, and an “enforcement point” that verifies the credentials against the user store.
Carbon servers support two user stores.
  1. Database based user store
  2. LDAP based user store
It is a common deployment pattern for multiple Carbon servers in a single deployment to point to the same user store, and this provides a single point to control and manage the users.

We can configure any Carbon server to authenticate incoming requests. It supports many options, like HTTP Basic Authentication over SSL for HTTP, WS-Security Username Tokens, and Web SAML SSO. This authentication is done against the users that reside in the user store.

Also, each Carbon server has a Web Service called the Authentication Admin Web Service, which exposes authentication as a Web Service to the outside. A client can invoke the Authentication Admin Web Service, get an HTTP cookie after logging in, and reuse the cookie to make authenticated calls to the Carbon server.

Authorization

In authorization scenarios, a Carbon server receives a request that is generally already authenticated, or a request that includes a token. In either case, we want to check whether the authenticated user has enough permissions to carry out a given action.
Using XACML terminology, we can define three roles in such a scenario (XACML includes other roles, which we will ignore in this discussion).
  1. The PEP (Policy Enforcement Point) intercepts requests and makes sure only authorized requests are allowed to proceed.
  2. The PDP (Policy Decision Point) stores the permissions and verifies that a given user has enough permissions.
  3. The PAP (Policy Administration Point) lets users define and change permissions.
Carbon servers support the Policy Enforcement Point (PEP) role using a WSO2 ESB mediator, an Apache Axis2 handler, or custom user code.
For the Policy Decision Point (PDP), we support three ways to define permissions.
  1. Database based permission stores – permissions are stored in the database
  2. XACML – permissions are described using the XACML specification
  3. KDC – permissions are provided as Kerberos tokens
We support policy administration (PAP) through the WSO2 Identity Server, which enables users to edit the permission definitions through the management console.
These give rise to several scenarios.
  1. If the database based permission store is used, we can configure any Carbon server to connect to the permission database directly and load the permissions into memory. It then authorizes user actions using those permission descriptions. Carbon servers also have an Authorization Admin Web Service that lets users check the permissions of a given user remotely.
  2. If XACML based authorization is used, there must be an Identity Server that acts as the PDP (Policy Decision Point). Each Carbon server (acting as the PEP, Policy Enforcement Point) invokes an Entitlement Service available in the Identity Server to check the permissions. The Entitlement Service is available as a Web Service or a Thrift service.
  3. If a Carbon server receives a Kerberos token, it talks to a configured Kerberos server and verifies the token. WSO2 IS comes bundled with an Apache KDC out of the box.
More information about the WSO2 Identity Server can be found at http://wso2.com/products/identity-server/, and if there are any missing features, please drop a note to the WSO2 architecture list.

Sunday, April 22, 2012

Generating a Distributed Sequence Number

This is a very common problem in distributed systems (e.g. message brokers, implementing "at most once delivery", group communication, etc.). I was doing some reading for the WSO2 Andes project.
There are several options.
  1. Using Zookeeper: The following two threads talk about this. It should be reasonably fast, although the Twitter guys have tried it and say it was a bit slow.
    http://zookeeper-user.578899.n2.nabble.com/Sequence-Number-Generation-With-Zookeeper-td5378618.html
    http://www.mail-archive.com/zookeeper-user@hadoop.apache.org/msg01976.html
  2. Cassandra: This has been raised several times, and the answer was to use UUIDs (which do not work for us).
    http://comments.gmane.org/gmane.comp.db.cassandra.user/3304
    http://stackoverflow.com/questions/3935915/how-to-create-auto-increment-ids-in-cassandra.

    Then Cassandra introduced counters, but they do not support incrementAndGet(), and there is no plan to add it in the future either. So that does not work.
  3. http://www.datastax.com/dev/blog/whats-new-in-cassandra-0-8-part-2-counters
  4. Write a custom server: This is easy; basically, create a service that gives out increasing IDs. But it is very hard to cluster this, and the behavior in case of a failure is complicated.
  5. "A timestamp, worker number and sequence number": The Twitter guys created a solution based on "a timestamp, worker number and sequence number" (this is similar to what we use as well, except that we ran a few dedicated servers for this): http://engineering.twitter.com/2010/06/announcing-snowflake.html. See the sketch after this list.
  6. Other algorithms: I only looked at these briefly, but they are complicated.
    Using DHTs: http://horicky.blogspot.com/2007/11/distributed-uuid-generation.html
    A Fault-Tolerant Protocol for Generating Sequence Numbers for Total Ordering Group Communication in Distributed System, http://www.iis.sinica.edu.tw/page/jise/2006/200609_16.pdf
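
To make option 5 concrete, here is a minimal sketch of a Snowflake-style generator. The bit widths follow Twitter's published scheme (41-bit timestamp, 10-bit worker number, 12-bit sequence); treat this as an illustration, not production code.

    public class SnowflakeLikeIdGenerator {
        private static final long EPOCH = 1325376000000L; // custom epoch, e.g. 2012-01-01
        private final long workerId;                      // 0..1023 (10 bits)
        private long lastTimestamp = -1L;
        private long sequence = 0L;                       // 0..4095 (12 bits)

        public SnowflakeLikeIdGenerator(long workerId) {
            this.workerId = workerId;
        }

        public synchronized long nextId() {
            long now = System.currentTimeMillis();
            if (now < lastTimestamp) {
                throw new IllegalStateException("Clock moved backwards; refusing to generate IDs");
            }
            if (now == lastTimestamp) {
                sequence = (sequence + 1) & 0xFFF;  // wrap the 12-bit sequence
                if (sequence == 0) {
                    // Sequence exhausted within this millisecond: wait for the next one.
                    while ((now = System.currentTimeMillis()) <= lastTimestamp) { }
                }
            } else {
                sequence = 0;
            }
            lastTimestamp = now;
            // 41 bits of timestamp | 10 bits of worker number | 12 bits of sequence.
            return ((now - EPOCH) << 22) | (workerId << 12) | sequence;
        }
    }
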
IMHO, "a timestamp, worker number and sequence number" is the best option. The only downside is that it assumes the broker nodes are loosely synced in time. The only other option I see is Zookeeper.

A good overview: http://stackoverflow.com/questions/2671858/distributed-sequence-number-generation/5685869.





Tuesday, January 31, 2012

Matching upwards of 50k events per second close to real time?

We have a use case on an R&D project to match user-defined queries against an event stream that has an event rate of about 50k transactions/second. The use case will use the results to make real-time marketing recommendations. Following is a summary of the use case.

  1. The system gets about 50k messages/sec generated from transaction servers. 
  2. Each message contains name-value properties.
  3. The goal is to run user-defined, long-running temporal queries on that data. For example, if a user has sent more than 100 SMS messages in the last month, give him a promotion. Marketing specialists provide those queries through a UI wizard that deploys the query into the system. (Queries are limited; they basically match properties or perform aggregations over a given time window.)
  4. The goal is to match queries against events and respond with promotions within 5s of receiving the last event that completes the condition. 

There are many ways we can try to answer the problem.
  1. SQL - the traditional answer is to dump the data into an RDBMS, periodically query the database (say every 3 sec), and send promotions for all the matches. However, it is likely that the database could not keep up with either writing that much data or querying it. 
  2. CEP - Complex Event Processing is the ideal answer for the problem. CEP engines can handle events at this scale and support user-defined queries. However, current CEP engines do not support persistence, and if the server fails while running a query with a one-month window, there is no way to recover the state. 
  3. Hadoop - the query is an easy one for Hadoop, but Hadoop works in batch processing mode and is likely to take more than 5s. 
  4. Stream processing - stream processing could scale to this use case, but its handling of temporal queries is unclear. 
  5. Cassandra - the idea of building indexes while writing data to Cassandra might work. Then we can periodically query and send promotions for matching users. However, Cassandra is bad at incrementing a value due to its idempotent write model, and that will be a problem with parallel writes. 
Following is the answer we are thinking about. It uses a combination of Cassandra + Zookeeper + Caching. 

First, users define the queries they need monitored. Those queries are either simple conditions (which are easy to implement) or aggregations done over a period of time. The query compiler then generates a matcher to detect the query condition and deploys the code to the processing servers. 

Each processing server processes incoming events using the matcher and updates an in-memory cache of detected conditions (e.g. counts of aggregation values). A thread periodically locks Cassandra and updates the stored values to include the new values; processing servers use Zookeeper to take the lock before updating Cassandra. (A sketch of this counter cache is shown at the end of the post.) 

There is also a thread that runs periodically (once a day) and updates the counts maintained in Cassandra to reflect the time windows, by removing the elapsed day's count from the Cassandra count. 

Finally, a promotion server periodically searches the indexes and sends promotions for matching users. 
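
To make the processing-server side concrete, here is a minimal sketch of the in-memory counter cache with a periodic flush. All names here are hypothetical, and the Zookeeper lock and Cassandra update are left as labeled placeholders.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class ProcessingServer {
        // Condition ID (e.g. "user123:smsCount") -> count accumulated since the last flush.
        private final ConcurrentHashMap<String, AtomicLong> counts =
                new ConcurrentHashMap<String, AtomicLong>();
        private final ScheduledExecutorService flusher =
                Executors.newSingleThreadScheduledExecutor();

        public ProcessingServer() {
            // Periodically merge the local counts into the shared store.
            flusher.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    flush();
                }
            }, 3, 3, TimeUnit.SECONDS);
        }

        // Called by the matcher for every event that matches a condition.
        public void conditionMatched(String conditionId) {
            AtomicLong count = counts.get(conditionId);
            if (count == null) {
                AtomicLong fresh = new AtomicLong();
                AtomicLong existing = counts.putIfAbsent(conditionId, fresh);
                count = (existing == null) ? fresh : existing;
            }
            count.incrementAndGet();
        }

        private void flush() {
            // Placeholder: take the Zookeeper lock here before updating Cassandra.
            for (Map.Entry<String, AtomicLong> entry : counts.entrySet()) {
                long delta = entry.getValue().getAndSet(0);
                if (delta > 0) {
                    // Placeholder: read the current count from Cassandra,
                    // add the delta, and write it back.
                }
            }
            // Placeholder: release the lock.
        }
    }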

Sunday, January 22, 2012

Steve Jobs's Biography

I finished reading Steve Jobs's biography, and following are some of my impressions.


  1. He is an *^%^&@^&!: IMO the author does a nice job of presenting SJ's life without much sugar coating. His was a complex life, and sometimes he was unbelievably mean and irrational. For example, after he had become a millionaire, there was a time when his daughter and her mother lived on welfare. Another example: early on, a company he worked for gave him a contract to do a circuit board design, which carried a bonus for each part saved below 60 parts. He did it with Steve Wozniak and shared the main fee, but never mentioned the bonus to Steve. There are enough examples in the book of his manipulation of people. The book gives the impression things got better with time, but not conclusively. 
  2. I am impressed by SJ's ability to build amazing products without understanding the inner workings of the technology. Maybe his lack of technical knowledge enabled him to look at problems differently. 
  3. He built great products using the instinct that "something is not quite right". Basically, he would often say that something was wrong (in colorful language, of course), but did not really know how to say how to fix it. However, in his case, doing that was enough to build great products. My takeaway is that when we work with great teams, we should point out problems and demand better even when we do not exactly know how to fix them. 
  4. Focus on a few items: SJ strongly believed that his company should work on a few things, but do them better by focusing and repeatedly evaluating and changing. He was always ready to go back and redo stuff, and as per the book, he did that at least once with each of his major successes. 
  5. Willingness to go back and redo stuff: as described in #4, his principle was to focus on a few things and redo them until they are right. 
  6. Reality distortion field: the book uses this term (which has been used before) for SJ's ability to convince both himself and the people around him that the seemingly impossible is possible. It has its good and bad sides: when the goal is really possible, people end up doing it; when the goal is impossible, people end up with failures. Regardless, to me this shows the power of charisma. 

Sunday, January 15, 2012

Distributed Communication: Causes and effects

This is an effort to look at the drivers behind some of the key distributed communication technologies. I created this for one of my classes.