
Thursday, March 18, 2010

Digging up links to my old Event Stream Processor

I was digging up links and articles to my old StreamCruncher hobby project - cleaning up my bookmarks, rather. Here's what Google came up with (after some manual filtering):

- Paul Dekker's Master's thesis on Complex Event Processing with StreamCruncher, RuleCore and Esper as case studies

- A blog entry by Edson Tirelli, one of the Drools guys

- An Introduction To Data Stream Query Processing from Truviso

- Creating an event driven SOA

- Marco on in-memory DBs for CEP

- The SQL debate on Financial Techinsider.

- Zepheira presentation

Later.

Friday, September 04, 2009

Windowing features in PostgreSQL 8.4

PostgreSQL recently added a whole bunch of mini-analytic features in version 8.4. It also happens to be Turing complete now, with support for recursion (WITH RECURSIVE). Ha! Recursion in SQL...

To me, the most interesting feature was the introduction of the partition by clause and the related window functions. I remember using this feature in Oracle 10g back in 2005, where, if I'm not mistaken, it was part of the Analytics package. It's important to me because this is what inspired me, in some way, to start working on StreamCruncher and explore the other Windowing concepts that are now standard in any Event Stream Processing product.

-- Sums val per key for each row, without collapsing rows the way GROUP BY would:
SELECT key, SUM(val) OVER (PARTITION BY key) FROM tbl;

Wednesday, July 22, 2009

To parse or not to parse

I remember spending a lot of time working on SQL extensions and ANTLR grammar not too long ago. This was during my StreamCruncher days.

Looking back, the effort it required to write a clear, unambiguous grammar, simplify the AST, validate it and then finally produce some executable code was considerable. I distinctly recall that it was not very enjoyable.

Nowadays, the Fluent interface is gaining popularity, especially for implementing mini-DSLs. Here's an introduction to designing DSLs in Java.

For clarity, I'll refer to the custom language/grammar as Stringified-DSL and the Fluent interface as Fluent-DSL.

Now, don't get me wrong; I'm all for designing a concise and crisp "language" for specific tasks. That's the whole reason I built my program on SQL extensions (i.e., Stringified). But does it always have to be a dumb string - something in quotes that cannot be refactored, has no IDE support like auto-complete, and brings the hassle of providing a custom editor for it?

It doesn't just end there. When you expose a custom Stringified-DSL, you and your users are stuck with it for a very long time. If you want to expose any data structures - which your users will eventually ask for after playing with the simple stringified demos - then you will have to ship the new grammar to them, enhance your custom editor and document everything clearly. Also, the underlying language - let's consider Java - might suddenly unleash a whole bunch of awesome new APIs, syntax and other language enhancements; which is exactly what happened with Java 5, if you recall - Annotations, Concurrency libraries, Generics. You see what your users would be missing because you chose the Stringified route?

In case you are wondering why your users would want Concurrency, data structures and such things - we are talking about Technical users, not Business users. Business users will be happy to play with Excel or a sexy UI, so we are not talking about their requirements.

So, what are the benefits of exposing a Fluent-DSL? (There's a small sketch after this list.)

  1. Easy to introduce even in a minor version of the product. Quick turnaround, because it mostly involves APIs

  2. Documentation is easy because it can go as part of the JavaDocs

  3. Easy for the users to consume and use. Lower setup costs and only a mild learning curve

  4. IDE features - Refactoring, Type safety, no custom editor required

  5. Base language features - users don't have to shoehorn their code into a clumsy custom editor; code can be unconstrained and they are free to use data structures, Annotations, Generics, Concurrency, Fork-Join and all the other programming language goodies. This is very good in the long run, because you don't paint yourself into a corner with all that custom stuff
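
To make the difference concrete, here's a minimal sketch of a Fluent-DSL in Java - a hypothetical builder I've made up for illustration, not StreamCruncher's actual API. The whole "query" is plain Java, so every one of the benefits above falls out for free:

import java.util.concurrent.TimeUnit;

// A hypothetical fluent query builder - made up for illustration,
// not StreamCruncher's actual API.
public class FluentDslSketch {

    static class EventQuery {
        private final StringBuilder text = new StringBuilder();

        EventQuery select(String columns) {
            text.append("select ").append(columns);
            return this; // returning 'this' is what makes the calls chain
        }

        EventQuery from(String stream) {
            text.append(" from ").append(stream);
            return this;
        }

        EventQuery partitionBy(String columns) {
            text.append(" (partition by ").append(columns);
            return this;
        }

        EventQuery storeLast(long span, TimeUnit unit) {
            text.append(" store last ").append(span).append(' ')
                .append(unit.name().toLowerCase()).append(')');
            return this;
        }

        String toText() {
            return text.toString();
        }
    }

    public static void main(String[] args) {
        // The IDE can auto-complete, type-check and refactor every step -
        // none of which is possible with a query trapped inside a string.
        String query = new EventQuery()
                .select("country, state, city, sum(item_qty)")
                .from("cust_order")
                .partitionBy("country, state, city")
                .storeLast(30, TimeUnit.MINUTES)
                .toText();

        System.out.println(query);
    }
}

A real Fluent-DSL would of course build a typed structure instead of a string, but even this toy shows how the chain reads almost like the Stringified query while staying inside the language.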

So, what tools does our mild-mannered Developer have at his/her disposal? Some nice Open Source libraries I've spent time reading about come to mind. And to be fair to Stringified-DSLs, these would make life easier for everyone involved in case you badly need to go the stringy route:
  • MVEL - my favorite. The Interceptor feature is a nice touch

  • Expression Languages like JUEL (the Unified EL from the JSP/JSF world) and Spring's SpEL

  • MPS from the JetBrains/IntelliJ guys. I haven't understood this fully; it seems to be somewhat like MS Oslo, maybe less. But Oslo itself is hard to understand because of the poor arrangement of its docs

  • DSLs in Groovy - although I'm not very convinced since Groovy itself has a learning curve

If you are a .Net user, how I envy your LINQ extensions. Man that was a brilliant move. I wonder how they managed to get their parsers and editors to work so seamlessly with the regular C# code.

Cheers!

Monday, August 04, 2008

StreamCruncher source on Google Code

I uploaded StreamCruncher version 2.3 source code to Google Code a few weeks ago. If anyone is interested in maintaining it, you are most welcome.

Wednesday, October 10, 2007

Expression Languages (OGNL and MVEL)

For those of you using Expression Languages in your programs to add small fragments of pluggable logic, I'm sure you've evaluated or probably even use OGNL. StreamCruncher uses OGNL 2.7+ to handle some of the messier parts of Expression evaluation, and I've found it to be a huge time saver. What's even better is that the 2.7 version also compiles Expressions into dynamically generated Bytecode.

So, if you want to evaluate OGNL, here is a list of useful links. They are not easily locatable, so I thought this would also be a good place for me to bookmark them for future use.

Here's the old link - OGNL
The 2.7+ versions are handled by this guy Jesse - his Blog
The latest releases - on OpenSymphony

I also strongly suggest evaluating another very well done Expression Language - MVEL, which is written by Mike - his Blog
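
If you want a taste before clicking through, here's a minimal sketch of evaluating an MVEL expression. I'm assuming the org.mvel2 package from the 2.x line here (older releases used a different package name):

import java.util.HashMap;
import java.util.Map;

import org.mvel2.MVEL;

public class MvelSketch {
    public static void main(String[] args) {
        // The variables that the expression is allowed to reference.
        Map<String, Object> vars = new HashMap<String, Object>();
        vars.put("price", 42.5);
        vars.put("quantity", 10);

        // MVEL parses and evaluates the expression against the variable map.
        Object total = MVEL.eval("price * quantity", vars);

        System.out.println(total); // prints 425.0
    }
}

For hot code paths, MVEL can also pre-compile an expression with MVEL.compileExpression(..) and run it via MVEL.executeExpression(..), so repeated evaluations skip re-parsing.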

Saturday, October 06, 2007

StreamCruncher 2.2 Release Candidate

The 2.2 Release Candidate is now available. This has some important performance related changes over the 2.2 Beta version. Over the past few releases, I've spent a considerable amount of time making parts of the Kernel perform end-to-end processing without having to go to the Database. This version performs Correlation Query processing and single Stream Query processing entirely in Memory. As a result, there are some things that don't work - like the "Order by" and "Group by" clauses. You might call it laziness, but there's only so much time I can spend on this, what with a day job and all.

Anyway, the performance has shot up to very respectable figures. The CorrelationPerfTest that I spoke about in my previous blog can now process a total of 168,000 Events per second on a single Processor, dual Core 1.8 GHz Centrino with 2 GB Memory. The Test has 3 Correlation Queries. Two of them correlate 3 Streams each and one Query correlates 2 Streams.

It's been a long journey. I'm so glad that SC can do this many Events per second now. I remember being quite worried a year and a half ago, when it could not do more than a few hundred events per second.

Monday, August 20, 2007

StreamCruncher 2.2 Beta

Over the past few weeks I've been meddling with the Correlation engine inside StreamCruncher. The 2.2 Release is a result of that ongoing work - "ongoing", thus the Beta status. Nevertheless, this release includes rather drastic changes to the Correlation code (the alert..using..when.. clause), in that the Kernel no longer requires the use of the Database to perform Pattern matching - thereby increasing performance and decreasing latency several fold!

A new TestCase has been added - CorrelationPerfTest, that demonstrates this wonderful improvement. In this TestCase, there are 4 Streams of Events that are monitored by 3 different Queries, each looking for a distinct Pattern. Since each Query runs in its own group of Threads, the Test scales well on Multi-Core/CPU systems. This Test also generates and consumes large amounts of data and thus serves as a Stress Test too.

Since this is an interim release, there are a few features that are still being developed - like the "case...when.." clause that used to work in previous releases. Now that the Database is no longer being used for Pattern matching, those features that were freely available in the Database are yet to be replicated by the Kernel.

Wednesday, July 18, 2007

StreamCruncher 2.1 Release Candidate

The follow up to the 2.0 Beta release is now ready. This release has some minor bug-fixes and feature enhancements:

1) Pre-Filters for Input Event Streams support <, >, !=, =, *, /, +, -, in (..), not in (..), and, or. The in clause can refer to an SQL Sub-Query. Such Sub-Queries are cached by the Kernel to improve performance

2) An additional property cacherefresh.threads.num can be configured to specify the number of Sub-Query Cache processing Threads to use

3) 2 new Test cases have been added to test the new features - H2StartupShutdown3Test and ThreeEventOrderTest

Wednesday, July 04, 2007

StreamCruncher 2.0 Beta is ready!

This 2.0 version is the result of a major refactoring job.

1) The API has been greatly simplified. The internal architecture has changed considerably, resulting in a vast improvement in performance. The TimeWindowFPerfTest (single Query on a Stream) can do 25,500 Events per second on a 1.6 GHz Centrino! More details and log files in my next Blog

2) The plain Window constructs that were available in previous versions have been removed entirely. Partitions are the only construct now. The syntax for Simple (anonymous) Window Partitions has changed slightly, in that there is no by keyword between partition and store

3) An additional property db.schema can be specified, as the Database Schema in which the Kernel creates its internal artifacts

4) Chained Partitions, from now on, must always have a Pre-Filter clause starting with $row_status is new/dead

5) The Kernel can now accommodate Events that arrive out-of-order. OutOfOrderEventTest demonstrates this new ability

6) The Pre-Filter temporarily does not support the complete SQL grammar - clauses like in and exists

Phew! It took me more than a month to make these changes. The internal Event stores for Input Event Streams and Partitions have changed considerably. The idea was to keep the Events inside the Kernel's process for as long as possible and delay inserting them into the Database until the last stages. As a result, Events get passed around as references most of the time and the latency per Event has dropped dramatically.

There are a few things that still need to be completed/cleaned up - the Pre-Filter clause syntax, for one, and Kernel restarts do not work correctly yet. This version comes with the latest version of the H2 Database, which now supports Table-level concurrency - a much needed feature for StreamCruncher.

Saturday, June 16, 2007

Coming soon... a leaner and meaner StreamCruncher 2.0, with rather major changes to the internal architecture and API. Some preliminary tests on the TimeWindowFPerfTest revealed a 75% increase in performance!! Previous results.

Tuesday, May 29, 2007

StreamCruncher 1.14 is available! This release comes with a new feature/syntax - the self# clause, to perform efficient Self-Joins over Streams. Self-Joins are useful when the Events in a Window have to be scanned/matched against Events from the same or other Windows defined as part of the same Partition clause.

The StockPriceComparisonTest TestCase demonstrates the use of this new syntax. More details in the "StreamCruncher Basics" documentation.

Sunday, April 22, 2007

StreamCruncher 1.13 Release Candidate is ready!

1) This version includes support for Oracle 10g and has been tested on Oracle Enterprise 10.2.0.

10g, being an Enterprise grade Database, requires Tuning by a DB Expert before you start using it as the underlying Database for StreamCruncher. I don't claim to be an Oracle expert, so I'd ask my DBA to set up the Database for very low Latency, deferred Disk flushes and larger Page and Cache sizes, and have him/her translate that into the necessary Oracle Configuration changes. People have mostly been creating TableSpaces on RAM Drives to host Indexes for Tables that are constantly modified and heavily contended. I'd also consider creating the whole Database on such a RAM Drive.

StreamCruncher also creates Tables and Indexes for internal purposes. You'll have to ensure that the Schema in which these get created (usually the User name provided in the StreamCruncher DB Config file) is on the TableSpace that is Tuned & Configured for this purpose.

Another good thing to tell the DBA would be the manner in which Events/Rows are operated upon in the Database by StreamCruncher. In any Internal Table, Events are mostly pumped in by one Thread and consumed by another - very similar to a Queue or a Conveyor Belt. Updates are done on an Indexed Column, mostly on a small set of Rows that are usually in the Page Cache. All DB access (Insert/Update/Delete) by StreamCruncher on its Internal Tables goes through Indexes - some Unique, some not.

Remember, Oracle Database Tuning is an Industry in itself. Make sure you've tuned your setup well!

2) There was another small Concurrency issue in the Kernel that has also been fixed - the last of these kinds of issues, hopefully. So, I've finally got rid of the "Unique Index Violation" errors I used to get only on Multi-processor machines. Version 1.12 had it fixed for Single Processor machines. I also have to admit that this fix affects performance on single Processor machines too, though only by about 10-13%.

Tuesday, April 17, 2007

Event Processing in 2007 and beyond

An Essay, by Ashwin Jayaprakash (Apr 2007)
Website:
http://www.streamcruncher.com

A sure sign of a technology becoming mature and widely accepted is when developers start losing interest in it and crave something more. How many of us still take pride in designing and writing applications that use Stateless Session Beans, Message Driven Beans and ActionForms? Let's face it: the tools have matured, and we have Blueprints and Design Patterns that guide us at every phase of a project. All the (remaining) vendors now offer more or less similar sets of features. Clustering, WebService stacks, and basic tooling are givens now.
We have grudgingly moved towards some sort of interoperability. Some of us feel that XML is overused and misused, but still love XQuery and XPath. SOAP took its own time to evolve from Draft specifications, and when it did, most of us realized that it was too raw and bloated to do anything really useful. So, we had to wait for all the sister specifications - Addressing, Transactions, Delivery - to reach a decent level of sophistication, or switched to service APIs that don't have formal specifications, like REST or XML-RPC. In the meantime, there were other specifications fighting for relevance - BPEL, Workflow and others to handle Orchestration - that relied on WebServices to do the dirty work. And when we were not looking, somebody slid in SOA and ESB.

Where are we really headed?

What will we poor Developers have to fiddle around with over the next few years? After years of manoeuvering and fighting, we finally have specifications for almost everything - enough for the CTO and his/her team of Architects, in companies where Technology is not a core business, to think at a higher, abstract level and treat IT Projects merely as "enablers". If we filter out all the fluff from ESB, SOA, WS, WSA and the other technologies that hide behind catchy acronyms, we have to acknowledge that Software is slowly moving towards commoditization. The Electronics/Hardware industry achieved that several decades ago. It was probably easier for them because they left the hard part of "Programmable logic and customization" to the Software industry.
We have 2 widely used programming languages. We have XML, XPath and XQuery to transform and massage data from one format to another; WebServices and related protocols and formats to send and receive data - usually XML payloads, mostly within the Company's Intranet; BPEL to handle more complex workflows, some of which extend beyond the Intranet; and the ESB, providing the platform to streamline all the different formats and protocols that we use both internally and externally. So on and so forth. Essentially, we finally have things which resemble "Software Components". The situation sounds similar to the Motorcar revolution we've all heard of, which happened at the beginning of the previous century. The Internal combustion engine became more efficient (think of Java/JEE and .Net), Coach builders started making Car bodies, accessories and trimmings (think Packaging - XML and related technologies), good roads and extensive Highway networks got built (think WebServices, ESB etc) and then gradually Traffic cops got replaced by complex Traffic lights and other control networks (think BPEL and competing Workflow/Orchestration standards).
What we will see in the future will be more intelligent IT Systems - and if you want to carry the Motorcar analogy even further, something like Cars that can drive themselves and a "Big-Brother" like network that co-ordinates all traffic. We in the Software industry are definitely heading that way. RFID is finally commercially viable, and the technologies it will foster will lead to more prevalent and intelligent Warehouses, smarter Assembly lines, and better integration between Traffic control, Accident avoidance and GPS Navigation systems - a host of systems that can automatically respond to feedback and act quickly. Automatic Trading is one among them. One of the technologies crucial to such development is Event Processing. This is a broad category of complementary technologies - Business Activity Monitoring, Event Stream Processing, Complex Event Processing, Event Driven Architecture. Some of these might appear to be combinations of the others. To keep it simple, we'll label them all as Event Processing technologies.
Some of these technologies have been around for several decades in one form or another. Control Systems have custom, in-built, realtime, hardwired Event Processing. Some companies have been using Rule Engines, either directly as Inference Engines or just to respond to Facts (as Events). Simple Database Triggers and Stored Procedures are still popular. In short, Event Processing in its new avatar is going to become more widely used in the future. We have also been observing many Event Processing Products and Projects stepping out of their niches by offering general purpose stacks that can be integrated into almost any kind of application.

The tenets of Event Processing

The tenets are: a facility to consume the state(s) of a System as discrete Events; a language or User Interface to specify the conditions/properties to look for in those Events, from one System or across Systems (Correlation); and a way to specify the actions to be performed when such situations arise.
It is no accident that Event Processing looks like a cross between Database Query processing and Rule Engines; there is no denying that the technology evolved from existing technologies. However, the distinguishing characteristics of Event Processing technology are the constructs it provides to specify Time based boundaries, fixed size moving Windows, Aggregation and the Partitioning/Grouping of Events on a stream of incoming Events, along with a facility to continuously scan these Events for telltale signs - all of it expressed as a concise Query.
Database Triggers which execute SQL/Stored Procedures every time an Event/Row is inserted can provide rudimentary Event Processing functionality. Rule Engines can be used to discern information from Facts being asserted. But none of these technologies alone provides a concise and yet very expressive Domain Specific Language, along with a platform specifically built to constantly monitor/process live streams of Events.
There are just a handful of Event Processing Stacks today. Some are very high end, some are very high end but address a specific vertical like Automatic Trading, some are general purpose stacks meant for everyone, and a few have simply been re-packaged and/or re-labelled and shoved into the fray.
Each Stack is implemented in a different way, which shows in how the Query constructs differ in syntax, even though the semantics are similar.
Let's look at how one such Event Processor can be used to solve those real-world problems that have been nagging us. No, I don't mean Global Warming, Poverty or the AIDS epidemic. We'll leave that to SOA and ESB.
StreamCruncher is an Event Processor written in Java. It uses a Database to maintain the Events and other artifacts; since latency and performance are crucial to Event Processing, Embedded and In-Memory Databases are preferred. It exposes the SQL language - whose capabilities we have come to rely on so much - by extending it with an "Event Processing Query Language". The Event Processing Kernel is multi-threaded and processes Events (and Queries) in stages, to leverage the power of Multi-Core Processors. The Database does some of the heavy lifting, handling the parts it does best - Joining, Sorting etc.
Imagine a small community of farmers who grow Organic Oats, Wheat, Corn and other crops. They sell their produce and a few other products like Premium Breakfast Cereal, Peanut Butter and Free-range Eggs as a Co-operative. To reach a wider customer base, they opened a small Website 2 years ago to cater to online orders. Recently, when an international TV Station featured this Community's products on their Health show, online sales skyrocketed and almost crashed and burned their Servers. So, they are compelled to upgrade their Website and add a few features to ease the burden on their overworked, small IT team.

Online Retail Store

This is what they want for their Online Store: they have quite a few Customers, like Spas and Department store chains around the nation, which have arrangements with the Farm to place regular, automatic, bulk Orders. These Customers are treated as Priority Customers, and there are Service Level Agreements to fulfill their Orders (Verify Payment, alert the Warehouse and Shipment team etc) within 30 minutes of the Order being placed. If 30 minutes pass without an Order being "Fulfilled", an alert is raised for further investigation.
We create 2 Streams to receive Events - one for Orders placed and another for Order Fulfillments. They receive a constant stream of Events, which have to be monitored for SLA failures. This can be accomplished with a simple Event Processing Query in StreamCruncher, using the Window constructs and the "Alert.. Using..When.." Correlation clause. This is a classic Correlation example.
To fully understand all the features and syntax of the Query language and the procedure to set up Input and Output Event Streams and other operations, please consult the Documentation. Here's the skinny on the syntax: a Sliding Window is defined like this - "store last N", a Time based Window like this - "store last N milliseconds/seconds/minutes/hours/days", and a Tumbling Window like this - "store latest N". The Stream can be divided into small Partitions with Windows defined at specific levels, like this: "partition by country, state, city store last N seconds". This creates Time based Windows at the "Country > State > City" levels.
SLA Failure Alert:
select unful_order_id, unful_cust_id
from
alert
order_event.order_id as unful_order_id,
order_event.customer_id as unful_cust_id
using
cust_order (partition by store last 30 minutes
          where customer_id in
          (select customer_id from priority_customer))
         as order_event correlate on order_id,
fulfillment (partition by store last 30 minutes
           where customer_id in
           (select customer_id from priority_customer))
         as fulfillment_event correlate on order_id
when
present(order_event and not fulfillment_event);
Let's analyze the Query. There are 2 Tables, cust_order and fulfillment, into which the 2 Event streams flow. The Query looks like an SQL Join Query, although this syntax is more expressive, yet concise. The Alert clause is specifically for performing Event Correlation across multiple Streams; the other scenarios described further ahead use syntax that is only a minor extension of SQL, unlike the Alert clause. The Query keeps running on the 2 Streams, monitoring them and generating Output Events as SLA Failure Alerts. The "Alert..Using.." clause consumes Events from the 2 Streams. The "correlate on" clause is like the "join on" clause in SQL, specifying the column/Event property to use to match Events from different Streams. The "When.." clause specifies the Pattern to look for. This Query outputs an Event when a Customer Order Event expires without a corresponding Fulfillment Event arriving within 30 minutes of the Order being placed, thus producing an SLA Breach Alert. Such Queries are also called Pattern Matching Queries, as they are configured to look for the presence or absence of specific combinations of Events.
Since the Farm has these contracts with their corporate consumers, they have to maintain a minimum number of items in stock in their Warehouses distributed around the nation, to avoid an SLA breach.
We have to set up a mechanism which constantly monitors the items in stock in the Warehouses. When an Order is placed, the products and their quantities are deducted from the Warehouse's stock. Products are supplied from different Warehouses based on the location closest to the Ship-to address. Warehouses are also stocked periodically, and those products and quantities are added to the Warehouse's stock. So, we receive 2 kinds of Events - Orders, which reduce the Stock and so carry a negative number for quantity, and Re-stocks, which increase the Stock.
We also use a regular Database Table to store the minimum stock that must be maintained for each product. When the stock for a product goes below the prescribed quantity, an alert should be raised.
Warehouse Re-Stock Alert:
select country, state, city, item_sku, sum_item_qty,
 stock_min_level
from warehouse (partition by country, state, city, item_sku
   store latest 500
   with pinned sum(item_qty) entrance only as sum_item_qty)
   as stock_level_events,
   stock_level_master
where stock_level_events.$row_status is new
    and stock_level_events.item_sku =
    stock_level_master.stock_item_sku
    and stock_level_events.sum_item_qty <
    stock_level_master.stock_min_level;
 
The "partition by country, state, city, item_sku" means that separate Windows are maintained based on the Event's Country, State, City and Product. An Aggregate function - "sum(item_qty)" is used to maintain the sum of the quantities of the products in the Window.
The Window is a Tumbling Window, where Events are consumed in one cycle and discarded in the next; their lifespan is just one cycle. Since the aggregate here is the Sum of the "item_qty" of all the Events currently in the Window, it changes even when Events expire. But we don't want the Sum to change when an Event expires - we just want to keep the sum of all the items passing through the Window. The "entrance only" clause in the Aggregate function tells the Kernel to calculate the aggregate when an Event enters the Window and to ignore it when the Event is expelled from the Window. Since Orders and Re-stocks carry negative and positive quantities respectively, the aggregate automatically tracks the total quantity in the Warehouse.
Whenever the Sum goes below the minimum value specified in the "stock_level_master" Table, an alert is raised.
The "pinned" clause is used to hold/pin up all the Aggregates even if all Events in the Window expire. Otherwise, for "Time based windows" and "Tumbling Windows", the Aggregate if declared, expires along with the Window when all Events in that Window expire. It gets re-created when new Events arrive.
The ability to merge an Event Stream with a regular Table reduces the effort needed to maintain the Master data - the minimum stock values, adding/removing Products etc - because it is a regular DB Table. And the Query's behaviour is driven by that Master Table (stock_level_master).
Orders are placed from all over the nation. To reduce the overhead of shipping smaller orders, the process is streamlined by aggregating orders being shipped to the same City, which are then taken care of by local distributors. This way, the Farm can take advantage of bulk-shipment offers.
Shipment directives are held for 30 minutes. If more than 5 such directives to the same City are received within those 30 minutes, they are dispatched in batches of 5 even before the 30 minutes expire, thus saving the overhead of shipping smaller orders individually.
Shipment Aggregator:
select country, state, city, item_sku, item_qty,
 order_time, order_id
from cust_order
(partition by country, state, city
store last 30 minutes max 5)
as order_events
where order_events.$row_status is dead;
This Query produces Output Events when an Event gets expelled from the Window, as indicated by the "$row_status is dead" clause - either after it has stayed in the Window for 30 minutes, or before that, as part of an "Aggregated Order" when the Window fills up with 5 Order Events.
The Website itself has to be spruced up with some nifty features. To draw the attention of Online visitors, the top 5 best selling items over the past 30 days are to be displayed. This is accomplished by a Query that uses a Chained Partition clause, where the results from one Partition are re-directed to another Partition, which slices and dices the intermediate data into its final form.
Best Selling Items:
select order_country, order_state, order_category,
 order_item_sku, order_total_qty
from cust_order
  (partition by order_country, order_state, order_category,
   order_item_sku store last 30 days
   with sum(order_quantity) as order_total_qty)
to
  (partition by order_country, order_state, order_category
   store highest 5 using order_total_qty
   with update group order_country, order_state, order_category,
   order_item_sku where $row_status is new) as order_events
where order_events.$row_status is not dead;
This Query uses a "Chained Partition", where 2 different Partition clauses are chained by means of the "to" keyword and the results of the first Partition are fed to the second Partition, whose results form the input to the Query. Such chaining is required when a single Partition cannot accomplish the task. The first Partition stores a rolling sum of the total Quantity sold at the "Country > State > Category > SKU" level, and those Sums are sent to the second Partition, which uses the "..is new" clause to pick up only the newly calculated/changed Sums. The second Partition uses a "Highest Rows/Events" Window - a slight variation of the Sliding Window - to maintain an up-to-date list of the "top 5" selling items in each "Country > State > Category".
As can be seen from the examples above, Event Processing is quite versatile and can be used to implement many Use Cases concisely and efficiently. Event Processors are built to handle good data rates, which greatly reduces the development and maintenance costs of the applications using them.


Friday, April 13, 2007

Finally!! StreamCruncher 1.12 is ready and it's no longer a Beta version. This is the Release Candidate.

(Also, performance test results - read further. Hint: 8,000 TPS !! on 1.6 GHz Laptop)

I found the time to review some parts of the Kernel code. It turned out that there were small things here and there that needed fixing. Since the Kernel is heavily multi-threaded, it was important to reduce locking. As a result, CAS (Compare and Set) operations (Java 1.5+) are now used in many places. This is much faster than waiting on a lock only to realise that the logic in the protected section does not have to be executed anyway.
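
As an aside, here's a minimal sketch of the kind of CAS idiom I mean - not the actual Kernel code, just the pattern, using java.util.concurrent.atomic:

import java.util.concurrent.atomic.AtomicBoolean;

public class CasSketch {
    // Guards a piece of one-time work, e.g. draining a batch of Events.
    private final AtomicBoolean claimed = new AtomicBoolean(false);

    public void process() {
        // compareAndSet atomically flips false -> true for exactly one
        // caller; every other Thread fails the CAS and simply moves on,
        // instead of blocking on a lock only to find nothing to do.
        if (claimed.compareAndSet(false, true)) {
            try {
                // ... the protected logic runs here, exactly once ...
            } finally {
                claimed.set(false); // re-open the section for the next batch
            }
        }
    }
}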

After fixing these issues, I modified the "TimeWindowFPerfTest" class to capture more metrics. Apart from calculating the Average Latency added to each Event by the Kernel in a Straight/Simple processing case, this Test now calculates the average total time it takes to insert rows into the Database and for the Kernel to publish them.

The Test was already described before. This time, with the bug fixes, there were no Index-violation exceptions. So, on my Laptop running Windows XP Home with 1 GB RAM and a single 1.6 GHz Intel Centrino Processor, I ran the "TimeWindowFPerfTest" performance test using the Sun JDK 1.6 and StreamCruncher 1.12 with the H2 Database.

I redirected the verbose Console output to a log file and thereby eliminated the otherwise excessive overhead added by the Console logging. This way, I also have proof of all the Tests that were performed.

The Test uses a Thread to generate and pump 'X' events in one shot without pausing. A Query with a Time based Partition is defined on this Stream, with a Window size of 5 seconds. A "$row_status is new" clause is used to output only the new Events that arrive at the Window, and not the ones that exit the Window when their 5 seconds are over. This way, an accurate measurement of the overhead the Kernel imposes can be made. The total time taken for the entire batch to be inserted and pumped out of the Kernel can also be measured, which can then be used to calculate the Transactions per second - the most important metric.

The Test pumps these 'X' events, waits long enough for the Events to clear the area, and then pumps the same number again... and again. At the end of the Test, the results are retrieved and verified, and the Averages are calculated.

Ok, here it comes. Keep in mind that this is a single CPU and that the Event "pumper" and the Kernel are running in parallel. The H2 Database (current version) is completely single-Threaded - so there's no concurrency at all, even though StreamCruncher supports concurrent operations.

I ran 3 rounds for each configuration and here are the results:

Set 1 (4000 Events per Batch):

Set 1 - Round 1
Total events published: 36000. Each batch was of size:4000. Avg time to publish each event (Latency in Msecs): 224.0
Avg time (in Msecs) to insert 4000 Events into the DB: 418.0
Avg time (in Msecs) to process 4000 Events by the Kernel: 376.0
Avg time (in Msecs) for the insertion of first Event in the batch of 4000 Events into DB to publication of last Event in batch by Kernel: 598.0

Set 1 - Round 2
Total events published: 36000. Each batch was of size:4000. Avg time to publish each event (Latency in Msecs): 199.0
Avg time (in Msecs) to insert 4000 Events into the DB: 428.0
Avg time (in Msecs) to process 4000 Events by the Kernel: 397.0
Avg time (in Msecs) for the insertion of first Event in the batch of 4000 Events into DB to publication of last Event in batch by Kernel: 600.0

Set 1 - Round 3
Total events published: 36000. Each batch was of size:4000. Avg time to publish each event (Latency in Msecs): 261.0
Avg time (in Msecs) to insert 4000 Events into the DB: 387.0
Avg time (in Msecs) to process 4000 Events by the Kernel: 336.0
Avg time (in Msecs) for the insertion of first Event in the batch of 4000 Events into DB to publication of last Event in batch by Kernel: 591.0

Set 2 (8000 Events per Batch):

Set 2 - Round 1
Total events published: 64000. Each batch was of size:8000. Avg time to publish each event (Latency in Msecs): 378.0
Avg time (in Msecs) to insert 8000 Events into the DB: 603.0
Avg time (in Msecs) to process 8000 Events by the Kernel: 699.0
Avg time (in Msecs) for the insertion of first Event in the batch of 8000 Events into DB to publication of last Event in batch by Kernel: 1044.0

Set 2 - Round 2
Total events published: 64000. Each batch was of size:8000. Avg time to publish each event (Latency in Msecs): 457.0
Avg time (in Msecs) to insert 8000 Events into the DB: 533.0
Avg time (in Msecs) to process 8000 Events by the Kernel: 666.0
Avg time (in Msecs) for the insertion of first Event in the batch of 8000 Events into DB to publication of last Event in batch by Kernel: 1013.0

Set 2 - Round 3
Total events published: 64000. Each batch was of size:8000. Avg time to publish each event (Latency in Msecs): 392.0
Avg time (in Msecs) to insert 8000 Events into the DB: 593.0
Avg time (in Msecs) to process 8000 Events by the Kernel: 839.0
Avg time (in Msecs) for the insertion of first Event in the batch of 8000 Events into DB to publication of last Event in batch by Kernel: 1064.0

Set 3 (10,000 Events per Batch):

Set 3 - Round 1
Total events published: 70000. Each batch was of size:10000. Avg time to publish each event (Latency in Msecs): 491.0
Avg time (in Msecs) to insert 10000 Events into the DB: 705.0
Avg time (in Msecs) to process 10000 Events by the Kernel: 783.0
Avg time (in Msecs) for the insertion of first Event in the batch of 10000 Events into DB to publication of last Event in batch by Kernel: 1220.0

Set 3 - Round 2
Total events published: 70000. Each batch was of size:10000. Avg time to publish each event (Latency in Msecs): 518.0
Avg time (in Msecs) to insert 10000 Events into the DB: 689.0
Avg time (in Msecs) to process 10000 Events by the Kernel: 845.0
Avg time (in Msecs) for the insertion of first Event in the batch of 10000 Events into DB to publication of last Event in batch by Kernel: 1198.0

Set 3 - Round 3
Total events published: 70000. Each batch was of size:10000. Avg time to publish each event (Latency in Msecs): 513.0
Avg time (in Msecs) to insert 10000 Events into the DB: 647.0
Avg time (in Msecs) to process 10000 Events by the Kernel: 743.0
Avg time (in Msecs) for the insertion of first Event in the batch of 10000 Events into DB to publication of last Event in batch by Kernel: 1151.0

While the Tests were running, I kept noticing that the CPU, even for the 10K Set, was not rising above ~15%, and that only for 1 second stretches - which is quite puzzling. It might be because the Producer and Consumer Threads are not really running in parallel, since the one common resource - the Database - is always locked by one of these 2 (sets of) Threads. I was expecting the CPU to peak and the Tests to crumble at the 10K Set. But they didn't, which is a very good sign.

This means that StreamCruncher can do about 8,000 Transactions Per Second (Straight/Simple Processing) on a very ordinary setup - the 8000 Event batches went from first insertion to last publication in roughly a second - and it should perform considerably better on better hardware (more Cores and/or CPUs) and commercial Databases. This, combined with Horizontal Partitioning of the Stream data (using the Pre-Filters and multiple Queries to split the Events and process them in parallel), should produce fantastic performance.
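
For the curious, the 8,000 figure is just the Set 2 numbers above rearranged - a back-of-envelope calculation, not part of the test harness:

public class TpsSketch {
    public static void main(String[] args) {
        int batchSize = 8000;           // Events per batch (Set 2)
        double endToEndMillis = 1013.0; // first insert into DB -> last publication by Kernel (Set 2, Round 2)

        // Throughput = Events / elapsed seconds.
        double tps = batchSize / (endToEndMillis / 1000.0);
        System.out.printf("~%.0f Transactions Per Second%n", tps); // ~7897
    }
}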

The test results/logs can be downloaded from here.

Monday, April 09, 2007

StreamCruncher 1.11 Beta is available. No changes to the code though. I had forgotten to update the Syntax diagram in 1.10 Beta which included the "$diff" and other custom Provider changes.

Saturday, April 07, 2007

StreamCruncher 1.10 Beta is ready! This release includes:

1) $diff clause for In-built Aggregate Functions along with custom Baseline Provider feature. ClusterHealthTest demonstrates this new feature.

2) Custom Window size Provider should now be declared in the Query, statically. TimeWFPartitionWinSizeProviderTest demonstrates this change.

3) A simple class StandAloneDemo shows how to run a sample using just the Java main(..) method. sc_run_standalone.bat can be used to run the Demo

Monday, March 26, 2007

Here's another reason why using an RDBMS in StreamCruncher was not a bad design decision at all. RDBMSes (what's the plural of RDBMS?) have attained a status (in Middleware) that is unequalled by any other technology. We trust RDBMSes to store our Bank balances, a nation's entire Census data, Criminal records - the list goes on and on. So, when such a technology is always within easy reach, common sense tells us that it should not just be used, but embraced... Ok, enough of that; moving on to the point...

Many people who are familiar with ESP and CEP, when they learn that StreamCruncher uses an RDBMS underneath, probably wince at the mere mention of a Database. To them, Databases are good, but not good enough for Stream Processing; to many, the RDBMS is the antithesis of Stream Processing. Why? They hold forth vehemently on the "presumed" drawbacks of such an architecture. They start lecturing about Performance. Speed. "Sub-millisecond latency cannot be achieved in a regular RDBMS"... and so on.

Well, those claims are not entirely true. Even if you ignore the fact that StreamCruncher already supports several Embedded, In-Memory, Real-time Databases that are routinely used in Telecom, there is a classic solution that makes a regular RDBMS as good as any In-Memory Database. The biggest hurdle is the Hard disk - Persistence. Databases are meant to store data for posterity, but storing data to the Disk also adds a lot of latency. Everything else the Database offers - concurrency, scalability etc - is very much required for Event Processing; Persistence is not required for Event Stream Processing. So, a regular Enterprise class Database can still be used as the base for StreamCruncher, providing a solid/robust foundation on which to perform ESP/CEP - all this by creating the Database Tablespace on what is called a RAMDisk. That is just a soft drive, where a portion of the Physical Memory/RAM is turned into a Storage Drive (like C:\ or /usr/home/myname). It acts like any other Drive where files can be created, but everything gets wiped out when the Machine is rebooted. This technique is not new, but it fits perfectly in the context of StreamCruncher. RAM Drives can be created on almost any Operating System; a simple search on the Internet reveals several products and techniques for creating them.

So, all the licensing costs that a Company has incurred on acquiring and maintaining these "Big Daddy" Databases can still be leveraged for Event Stream Processing. In the end, StreamCruncher gets to use a time tested Database that Developers have been using in their other regular Projects, and can rely on the stability of such DBs. The important Sorting, Pre-Filtering, Joining and other CPU-intensive operations happen in the Database Engine, which is Native code. And of course, the Developers' familiarity with such Databases plays a crucial role in adoption and integration with other parts of a project.

Sunday, March 18, 2007

StreamCruncher 1.09 Beta is ready, and it includes a major feature addition. From version 1.09 onwards, Multi-Stream Correlation a.k.a. Pattern matching is possible. This feature enables the monitoring of, and correlation across, multiple Streams of Events using a simple SQL-like Query.

Although simple 2 Stream Correlation is possible using regular SQL plus Partitions, as demonstrated in the SLAAlertTest, watching for multiple Patterns across more than 2 Streams is now very easy with this new "alert..using..when.." clause.

MultiStreamEventGeneratorChainTest demonstrates this new feature.

Thursday, March 15, 2007

People have asked me how StreamCruncher performs. Until today I did not have a clear answer, because I did not have access to a good Server/PC. Last week I managed to borrow (just for a few minutes) a high end Intel/Windows 2000 Server - 4 CPUs at 3.6 GHz each, with a total of 3 GB RAM.

Out of curiosity, I ran the streamcruncher.test.func.h2.H2TimeWindowFPerfTest test with all the default configurations for the JVM, StreamCruncher 1.08 Beta, the H2 Database etc - Parallel GC, but with just 1 Collector Thread.

The test pumps 250 events in one burst, pauses for 6 seconds and repeats this for 4 cycles - so, 4 iterations of 250 events each. The test is simple Straight Through Processing: a Time Based Anonymous Partition which holds Events for 5 seconds. The Test measures the average time it takes for an Event to be expelled from the Kernel (published by the Kernel) from the time it was created. This does not include the 5 seconds it stays in the Window; it measures the Overhead added by the Kernel's processing.

Keep in mind that StreamCruncher is still in Beta. There are still a few rough edges to be taken care of - StreamCruncher has neither been Profiled nor have load & performance tests ever been run.

On my 1.6 GHz Win XP Laptop with 1 Gig of RAM and JDK 1.6, the average Latency per Event for this test was a disappointing 380 Milliseconds - below expectations. I was quite sure that the single CPU was bogging down the performance, because the Test creates and inserts live, randomly generated Data while the Kernel, which is heavily multi-threaded, shares the same CPU. Since all these operations were in memory, there was practically no opportunity for the CPU to switch between Threads while another Thread was blocking on the Network or Disk. So, there was practically no concurrency.

But when I ran the same test on the 4 CPU Box, the average CPU utilization did not even change noticeably, while Events were spat out at a fantastic 7-9 Millisecond latency. I was blown away by the numbers. Preliminary though it may be, the multi-threading change in Release 1.05 must've really paid off. Another important thing is that the Tests were printing verbose output to the Console, which slows down the whole application; with Zero logging, or logging to a file, latency might've improved even further. However, an ugly bug reared its head when I re-directed Output to a File: I kept getting a "Unique Constraint Violated" error quite often. This has to be fixed soon. Since the H2 Database was used in these tests, which in its current version does not support Multi-threaded access, I'm hoping the performance of StreamCruncher will be exceptional on other In-Memory Databases like Oracle TimesTen.

Sometime over the next few months, I'll conduct proper tests on a more stable version of StreamCruncher and publish my findings.

Wednesday, February 28, 2007

The JDBC Engineers at ANTs have been busy fixing the bugs (a & b) in their Driver. Today, I received an email from them saying:

We have been tracking your issue as case 1206. As per your statements, we had logged the following bugs, which have been resolved, fixed & verified.

Bug 1415: JDBC getParameterTypeName() returns unknown for Date Datatype.

Bug 1416: JDBC getParameterClassName() method returns hard-coded string as "java.sql.ParameterMetaData"

Bug 1596: ANTs JDBC driver doesn't get registered

Bug 1597: ANTs JDBC Issue - NULL is inserted when Long.MIN_VALUE is inserted into bigint column
Cool! So, we should be seeing these changes in their next release.