Category Archives: Map/Reduce

Building Inverted Indexes on Tweets Using Map/Reduce

In our last post, we looked at how to turn bad map/reduce code into better map/reduce code. A natural byproduct of breaking tweets down into words is the ability to build an inverted index for searching tweets by keyword.

It’s All in the Tweet

Given the tweets:

  1. “@eventcloudpro I like the idea of tree maps, if you combine these with metrics trees in #PM Strategy Management tools u could align the two”
  2. “RT @jakewk RT @eventcloudpro: did streambase find a buyer? http://bit.ly/brTJlx SunGard likes to buy their technology partners #cep”
  3. “I really like #erlang – it’s rocking technology!”

How do we compute an inverted index? First, let’s assign a unique ID to each tweet above; for our example, that’s tweet #1, tweet #2, and tweet #3. Next, we want to find which words are used where, so we’ll construct a table of words, each paired with the list of IDs of the tweets that contain it.

Word              Tweets
@eventcloudpro    1, 2
metrics           1
technology        2, 3

and so on…

Using the Index

To find tweets containing the words we’re interested in, we just query the inverted index. For example, if I’m looking for tweets with the word ‘metrics’, the index shows that tweet #1 has that word, so I look up tweet #1. If I want tweets containing both ‘@eventcloudpro’ and ‘technology’, I get the sets (1, 2) and (2, 3), and their intersection is tweet #2.
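A minimal Java sketch of building the index and running the lookups just described, kept entirely in memory. The class name `TweetIndex` and its tokenizer (split on whitespace, trim surrounding punctuation, lowercase, keep the `@` and `#` prefixes) are assumptions for illustration, not code from the project. A query for several words is the intersection of their posting sets.

```java
import java.util.*;

/** Illustrative in-memory inverted index: word -> sorted set of tweet IDs. */
class TweetIndex {
    private final Map<String, SortedSet<Integer>> postings = new HashMap<>();

    /** Assumed tokenizer: whitespace split, trim punctuation at both ends, lowercase; keeps @ and # prefixes. */
    static List<String> tokenize(String text) {
        List<String> words = new ArrayList<>();
        for (String raw : text.split("\\s+")) {
            String w = raw.replaceAll("^[^\\p{L}\\p{N}@#]+|[^\\p{L}\\p{N}]+$", "").toLowerCase();
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    /** Indexes every word of the tweet under its ID. */
    void add(int tweetId, String text) {
        for (String w : tokenize(text)) {
            postings.computeIfAbsent(w, k -> new TreeSet<>()).add(tweetId);
        }
    }

    /** IDs of the tweets containing the word; empty if none. */
    SortedSet<Integer> lookup(String word) {
        return Collections.unmodifiableSortedSet(
                postings.getOrDefault(word.toLowerCase(), new TreeSet<Integer>()));
    }

    /** AND query: the intersection of the posting sets of all the given words. */
    SortedSet<Integer> all(String... words) {
        SortedSet<Integer> result = null;
        for (String w : words) {
            if (result == null) {
                result = new TreeSet<>(lookup(w));
            } else {
                result.retainAll(lookup(w));
            }
        }
        return result == null ? new TreeSet<Integer>() : result;
    }
}
```

After adding the three tweets as IDs 1, 2, and 3, `all("@eventcloudpro", "technology")` returns just 2, matching the walk-through above.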

I’d Like Extra Sauce, Please

So now that we’ve figured out how to look up specific tweets based upon content, how could we look up tweets based upon other attributes, such as categorization, extracted entities, root words (stemming), or even sentiment? By running the tweets through a system that calculates these value-added goodies, we could construct additional inverted indexes to further organize our tweets.
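One way to sketch such additional indexes, assuming a pluggable extractor function: the index stores whatever keys the extractor derives from each tweet. The hashtag extractor here is only a stand-in for categorization; a stemmer, entity extractor, or sentiment classifier would plug in the same way. `FeatureIndex` and `hashtags` are hypothetical names.

```java
import java.util.*;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative inverted index whose keys come from a pluggable feature extractor. */
class FeatureIndex {
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    private final Function<String, Set<String>> extractor;
    private final Map<String, SortedSet<Integer>> postings = new HashMap<>();

    FeatureIndex(Function<String, Set<String>> extractor) {
        this.extractor = extractor;
    }

    /** Indexes the tweet under every key the extractor derives from its text. */
    void add(int tweetId, String text) {
        for (String key : extractor.apply(text)) {
            postings.computeIfAbsent(key, k -> new TreeSet<>()).add(tweetId);
        }
    }

    /** IDs of the tweets indexed under the key; empty if none. */
    SortedSet<Integer> lookup(String key) {
        return Collections.unmodifiableSortedSet(postings.getOrDefault(key, new TreeSet<Integer>()));
    }

    /** Hypothetical example extractor: the lowercased hashtags found in the text. */
    static Set<String> hashtags(String text) {
        Set<String> tags = new HashSet<>();
        Matcher m = HASHTAG.matcher(text);
        while (m.find()) {
            tags.add(m.group(1).toLowerCase());
        }
        return tags;
    }
}
```

Usage: `new FeatureIndex(FeatureIndex::hashtags)` builds a hashtag index alongside the word index; each extra "value-added" attribute is just another instance with a different extractor.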

Map/Reduce

Using map/reduce to calculate inverted indices is a natural fit, and there’s a great introduction to doing this in Erlang in Joe Armstrong’s book, “Programming Erlang.”
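The same index can be expressed as explicit map, shuffle, and reduce phases. This is a Java sketch of the general technique, run sequentially in one JVM, and not the Erlang code from Armstrong’s book; the naive whitespace tokenizer and the `MapReduceIndex` name are assumptions. A real framework would run the map calls in parallel and perform the shuffle across the network.

```java
import java.util.*;

/** Illustrative inverted-index build as map, shuffle and reduce phases run in one JVM. */
class MapReduceIndex {
    /** Map: emit one (word, tweetId) pair per distinct word in the tweet. */
    static List<Map.Entry<String, Integer>> map(int tweetId, String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : new LinkedHashSet<>(Arrays.asList(text.toLowerCase().split("\\s+")))) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, tweetId));
            }
        }
        return out;
    }

    /** Shuffle: group the emitted tweet IDs by word, as the framework would between phases. */
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    /** Reduce: turn one word's tweet IDs into a sorted, de-duplicated posting list. */
    static List<Integer> reduce(String word, List<Integer> tweetIds) {
        return new ArrayList<>(new TreeSet<>(tweetIds));
    }

    /** Runs all three phases over a map of tweetId -> text. */
    static Map<String, List<Integer>> build(Map<Integer, String> tweets) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        tweets.forEach((id, text) -> emitted.addAll(map(id, text)));
        Map<String, List<Integer>> index = new TreeMap<>();
        shuffle(emitted).forEach((word, ids) -> index.put(word, reduce(word, ids)));
        return index;
    }
}
```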

Writing Bad Code in Map/Reduce

In the Twitter project we’ve been working on, one of the maps we’re running breaks the text of a tweet down into words. Because we can’t assume that any data will be available later via a database, etc., we attach to each word a couple of values we’re interested in for later analysis, attach a 1, and emit the tuple. This is an example of what the tuple looks like:

"TimeZone", "Location", "ScreenName", "Word", 1

Because a tweet contains many words, crunching it down produces many of these tuples, each duplicating the same leading values:

"TimeZone", "Location", "ScreenName", "anotherWord", 1
"TimeZone", "Location", "ScreenName", "yetAnotherWord", 1
"TimeZone", "Location", "ScreenName", "Word", 1
"TimeZone", "Location", "ScreenName", "Word", 1

But notice that there’s a way to compress this a bit. One of the problems with map/reduce (maybe not a problem, but a caveat) is that it’s quite easy to swamp the network. In the example above, we emit a tuple for every word (the tuple is slightly longer in production), repeating the same TimeZone, Location, and ScreenName values each time.
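The naive map can be sketched in Java as follows. `WordTuple` and `NaiveWordMap` are hypothetical names, the tokenizer is a plain whitespace split, and the production tuple carries more fields than shown. Note that the three metadata strings are copied into every emitted tuple.

```java
import java.util.*;

/** One emitted record: tweet metadata repeated alongside a single word and a count of 1. */
class WordTuple {
    final String timeZone;
    final String location;
    final String screenName;
    final String word;
    final int count;

    WordTuple(String timeZone, String location, String screenName, String word, int count) {
        this.timeZone = timeZone;
        this.location = location;
        this.screenName = screenName;
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return String.format("\"%s\", \"%s\", \"%s\", \"%s\", %d",
                timeZone, location, screenName, word, count);
    }
}

/** Naive map: one tuple per word occurrence, with the metadata duplicated in each. */
class NaiveWordMap {
    static List<WordTuple> map(String timeZone, String location, String screenName, String text) {
        List<WordTuple> out = new ArrayList<>();
        for (String word : text.split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new WordTuple(timeZone, location, screenName, word, 1));
            }
        }
        return out;
    }
}
```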

What if, instead, we used an associative array when crunching down the tweet? After crunching the data above, our data structure would look something like this:

"TimeZone", "Location", "ScreenName", {Word, 2, anotherWord, 1, yetAnotherWord, 1}

By attaching an associative array of words and their counts to the map’s result, we dramatically decrease the amount of data being moved around. This reduces the storage required (if you’re doing old-fashioned Hadoop) and, more importantly, the network traffic if you’re doing streaming map/reduce.
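A sketch of the compressed alternative, reusing the same hypothetical field names: the map counts words in an associative array before emitting (a pattern often called in-mapper combining), so each tweet yields one record and the metadata travels once. `TweetWordCounts` and `CombiningWordMap` are illustrative names.

```java
import java.util.*;

/** Compressed record: the tweet metadata once, plus an associative array of word -> count. */
class TweetWordCounts {
    final String timeZone;
    final String location;
    final String screenName;
    final Map<String, Integer> counts;

    TweetWordCounts(String timeZone, String location, String screenName, Map<String, Integer> counts) {
        this.timeZone = timeZone;
        this.location = location;
        this.screenName = screenName;
        this.counts = Collections.unmodifiableMap(counts);
    }
}

/** Map step that aggregates word counts locally and emits a single record per tweet. */
class CombiningWordMap {
    static TweetWordCounts map(String timeZone, String location, String screenName, String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : text.split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return new TweetWordCounts(timeZone, location, screenName, counts);
    }
}
```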

Of course, this means that the reduce function needs to be aware that it’s getting more than the simple tuple emitted in our first example, but we’ve saved a lot of bandwidth in the process.
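A minimal sketch of a reduce step that accepts the compressed records, assuming it only needs to total word counts across tweets (for instance, across all records sharing a TimeZone); `WordCountReducer` is a hypothetical name. A record from the original one-word-per-tuple map can be passed in as a single-entry map such as `Map.of(word, 1)`, so both forms can be reduced together.

```java
import java.util.*;

/** Illustrative reducer that merges per-tweet word-count maps into totals per word. */
class WordCountReducer {
    static Map<String, Integer> reduce(List<Map<String, Integer>> perTweetCounts) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> counts : perTweetCounts) {
            // Add each word's per-tweet count to its running total.
            counts.forEach((word, n) -> totals.merge(word, n, Integer::sum));
        }
        return totals;
    }
}
```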

Building a Private Cloud (IaaS)

Cloud Defined (IaaS or Infrastructure as a Service)

I was speaking with a company last week that would like to get into cloud event processing.  For the sake of discussion, I’m going to define ‘cloud’ as the ability to dynamically, and upon demand, use more or less compute, storage, or network capacity.  This is often referred to as ‘elasticity.’  I’m also going to add a few other requirements; this cloud must be able to:

  1. Host operations for a number of customers, all accessing the same data, but in different ways,
  2. Provide access to a lot of big data (see #3; it’s all stored too),
  3. Handle message rates approaching 1MM per second (yes, that’s a million messages per second),
  4. Do something with those messages before storing them (in fact, we’re not even worried about storing them), and
  5. While doing something with those messages, relate them to previously stored messages.

Cloud Event Processing Defined

And I’m going to define cloud event processing as ‘using many varieties of event processing, including complex event processing (CEP), map/reduce, and event- or message-driven agents, to process events.’ Using DarkStar, we’re able to deploy these various forms of event processing without regard for the underlying storage, compute, or network: the event processing services are simply deployed and begin processing. If a particular service needs more resources, it asks for them, and when it receives them, it uses them.

Private Cloud Defined

It’s a private cloud when you deploy the above (IaaS) internally, a public cloud when it’s deployed publicly, and a hybrid cloud when it’s deployed both privately and publicly.

Data Affinity

The large amount of inbound network bandwidth required to get this particular cloud off the ground and into the sky already makes hosting with Amazon or Rackspace more than a ‘sign up with a credit card’ process. Making elastic resources available to each of our customer’s customers makes things even more difficult: our customer has hundreds of customers who want to do something with this data, and each of them will need access to both the old data and the streaming 1MM messages per second. Without creating a separate physical footprint at a cloud provider, it would be very difficult to solve this problem.

Buy vs Build

The more things change, the more they stay the same.  So, now we’re involved in the old, ‘how much to build vs buy?’ process.  The thing is, this company has the skills to run their own cloud so we’ll most likely start with a proof of concept and then grow it.  We’re cloud seeding at Cloud Event Processing!

The Test Platform

Our recent work with Twitter will be the first deployment in this infant cloud. Using DarkStar, we’re able to analyze the Twitter feed to sense and respond to opportunities and threats. This relatively simple project includes event-driven agents, NoSQL storage, CEP sliding windows & aggregation, visualization, map/reduce, and analytics. (Those of you who’ve been reading all along are asking, ‘Analytics? We didn’t see any of those, Colin!’ And you’re right, you didn’t; we’ll share those when we’re ready.)

Kickin’ Tires & Lightin’ Fires

We’ll be starting off with some bog-standard Intel hardware, Eucalyptus, and the Ubuntu Enterprise Cloud. There’s a white paper available on Eucalyptus’s site that details our initial approach.

Thanks for reading!

TwitYourl, Twitter, and Treemaps from Panopticon

This is a treemap visualization (from Panopticon) hooked up to some static data as crunched by our TwitYourl project, our first foray into Cloud Event Processing together.

Map/Reduce, Twitter, RabbitMQ & CEP – Let's Wrap It Up

Ok, so if you’ve been following along, you’ll already have:

  1. Installed RabbitMQ, and
  2. Written a Twitter OnRamp that subscribes to a sample Twitter feed and publishes it on RabbitMQ, and
  3. Written a RuleBot that takes the tweet and breaks it into chunks, putting the chunks back onto the bus.

And now I’m going to show the Esper code that gives us the top URLs found in the Twitter feed in the last X seconds. Here’s a screenshot.


Here’s some output in text mode (easier to read):

Event received: {URL=http:\\cnn.com, Total=731}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=643}
Event received: {URL=http:\\rabbitmqm.com, Total=636}
Event received: {URL=http:\\cnn.com, Total=738}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=651}
Event received: {URL=http:\\rabbitmqm.com, Total=642}
Event received: {URL=http:\\cnn.com, Total=742}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=660}
Event received: {URL=http:\\rabbitmqm.com, Total=643}
Event received: {URL=http:\\cnn.com, Total=743}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=660}
Event received: {URL=http:\\rabbitmqm.com, Total=652}
Event received: {URL=http:\\cnn.com, Total=752}
Event received: {URL=http:\\colinclarkeventprocessing.com, Total=661}
Event received: {URL=http:\\rabbitmqm.com, Total=654}
Event received: {URL=http:\\cnn.com, Total=759}

Essentially, what’s happening here is that we’re:

  1. Getting demo data (so I know what the relative ratios are and can get a feel for whether or not this is correct),
  2. Pumping it into Esper via a URL object, and
  3. Executing a statement (“select URL, sum(count) as Total from TwitterURL.win:time(5 seconds) group by URL”).

This statement gives us the total number of times a URL has been tweeted in the last 5 seconds. Whenever that URL is tweeted, the sum is recalculated and a listener is called. This is a very straightforward statement; we could tweak it a bit more, or do some work in code, like maintaining a map of URLs and their current sums (rank). We could even create another stream. Any of these approaches would expose the information (the most popular URLs) to any interested party.
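To make the windowing concrete, here is a plain-Java sketch of what that statement computes. It is not the Esper API: `SlidingUrlWindow` is hypothetical, events carry explicit millisecond timestamps so expiry is deterministic, events are assumed to arrive in timestamp order with non-negative counts, and the exact window-edge behavior may differ from Esper’s. Each call to `onEvent` returns the URL’s updated total, roughly what the listener is handed.

```java
import java.util.*;

/**
 * Plain-Java illustration (not the Esper API) of
 * "select URL, sum(count) as Total from TwitterURL.win:time(5 seconds) group by URL".
 */
class SlidingUrlWindow {
    private static final class Event {
        final long timeMillis;
        final String url;
        final int count;

        Event(long timeMillis, String url, int count) {
            this.timeMillis = timeMillis;
            this.url = url;
            this.count = count;
        }
    }

    private final long windowMillis;
    private final Deque<Event> events = new ArrayDeque<>();      // oldest event first
    private final Map<String, Integer> totals = new HashMap<>(); // URL -> sum(count) in the window

    SlidingUrlWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Adds an event and returns the URL's updated total. */
    int onEvent(long timeMillis, String url, int count) {
        expire(timeMillis);
        events.addLast(new Event(timeMillis, url, count));
        return totals.merge(url, count, Integer::sum);
    }

    /** The URL's total as of the given time. */
    int total(String url, long nowMillis) {
        expire(nowMillis);
        return totals.getOrDefault(url, 0);
    }

    /** Drops events at least windowMillis old and subtracts them from their URL's sum. */
    private void expire(long nowMillis) {
        while (!events.isEmpty() && events.peekFirst().timeMillis <= nowMillis - windowMillis) {
            Event old = events.removeFirst();
            // Remove the URL entirely once its sum reaches zero.
            totals.computeIfPresent(old.url, (u, sum) -> sum == old.count ? null : sum - old.count);
        }
    }
}
```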

All that’s left now is to get all of this up and running in the cloud – and that’s what I call real CEP – Cloud Event Processing.  Check in next week.

CEP in the Cloud – with Load Balancing (Part 2)

It’s time to delve a little deeper into what’s proving to be a thorny issue for doing anything in the cloud, especially if you’ve got concerns over once, only once, and in sequence, something that financial market participants have to deal with every day. We’re not going to get into that level of depth in this post. But I am going to show one way to deploy CEP in the cloud and use a CEP-based continuous query to provide load balancing in our classic Map/Reduce (or what has been referred to as a streaming fold) example.

Load Balancing – Partitioning

In our example, we could instruct the RuleBots to listen only to tweets beginning with A-D, E-G, etc. The CEP Service Manager would start up however many instances were initially required and set up these initial subscriptions. If the subscriptions needed to change for load balancing, the Load Balancer could stop all of the instances in the Service Pool, re-partition them, and then start them up again, all without losing any messages and without processing any message more than once. Even without dealing with in-order (sequence-based) processing, one can see how this approach leaves a little to be desired.
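A sketch of that letter-range routing, assuming partitions are described by their inclusive upper letters (for example `'D'`, `'G'`, `'Z'` for A-D, E-G, H-Z) and that text not starting with A-Z goes to the last partition; `LetterPartitioner` is a hypothetical name. Changing the bounds while instances are running would send the same letter to a different instance mid-stream, which is why the whole pool has to be stopped and re-partitioned together.

```java
/** Illustrative router: picks a RuleBot partition from the first letter of a tweet's text. */
class LetterPartitioner {
    private final char[] upperBounds; // inclusive upper letter of each partition, ascending

    LetterPartitioner(char... upperBounds) {
        if (upperBounds.length == 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        this.upperBounds = upperBounds.clone();
    }

    /** Returns the partition index; text not starting with A-Z goes to the last partition. */
    int partitionFor(String text) {
        int last = upperBounds.length - 1;
        if (text.isEmpty()) {
            return last;
        }
        char first = Character.toUpperCase(text.charAt(0));
        if (first < 'A' || first > 'Z') {
            return last;
        }
        for (int i = 0; i < upperBounds.length; i++) {
            if (first <= upperBounds[i]) {
                return i;
            }
        }
        return last;
    }
}
```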

Load Balancing – Selecting a Destination

In the example below, we can see the RuleBots sending resource utilization statistics to the CEP Load Balancing Process. This process listens to all of the relevant processes comprising a CEP Service Pool and constructs what I call a validity stream (some others call it an ‘update in place’ query). Essentially, selecting from the stream (“select virtual-node from cep-service-pool where cep-service = 'WORDCOUNT' order by resource-utilization asc”) gives the OnRamp a destination, the least-utilized node, for the current tweet JSON object. I like this approach a lot. New processing resources can be added to a CEP service pool dynamically, and the CEP load balancer doesn’t really need to know anything about them: each service just publishes its stats and becomes eligible for processing. Cool. The picture below illustrates this.
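A plain-Java sketch of such a validity stream, with hypothetical names (`ServicePool`, `onStats`, `destinationFor`) and utilization assumed to be a number where lower means less busy. Each stats message replaces the previous value for that node in place, and the destination is the least-utilized node in the requested service pool. A newly started node becomes eligible as soon as it publishes its first stats.

```java
import java.util.*;

/** Illustrative "update in place" view: latest utilization per virtual node, per service. */
class ServicePool {
    // service -> (virtual node -> latest reported utilization)
    private final Map<String, Map<String, Double>> utilization = new HashMap<>();

    /** A RuleBot publishes its stats; the latest value replaces any earlier one for that node. */
    void onStats(String service, String virtualNode, double resourceUtilization) {
        utilization.computeIfAbsent(service, s -> new HashMap<>()).put(virtualNode, resourceUtilization);
    }

    /** Destination for the next message: the least-utilized node for the service, if any. */
    Optional<String> destinationFor(String service) {
        return utilization.getOrDefault(service, Map.of()).entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
```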

A Picture Is Worth 1,000 Words – Current State of Map/Reduce in CEP Land

Map/Reduce, Twitter, RabbitMQ, and CEP – A More Realistic Example (part 1.5)

Progress Update

I’ve updated all of my RabbitMQ installations to 1.7.1, started to clear the Eclipse cobwebs out of my head, and found out that I do actually remember enough Java to be dangerous.

Twitter OnRamp

I’ve modified the Twitter client from gist into what I refer to as an OnRamp; we use OnRamps to get messages/events onto the information Super Highway. In the Twitter OnRamp, I’ve subscribed to the sample.json feed and am publishing jsonObject.getString("text") to the RabbitMQ bus. Before we get into the AMQP specifics, I’d like to briefly touch upon load balancing, especially since we’re going to deploy the example in EC2 when we’re done here.

The Picture on the Whiteboard

Let’s paint a virtual whiteboard picture – on the left side of the board, you’ve got the Twitter OnRamp.  The Twitter OnRamp is publishing tweets onto the bus.  For now, we don’t care about those specifics.

As we move right on the whiteboard, you’ll see a number of CEP instances, all waiting for messages/events to consume so that they can publish value-added information back out onto the bus for those interested in receiving it.

Some of those CEP instances will be running the MAP in our example, taking tweets in and tokenizing them just like in our previous example. To illustrate, the tweet “colin clark is writing Java code again much to the chagrin of several engineers” will be tokenized and emitted back onto the bus, looking like this:

colin, 1
clark, 1
is, 1
writing, 1

Another CEP instance will be listening for those MAP’ed messages and using them as the foundation for a window/aggregate. This window/aggregate is our REDUCE function: it will give us the most popular words, and their relative rank, as published via the Twitter sample feed. Notice that I said “instance” for this function as opposed to “instances” for the MAP function. Let’s deal with the MAP instances first, before explaining why.
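A Java sketch of the MAP and REDUCE roles just described, without the time window: the map turns a tweet into `(word, 1)` pairs like the ones listed above, and the reduce sums them and ranks the most popular words. `WordRanker`, the lowercase whitespace tokenizer, and the alphabetical tie-break are assumptions for illustration.

```java
import java.util.*;
import java.util.stream.Collectors;

/** Illustrative MAP (tokenize) and REDUCE (sum and rank) for word popularity. */
class WordRanker {
    /** MAP: emit a (word, 1) pair for every word in the tweet. */
    static List<Map.Entry<String, Integer>> map(String tweet) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : tweet.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    /** REDUCE: sum the pairs per word and return the top n, most popular first (ties alphabetical). */
    static List<Map.Entry<String, Integer>> topN(List<Map.Entry<String, Integer>> pairs, int n) {
        Map<String, Integer> totals = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return totals.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                .limit(n)
                .collect(Collectors.toList());
    }
}
```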

Load Balancing In The Cloud

So, our Twitter OnRamp is happily subscribing to the firehose and publishing messages for further processing. We can safely assume that if a single process were tasked with tokenizing and emitting, our processing would very quickly fall behind. We don’t want that, obviously!

So, as our OnRamp is busy publishing away, is there an obvious way to direct those messages to specific CEP instances on the bus? Sure, but let’s introduce two rules first:

  1. Messages/events will be consumed in our system with no regard for their origin, and
  2. Messages/events will be published with no regard for their direct or eventual destination.

Hmmm.  These rules would seem to disallow us from tagging messages with a specific CEP instance destination from within the OnRamp.  This leaves us with 3 obvious alternatives to explicitly setting a destination CEP instance:

  1. Use one big honkin’ CEP instance on one big honkin’ machine (the current practice),
  2. Utilize a round robin approach, and softly violate our rules, or
  3. Incorporate some centrally managed directory service that deploys and configures the CEP instances dynamically (hint: I like this one).
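For comparison, alternative #2 can be sketched as a round-robin dispatcher; the class name and the instance names are hypothetical. The sketch shows why this option softly violates the rules: whoever calls `nextDestination` has to know the list of CEP instances.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative round-robin choice of a destination CEP instance. */
class RoundRobinDispatcher {
    private final List<String> instances;
    private final AtomicLong next = new AtomicLong(); // safe to share across publishing threads

    RoundRobinDispatcher(List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("need at least one instance");
        }
        this.instances = Collections.unmodifiableList(new ArrayList<>(instances));
    }

    /** Returns the instances in turn, wrapping around at the end of the list. */
    String nextDestination() {
        return instances.get((int) Math.floorMod(next.getAndIncrement(), (long) instances.size()));
    }
}
```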

For now, we’re going to simulate #3 above, and in the next update, I’ll show you exactly what we’re simulating. So in the next example, look for a CEP-based service that will subscribe to tweets and emit words back out onto the bus for further processing.

Map/Reduce, Twitter, RabbitMQ, and CEP – A More Realistic Example (part 1)

I thought it would make sense to flesh out the previous “Map/Reduce in CEP Land” post, so I did.

We’re going to show a Java Twitter client publishing tweets to a RabbitMQ bus where they’ll be tokenized and emitted (map) for reduction (in a CEP engine).  We’ll construct a stream showing the most popular words over the last hour.

So, first, a Twitter Java client (from gist) – here it is running in Eclipse – we’re subscribing to the sample.json feed:

If you’re going to follow along on this little sojourn, make sure to change the URL you’re subscribing to in the client to:

“http://stream.twitter.com/1/statuses/sample.json”

I call the Twitter client an OnRamp; we’ll use this OnRamp to publish messages to the RabbitMQ bus. We’re going to publish the messages without tokenizing them, since we’ll do that in the CEP engine. Using the RabbitMQ bus will make it straightforward to partition the processing.

In the next post, we’ll show the code to get the message onto the bus, and we’ll contemplate how that might be done incorporating some type of load balancing – that’s right, we’ll be hitting multiple CEP engines in this example.

Map/Reduce in CEP Land – A Simple Example

I thought I would share just how easily the example I describe in this post could be implemented.  Here’s some code and commentary:

This is from the Erlang shell:

1> {module,darkstar}
%% we've loaded the DarkStar CEP module
2> {module,mapwordcount}
%% and the map function
3> {module,stream}
%% and we'd like a stream, please
4> Stream = stream:start().
%% let's start up a stream
<0.36.0>
%% Stream started
5> Supervisor=darkstar:start().
%% and start up DarkStar
<0.38.0>
%% Supervisor is now running
6> mapwordcount:map("colin clark loves CEP", Stream, Supervisor).
%% call the map function with a Stream and a Supervisor process
<0.31.0>
%% map is now running
Received:<0.31.0>
%% when the map is done, it lets the Supervisor know
Enqueing "colin", 1
%% here, we see "colin clark loves CEP" broken down into
%% words and appended to the Stream
7> Enqueing "clark", 1
7> Enqueing "loves", 1
7> Enqueing "CEP", 1

It’s that easy. It’s also very straightforward to create a window on the Stream and ask it to sum by key over a time frame, updating every minute or so.
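A Java sketch of such a window (DarkStar itself is written in Erlang, and its window API is not shown in this post): counts are summed by key within fixed intervals, and a finished interval’s totals are emitted when the first event past its end arrives. `TumblingSumWindow` is hypothetical, timestamps are explicit milliseconds assumed to arrive in order, and a production window would also flush on a timer rather than waiting for the next event.

```java
import java.util.*;

/** Illustrative tumbling window: sums counts by key and emits the totals once per interval. */
class TumblingSumWindow {
    private final long intervalMillis;
    private long windowStart = -1; // start of the current interval; -1 until the first event
    private Map<String, Integer> sums = new TreeMap<>();

    TumblingSumWindow(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /**
     * Adds (key, count) at the given time. If the event falls past the current interval,
     * returns the finished interval's sums and starts a new one; otherwise returns an empty map.
     */
    Map<String, Integer> add(long timeMillis, String key, int count) {
        Map<String, Integer> emitted = Collections.emptyMap();
        if (windowStart < 0) {
            windowStart = timeMillis - Math.floorMod(timeMillis, intervalMillis);
        }
        if (timeMillis >= windowStart + intervalMillis) {
            emitted = sums;
            sums = new TreeMap<>();
            windowStart = timeMillis - Math.floorMod(timeMillis, intervalMillis);
        }
        sums.merge(key, count, Integer::sum);
        return emitted;
    }
}
```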

Now granted, I’ve left out a couple of things (like the underlying code), and we’re running in verbose mode so that we can get a little visibility into the Stream. One thing to note, though, is that mapwordcount and the stream could be running on any node within the DarkStar fabric. I could also have started mapwordcount via the Supervisor, which is much more ‘grid-like’ than the example above, but this should demonstrate the gist of where I’m going.

And yes, DarkStar is written in Erlang.