Category Archives: AMQP

Map/Reduce, Twitter, RabbitMQ & CEP – Let's Wrap It Up

Ok, so if you’ve been following along, you’ll already know that we’ve:

  1. Installed RabbitMQ, and
  2. Written a Twitter OnRamp that subscribes to a sample Twitter feed and publishes it on RabbitMQ, and
  3. Written a RuleBot that takes the tweet and breaks it into chunks, putting the chunks back onto the bus.

And now I’m going to show the Esper code that gives us the top URLs found in the Twitter feed in the last X seconds.  Here’s a screenshot.


Here’s some output in text mode (easier to read):

Event received: {URL=http:\\cnn.com, Total=731}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=643}
Event received: {URL=http:\\rabbitmqm.com, Total=636}
Event received: {URL=http:\\cnn.com, Total=738}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=651}
Event received: {URL=http:\\rabbitmqm.com, Total=642}
Event received: {URL=http:\\cnn.com, Total=742}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=660}
Event received: {URL=http:\\rabbitmqm.com, Total=643}
Event received: {URL=http:\\cnn.com, Total=743}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=660}
Event received: {URL=http:\\rabbitmqm.com, Total=652}
Event received: {URL=http:\\cnn.com, Total=752}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=661}
Event received: {URL=http:\\rabbitmqm.com, Total=654}
Event received: {URL=http:\\cnn.com, Total=759}

Essentially, what’s happening here is that we’re:

  1. Getting demo data (so I know what the relative ratios are, to get a feel for whether or not this is correct), and
  2. Pumping it into Esper via a URL object, and
  3. Executing a statement (“select URL, sum(count) as Total from TwitterURL.win:time(5 seconds) group by URL”).

This statement gives us the total number of times a URL has been tweeted in the last 5 seconds.  Whenever that URL is tweeted, the sum is recalculated and a listener is called.  This is a very straightforward statement; we could tweak it a bit more, or do some work in code, like maintaining a map of URLs with their current sum (rank).  We could even create another stream.  Any of these approaches would expose the most popular URLs to any interested party.
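To make the windowing semantics concrete, here is a small, stdlib-only Java sketch of what that statement computes: a 5-second sliding time window over URL events, with sum(count) grouped by URL, plus the "map of URLs with the current sum (rank)" idea. It is not the Esper API; the TwitterUrl class, its fields, and the method names are hypothetical, and the sketch assumes events arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only simulation of:
//   select URL, sum(count) as Total from TwitterURL.win:time(5 seconds) group by URL
// NOT the Esper API; TwitterUrl and every method name here are hypothetical.
final class UrlWindowSketch {
    // Hypothetical stand-in for the TwitterURL event type.
    static final class TwitterUrl {
        final String url;
        final int count;
        final long timestampMillis;

        TwitterUrl(String url, int count, long timestampMillis) {
            this.url = url;
            this.count = count;
            this.timestampMillis = timestampMillis;
        }
    }

    private final long windowMillis;
    private final Deque<TwitterUrl> window = new ArrayDeque<>();   // oldest event at the head
    private final Map<String, Integer> totals = new HashMap<>();  // URL -> sum(count) inside the window

    UrlWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Expires events that fell out of the window, adds the new event, and returns
    // the new Total for its URL (roughly what a listener would be handed).
    int onEvent(TwitterUrl e) {
        while (!window.isEmpty()
                && window.peekFirst().timestampMillis <= e.timestampMillis - windowMillis) {
            TwitterUrl old = window.pollFirst();
            int remaining = totals.merge(old.url, -old.count, Integer::sum);
            if (remaining == 0) {
                totals.remove(old.url); // drop groups that no longer have events in the window
            }
        }
        window.addLast(e);
        return totals.merge(e.url, e.count, Integer::sum);
    }

    // URLs ranked by current Total, highest first; ties broken alphabetically.
    List<String> rankedUrls() {
        List<String> urls = new ArrayList<>(totals.keySet());
        urls.sort(Comparator.comparing((String u) -> totals.get(u)).reversed()
                .thenComparing(Comparator.<String>naturalOrder()));
        return urls;
    }
}
```

One simplification worth noting: this sketch only evicts expired events when the next event arrives, whereas a CEP engine typically also expires events as its own clock advances.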

All that’s left now is to get all of this up and running in the cloud – and that’s what I call real CEP – Cloud Event Processing.  Check in next week.

CEP in the Cloud – with Load Balancing (Part 2)

It’s time to delve a little deeper into what’s proving to be a thorny issue for doing anything in the cloud, especially if you’ve got concerns about once-and-only-once, in-sequence processing, something that financial market participants have to deal with every day. We’re not going to go into that level of depth in this post. But I am going to show one way to deploy CEP in the cloud and use a CEP-based continuous query to provide load balancing in our classic Map/Reduce example (what has also been referred to as a streaming fold).

Load Balancing – Partitioning

In our example, we could instruct each RuleBot to listen only to tweets beginning with A-D, E-G, and so on. The CEP Service Manager would start up however many instances were initially required and set up these initial subscriptions. If the subscriptions needed to change for load balancing, the Load Balancer could stop all of the instances in the Service Pool, re-partition them, and then start them up again, all without losing any messages and without processing any message more than once. Even without dealing with in-order (sequence-based) processing, one can see how this particular approach leaves a little bit to be desired.
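A minimal Java sketch of that static, first-letter partitioning, assuming each RuleBot owns an inclusive range of upper-case letters and anything else (digits, hashtags, mentions, non-Latin scripts) falls through to a catch-all. The class, method, and RuleBot names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of static partitioning: each RuleBot owns a range of first letters.
final class LetterPartitioner {
    private static final class Range {
        final char first;
        final char last;
        final String ruleBot;

        Range(char first, char last, String ruleBot) {
            this.first = first;
            this.last = last;
            this.ruleBot = ruleBot;
        }
    }

    private final List<Range> ranges = new ArrayList<>();
    private final String catchAll;

    LetterPartitioner(String catchAll) {
        this.catchAll = catchAll;
    }

    // Assigns letters first..last (inclusive, compared in upper case) to a RuleBot.
    LetterPartitioner assign(char first, char last, String ruleBot) {
        ranges.add(new Range(Character.toUpperCase(first), Character.toUpperCase(last), ruleBot));
        return this;
    }

    // Picks the RuleBot for a tweet from its first non-blank character.
    String ruleBotFor(String tweet) {
        String t = tweet.trim();
        if (t.isEmpty()) {
            return catchAll;
        }
        char c = Character.toUpperCase(t.charAt(0));
        for (Range r : ranges) {
            if (c >= r.first && c <= r.last) {
                return r.ruleBot;
            }
        }
        return catchAll; // digits, '#', '@', non-Latin scripts, ...
    }
}
```

Because the ranges are fixed when the partitioner is built, changing them means building a new partitioner, which mirrors the stop, re-partition, restart cycle described above.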

Load Balancing – Selecting a Destination

In the example below, we can see the RuleBots sending resource utilization statistics to the CEP Load Balancing Process.  This process listens to all of the relevant processes that make up a CEP Service Pool and constructs what I call a validity stream (others call it an ‘update in place’ query).  Selecting from that stream (“select virtual-node from cep-service-pool where cep-service = 'WORDCOUNT' order by resource-utilization desc”) gives the OnRamp a destination for the current tweet JSON object.  I like this approach a lot.  New processing resources can be added to a CEP Service Pool dynamically, and the CEP load balancer doesn’t really need to know anything about them: each new service simply publishes its stats and becomes eligible for processing.  Cool.  The picture below illustrates the idea.
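Here is a rough, in-memory Java model of such a validity stream: one row per virtual node, where each new stats report replaces that node's previous row, and a destination is chosen per CEP service. Note that the statement as quoted sorts by resource-utilization in descending order; this sketch assumes the intent is to route each tweet to the least-loaded node, so it takes the minimum. The class names, fields, and the 0.0 to 1.0 utilization scale are all hypothetical.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the "validity stream": one row per virtual node,
// where each new stats report replaces that node's previous row (update in place).
final class LoadBalancerSketch {
    static final class NodeStats {
        final String virtualNode;
        final String cepService;
        final double resourceUtilization; // assumed scale: 0.0 = idle, 1.0 = saturated

        NodeStats(String virtualNode, String cepService, double resourceUtilization) {
            this.virtualNode = virtualNode;
            this.cepService = cepService;
            this.resourceUtilization = resourceUtilization;
        }
    }

    private final Map<String, NodeStats> latestByNode = new HashMap<>();

    // Called whenever a RuleBot publishes its stats; a brand-new node becomes eligible immediately.
    void onStats(NodeStats stats) {
        latestByNode.put(stats.virtualNode, stats);
    }

    // A node that leaves the pool simply stops being eligible.
    void onNodeRemoved(String virtualNode) {
        latestByNode.remove(virtualNode);
    }

    // Destination for the next tweet: the least-utilized node running the given service.
    Optional<String> destinationFor(String cepService) {
        return latestByNode.values().stream()
                .filter(s -> s.cepService.equals(cepService))
                .min(Comparator.comparingDouble((NodeStats s) -> s.resourceUtilization))
                .map(s -> s.virtualNode);
    }
}
```

The load balancer never needs a list of nodes up front; a node exists for routing purposes only because it has reported stats.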

A Picture Is Worth 1,000 Words – Current State of Map/Reduce in CEP Land

Map/Reduce, Twitter, RabbitMQ, and CEP – A More Realistic Example (part 1.5)

Progress Update

I’ve updated all of my RabbitMQ installations to 1.7.1, started to clear the Eclipse cobwebs out of my head, and found that I do actually remember enough Java to be dangerous.

Twitter OnRamp

I’ve modified the Twitter client from gist into what I refer to as an OnRamp; we use OnRamps to get messages/events onto the information superhighway.  In the Twitter OnRamp, I’ve subscribed to the sample.json feed and am publishing “jsonObject.getString(“text”)” to the RabbitMQ bus.  Before we get into the AMQP specifics, I’d like to briefly touch upon load balancing, especially since we’re going to deploy the example in EC2 when we’re done here.

The Picture on the Whiteboard

Let’s paint a virtual whiteboard picture – on the left side of the board, you’ve got the Twitter OnRamp.  The Twitter OnRamp is publishing tweets onto the bus.  For now, we don’t care about those specifics.

As we move right on the whiteboard, you’ll see a number of CEP instances, all waiting for messages/events to consume so that they can publish value-added information back out onto the bus for those interested in receiving it.

Some of those CEP instances will be running the MAP in our example – taking tweets in and tokenizing them – just like in our previous example.  To illustrate, the tweet, “colin clark is writing Java code again much to the chagrin of several engineers” will be tokenized and emitted back onto the bus – it will look like this:

colin, 1

clark, 1

is, 1

writing, 1
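A minimal Java sketch of that MAP step, assuming simple tokenization: the tweet is lower-cased and split on runs of whitespace, so punctuation stays attached to words. The class and method names are hypothetical; in the real setup each pair would be published back onto the bus rather than returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical MAP step: turn one tweet into (word, 1) pairs.
final class TweetMapper {
    // Lower-cases the tweet and splits on runs of whitespace; punctuation stays attached.
    static List<Map.Entry<String, Integer>> map(String tweet) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String token : tweet.toLowerCase(Locale.ROOT).split("\\s+")) {
            if (!token.isEmpty()) { // leading whitespace yields an empty first token
                pairs.add(Map.entry(token, 1));
            }
        }
        return pairs;
    }
}
```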

Another CEP instance will be listening for those MAP’ed messages and using them as the foundation for a window/aggregate.  This window/aggregate is our REDUCE function.  Our window will give us the most popular words and their relative rank as published via the Twitter sample feed.  Notice that I said “instance” for this function as opposed to “instances” for the MAP function.  Let’s deal with the instances above before answering that question.
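On the REDUCE side, the aggregate boils down to summing the counts per word and ranking the results. The Java sketch below shows just that part; for brevity it assumes a running total and leaves out the time-window eviction a real window/aggregate would perform. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical REDUCE step: sum (word, count) pairs and rank the words.
// Assumption: running totals only; no time-window eviction.
final class WordCountReducer {
    private final Map<String, Long> totals = new HashMap<>();

    // Consumes one MAP'ed pair from the bus.
    void reduce(String word, int count) {
        totals.merge(word, (long) count, Long::sum);
    }

    long totalFor(String word) {
        return totals.getOrDefault(word, 0L);
    }

    // Up to n words, most popular first; ties broken alphabetically so the ranking is stable.
    List<String> topWords(int n) {
        List<String> words = new ArrayList<>(totals.keySet());
        words.sort((a, b) -> {
            int byCount = Long.compare(totals.get(b), totals.get(a));
            return byCount != 0 ? byCount : a.compareTo(b);
        });
        return new ArrayList<>(words.subList(0, Math.min(n, words.size())));
    }
}
```

Keeping this state in a single place is what makes the single REDUCE instance convenient: every count for a given word ends up in the same map.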

Load Balancing In The Cloud

So, our Twitter OnRamp is happily subscribing to the firehose and publishing messages for further processing.  We can safely assume that if a single process were tasked with tokenizing and emitting, our processing would very quickly fall behind.  We don’t want that, obviously!

So, as our OnRamp is busy publishing away, is there an obvious way to direct those messages to specific CEP instances on the bus?  Sure, but let’s introduce two rules first:

  1. Messages/Events will be consumed in our system with no regard for their origination, and
  2. Messages/Events will be published with no regard for their direct or eventual destination.

Hmmm.  These rules would seem to disallow us from tagging messages with a specific CEP instance destination from within the OnRamp.  This leaves us with 3 obvious alternatives to explicitly setting a destination CEP instance:

  1. Use one big honkin’ CEP instance on one big honkin’ machine (the current practice),
  2. Utilize a round robin approach, and softly violate our rules, or
  3. Incorporate some centrally managed directory service that deploys and configures the CEP instances dynamically (hint: I like this one).
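For comparison, option 2 can be as simple as the Java sketch below: the publisher cycles through a fixed list of CEP instances. This is exactly the soft violation of rule 2, since the OnRamp now decides where each message goes. The class name and instance names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin dispatcher (option 2): the publisher picks the destination,
// which softly violates rule 2 above.
final class RoundRobinDispatcher {
    private final List<String> instances;
    private final AtomicLong next = new AtomicLong(); // thread-safe counter for concurrent publishers

    RoundRobinDispatcher(List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("need at least one CEP instance");
        }
        this.instances = List.copyOf(instances); // fixed pool: adding an instance means rebuilding
    }

    String nextDestination() {
        long i = next.getAndIncrement();
        return instances.get((int) Math.floorMod(i, (long) instances.size()));
    }
}
```

The fixed pool is the weak spot: it knows nothing about how busy each instance is, which is what option 3 is meant to address.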

For now, we’re going to simulate #3 above, and in the next update, I’ll show you exactly what we’re simulating.  So in the next example, look for a CEP-based service that will subscribe to tweets and emit words back out onto the bus for further processing.

Map/Reduce, Twitter, RabbitMQ, and CEP – A More Realistic Example (part 1)

I thought it would make sense to flesh out the previous “Map/Reduce in CEP Land” post, so I did.

We’re going to show a Java Twitter client publishing tweets to a RabbitMQ bus where they’ll be tokenized and emitted (map) for reduction (in a CEP engine).  We’ll construct a stream showing the most popular words over the last hour.

So, first, a Twitter Java client (from gist) – here it is running in Eclipse – we’re subscribing to the sample.json feed:

If you’re going to follow along on this little sojourn, make sure to change the URL you’re subscribing to in the client to:

“http://stream.twitter.com/1/statuses/sample.json”

I call the Twitter client an OnRamp – we’ll use this OnRamp to publish messages to the RabbitMQ bus. We’re going to publish the messages without tokenizing them – we’ll do that in the CEP engine. By using the RabbitMQ bus, it will be straightforward to partition the processing.

In the next post, we’ll show the code to get the message onto the bus, and we’ll contemplate how that might be done incorporating some type of load balancing – that’s right, we’ll be hitting multiple CEP engines in this example.

AMQP, RabbitMQ & CEP

One of my readers asked me to take a look at AMQP and RabbitMQ.  Seemed right up my alley, so I did.

AMQP

Since some type of messaging/queuing requirement is becoming fairly commonplace today, we face the same problem we always face when different groups that have adopted their own solutions want to start exchanging data with other groups that may have chosen different ones.  AMQP (you can read all about it here) is a public specification for messaging/queuing.  It’s a wonderful idea, has a meaningful and robust spec, and is being used all over the world in a couple of prevalent offerings.  Let’s look at the predominant offering in that space today.

RabbitMQ

I really like RabbitMQ’s tagline on their website, “Messaging that just works.”  That sounds pretty cool.  Why?  Because I’ve not only implemented various messaging platforms but have also designed and engineered one of our own (for very high-speed point-to-point messaging, back when Selero was young and we wrote it all ourselves).  And because of that experience, I know that installing, configuring, integrating and reliably running a messaging platform hasn’t always been that easy.  So I was still grinning a bit when I hit this gem on RabbitMQ’s website:

Downloading and Installing RabbitMQ

Our goal is to streamline the broker installation process such that you can have RabbitMQ up and running within two minutes of completing your download. If this doesn’t happen to you, please let us know at legitimategrievance@rabbitmq.com!

I thought to myself, “Self, they’re really laying this down, so let’s see!”  I downloaded the package, which included the examples for .NET; unzipped it to a directory; and set my ERLANG_HOME to the right directory (more on that later).  Then, expecting to see many error codes and anticipating hours of exchanging email with RabbitMQ’s support department, I double-clicked on the server batch file.

It ran.  Without any problems.  But it was still too early to start sending kudos to the RabbitMQ team; we had to see if we could actually use the broker.

Examples

Lots of examples are included with RabbitMQ, and getting them up and running was very straightforward: type in the executable name with the correct command-line arguments and press Enter.  The first two examples I ran were AddServer and AddClient.  And they worked too.  I was getting ready to send kudos.

One last thing, though: I wanted to try the WCF examples included with the .NET client distribution.  And that worked too.  Kudos to the RabbitMQ team.

RabbitMQ and CEP

Before diving into the CEP realm, I did go back and run all of the demos and the server with the newest version of Erlang – 5.7.4.  RabbitMQ’s download is bundled with an older version.  And everything worked just fine.  Again.

In the current world of CEP, the ‘run your app on one big honkin’ machine with lots of cores’ approach is prevalent.  Even then, you’ve still got to get data in and out of the engine, and all of the vendors supply their own client APIs for both out-of-process and in-process communication.  So where would I use RabbitMQ?  In this scenario, using RabbitMQ makes sense for a couple of things:

  • I like the built-in request/reply semantics found in AMQP and RabbitMQ.  They make it very easy to simulate synchronous calls to services out on the network, and there are a lot of examples for this, not only on RabbitMQ’s website but out on the net in general.  These services can even be embedded within the RabbitMQ server.  I like this because in some of the CEP products, it’s sometimes difficult to do traditional, procedural stuff (there’s a reason we’re not seeing a lot of CEP in fixed income, for example).  This is a drawback of using SQL-like languages, and it’s why more than a couple of vendors have augmented their DSLs with additional, procedural extensions.
  • Many times, the data that you need to stuff into your CEP engine is only accessible via custom, vendor-specific APIs, like Reuters RFA 6 for example.  In that case, you’re going to either build an adapter or buy one from the vendor.  But let’s say that you don’t need that custom API, and you don’t want to pay for additional uses (cores) of that particular message transport; then using RabbitMQ makes a lot of sense.  I really like the use of pub/sub, and I think we’ll see more of it as CEP solutions are deployed outside of the traditional consumption of market data, computation of signals, and subsequent order placement.
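To illustrate the request/reply pattern from the first bullet, here is an in-process Java simulation: the caller attaches a correlation id and a reply-to queue to each request, and blocks until the matching reply comes back, so the call looks synchronous. AMQP messages do carry correlation-id and reply-to properties for this purpose, but this sketch uses plain java.util.concurrent queues rather than the RabbitMQ client API, and the class and method names are hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical in-process simulation of AMQP-style request/reply (NOT the RabbitMQ client API).
final class RequestReplySketch {
    // Stand-in for a message: a body plus correlation-id and reply-to "properties".
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo;

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    private final BlockingQueue<Message> requestQueue = new LinkedBlockingQueue<>();
    private final Thread server;

    // Starts a "service" that consumes requests and replies on each caller's reply-to queue.
    RequestReplySketch(Function<String, String> service) {
        server = new Thread(() -> {
            try {
                while (true) {
                    Message req = requestQueue.take();
                    req.replyTo.put(new Message(service.apply(req.body), req.correlationId, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shut down quietly
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Publishes a request, then blocks until the reply with the matching correlation id arrives.
    String call(String request, long timeoutMillis) throws InterruptedException, TimeoutException {
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        String correlationId = UUID.randomUUID().toString();
        requestQueue.put(new Message(request, correlationId, replyQueue));
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            Message reply = replyQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            if (reply == null) {
                throw new TimeoutException("no reply for " + correlationId);
            }
            if (correlationId.equals(reply.correlationId)) {
                return reply.body;
            }
            // Otherwise: a stray reply meant for another request; ignore it and keep waiting.
        }
    }

    void shutdown() {
        server.interrupt();
    }
}
```

Checking the correlation id matters most when several callers share one reply queue; with a private reply queue per call, as here, it acts as a safety check.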

In the coming world of clustered deployment of CEP services, the use of RabbitMQ makes even more sense.  RabbitMQ supports a number of messaging scenarios and could be used for command/control/monitoring of the grid without running into exorbitant licensing costs (one messaging vendor I dealt with when helping with the rebuild of the Boston Stock Exchange wanted to charge for each client of the exchange – each end user!).  RabbitMQ could also be used for the scenarios described above.

Conclusion

RabbitMQ’s claims on their website are correct, and that’s refreshing in the world we live in today.  I downloaded the software, installed it, and it ran as advertised.  Client libraries are plentiful, with APIs available for Java, .NET, Ruby, Python, the list goes on, and of course, Erlang.  Also, I did have a couple of questions during my evaluation, and RabbitMQ’s staff were not only available but quick to answer.  I like that.  If you haven’t given RabbitMQ and AMQP a try, do so.  And if you’re using RabbitMQ in a CEP deployment, I’d love to hear about it.

Post Script

I’m not going to talk about benchmarks: I did all of this ‘work’ on my laptop, our other machines are busy exploring the far reaches of the universe (DarkStar), and messaging benchmarks are fairly meaningless unless you’re looking at a very specific example.