Building an Activity Feed System with Storm

One of many wonderfully functional recipes from the Clojure Cookbook

Editor’s Note: The Clojure Cookbook is a recently published book by experienced Clojurists Luke VanderHart and Ryan Neufeld. It seeks to be a practical collection of tasks for intermediate Clojure programmers. In addition to providing their own recipes, Ryan and Luke accepted contributions from a number of people in the community. One of those contributors was Travis Vachon. In this excerpt from the Cookbook, Travis gives you a tried and true recipe for working with Clojure and Storm.


Problem

You want to build an activity stream processing system to filter and aggregate the raw event data generated by the users of your application.

Solution

Streams are a dominant metaphor for presenting information to users of the modern Internet. Used on sites like Facebook and Twitter and mobile apps like Instagram and Tinder, streams are an elegant tool for giving users a window into the deluge of information generated by the applications they use every day.

As a developer of these applications, you want tools to process the firehose of raw event data generated by user actions. They must offer powerful capabilities for filtering and aggregating data and must be arbitrarily scalable to serve ever-growing user bases. Ideally they should provide high-level abstractions that help you organize and grow the complexity of your stream-processing logic to accommodate new features and a complex world.

Clojure offers just such a tool in Storm, a distributed real-time computation system that aims to be for real-time computation what Hadoop is for batch computation. In this section, you’ll build a simple activity stream processing system that can be easily extended to solve real-world problems.

First, create a new Storm project using its Leiningen template:

$ lein new cookbook-storm-project feeds

In the project directory, run the default Storm topology (which the lein template has generated for you):

$ cd feeds
$ lein run -m feeds.topology/run!
Compiling feeds.TopologySubmitter
...
Emitting: spout default [:bizarro]
Processing received message source: spout:4, stream: default, id: {}, [:bizarro]
Emitting: stormy-bolt default ["I'm bizarro Stormy!"]
Processing received message source: stormy-bolt:5,
  stream: default, id: {}, [I'm bizarro Stormy!]
Emitting: feeds-bolt default ["feeds produced: I'm bizarro Stormy!"]

This generated topology just babbles example messages incoherently, which probably isn’t what you want, so begin by modifying the “spout” to produce realistic events.

In Storm parlance, the “spout” is the component that inserts data into the processing system and creates a data stream. Open src/feeds/spouts.clj and replace the defspout form with a new spout that will periodically produce random user events such as one might see in an online marketplace (in a real application, of course, you’d hook this up to some source of real data rather than a random data generator):

(defspout event-spout ["event"]
  [conf context collector]
  (let [events [{:action :commented, :user :travis, :listing :red-shoes}
                {:action :liked, :user :jim, :listing :red-shoes}
                {:action :liked, :user :karen, :listing :green-hat}
                {:action :liked, :user :rob, :listing :green-hat}
                {:action :commented, :user :emma, :listing :green-hat}]]
    (spout
     (nextTuple []
       (Thread/sleep 1000)
       (emit-spout! collector [(rand-nth events)])))))

Next, open src/feeds/bolts.clj. A bolt consumes a stream, does some processing, and emits a new stream. Add a bolt that accepts an event and produces a tuple of (user, event) for each user in the system:

(defbolt active-user-bolt ["user" "event"] [{event "event" :as tuple} collector]
  (doseq [user [:jim :rob :karen :kaitlyn :emma :travis]]
    (emit-bolt! collector [user event]))
  (ack! collector tuple))

Now add a bolt that accepts a user and an event and emits a tuple if and only if the user is following the user who triggered the event:

(defbolt follow-bolt ["user" "event"] {:prepare true}
  [conf context collector]
  (let [follows {:jim #{:rob :emma}
                 :rob #{:karen :kaitlyn :jim}
                 :karen #{:kaitlyn :emma}
                 :kaitlyn #{:jim :rob :karen :kaitlyn :emma :travis}
                 :emma #{:karen}
                 :travis #{:kaitlyn :emma :karen :rob}}]
    (bolt
     (execute [{user "user" event "event" :as tuple}]
              (when ((follows user) (:user event))
                (emit-bolt! collector [user event]))
              (ack! collector tuple)))))

Finally, add a bolt that accepts a user and an event and stores the event in a map from each user to that user’s events, like {:user1 (event2 event1), :user2 (event2 event1)}; these are the activity streams you’ll present to users:

(defbolt feed-bolt ["user" "event"] {:prepare true}
  [conf context collector]
  (let [feeds (atom {})]
    (bolt
     (execute [{user "user" event "event" :as tuple}]
              (swap! feeds #(update-in % [user] conj event))
              (println "Current feeds:")
              (clojure.pprint/pprint @feeds)
              (ack! collector tuple)))))

This gives you all the pieces you’ll need, but you’ll still need to assemble them into a computational topology. Open up src/feeds/topology.clj and use the topology DSL to wire the spouts and bolts together:

(defn storm-topology []
  (topology
   {"events" (spout-spec event-spout)}

   {"active users" (bolt-spec {"events" :shuffle} active-user-bolt :p 2)
    "follows" (bolt-spec {"active users" :shuffle} follow-bolt :p 2)
    "feeds" (bolt-spec {"follows" ["user"]} feed-bolt :p 2)}))

You’ll also need to update the :require statement in that file:

  (:require [feeds
             [spouts :refer [event-spout]]
             [bolts :refer [active-user-bolt follow-bolt feed-bolt]]]
            [backtype.storm [clojure :refer [topology spout-spec bolt-spec]]
                            [config :refer :all]])

Run the topology again. Feeds will be printed to the console by the final bolts in the topology:

$ lein run -m feeds.topology/run!

Discussion

Storm’s Clojure DSL doesn’t look like standard Clojure. Instead, it uses Clojure’s macros to extend the language to the domain of stream processing. Storm’s stream processing abstraction consists of four core primitives:

Tuples

Dynamically typed lists of values that let programmers refer to each value by name.

Spouts

Produce tuples, often by reading from a distributed queue.

Bolts

Accept tuples as input and produce new tuples—these are the core computational units of a Storm topology.

Streams

Used to wire spouts to bolts and bolts to other bolts, creating a computational topology. Streams can be configured with rules for routing certain types of tuples to specific instances of bolts.

The following subsections review the components of our system to give a better picture of how these primitives work together.

event-spout

defspout looks much like Clojure’s standard defn, with one difference: the second argument to defspout is a list of names that will be assigned to elements of each tuple this spout produces. This lets you use tuples like vectors or maps interchangeably. The third argument to defspout is a list of arguments that will be bound to various components of Storm’s operational infrastructure.
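
Because tuples carry these names at runtime, a component downstream can destructure its input either by field name or positionally. A minimal sketch of both styles (the bolt bodies here are illustrative, not part of the recipe):

;; by name, as the bolts in this recipe do:
(defbolt by-name-bolt ["event"] [{event "event" :as tuple} collector]
  (emit-bolt! collector [event])
  (ack! collector tuple))

;; positionally, treating the tuple like a vector:
(defbolt by-position-bolt ["event"] [[event :as tuple] collector]
  (emit-bolt! collector [event])
  (ack! collector tuple))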

In the case of the event-spout spout, only collector is used:

(defspout event-spout ["event"]
  [conf context collector]

defspout’s body will be evaluated once, when the spout instance is created, which gives you an opportunity to create in-memory state. Usually this will be a connection to a database or distributed queue, but in this case you’ll create a list of events this spout will produce:

(let [events [{:action :commented, :user :travis, :listing :red-shoes}
                {:action :liked, :user :jim, :listing :red-shoes}
                {:action :liked, :user :karen, :listing :green-hat}
                {:action :liked, :user :rob, :listing :green-hat}
                {:action :commented, :user :emma, :listing :green-hat}]]

This call to spout creates an instance of a spout with the given implementation of nextTuple. This implementation simply sleeps for one second and then uses emit-spout! to emit a one-element tuple consisting of a random event from the preceding list:

(spout
 (nextTuple []
   (Thread/sleep 1000)
   (emit-spout! collector [(rand-nth events)])))))

nextTuple will be called repeatedly in a tight loop, so if you create a spout that polls an external resource, you may need to provide your own backoff algorithm to avoid excess load on that resource.
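
For example, a spout that polls a queue might sleep for progressively longer intervals while the queue is empty. A minimal sketch, assuming a hypothetical poll-queue! function that returns the next event or nil:

(defspout queue-event-spout ["event"]
  [conf context collector]
  (let [backoff-ms (atom 10)]
    (spout
     (nextTuple []
       (if-let [event (poll-queue!)] ; hypothetical queue client
         (do
           (reset! backoff-ms 10)    ; found work; reset the backoff
           (emit-spout! collector [event]))
         (do
           ;; queue was empty; sleep, then double the interval
           ;; (capped at one second) before polling again
           (Thread/sleep @backoff-ms)
           (swap! backoff-ms #(min 1000 (* 2 %)))))))))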

You can also implement the spout’s ack method to create a “reliable” spout that provides message-processing guarantees. For more information on reliable spouts, see Storm’s spout implementation for the Kestrel queueing system, storm-kestrel.
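
As a rough sketch of the shape of a reliable spout, suppose a hypothetical queue client with take!, ack!, and requeue! operations; tagging each emitted tuple with an :id asks Storm to call back ack or fail when the tuple’s processing succeeds or fails:

(defspout reliable-event-spout ["event"]
  [conf context collector]
  (spout
   (nextTuple []
     (when-let [{:keys [id payload]} (queue-take!)] ; hypothetical client
       ;; emitting with an :id makes this tuple tracked by Storm
       (emit-spout! collector [payload] :id id)))
   (ack [id]
     (queue-ack! id))       ; fully processed; delete from the queue
   (fail [id]
     (queue-requeue! id)))) ; failed; make it available for redelivery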

active-user-bolt

Every time a user takes an action in this system, the system needs to determine whether each other user will be interested in it. Given a simple interest model like Twitter’s, where users express interest in exactly one way (following other users), you could simply look at the follower list of the user who took the action and update feeds accordingly. In a more complex system, however, interest might be expressed by having liked the item the action was taken against, by following a collection that the item has been added to, or by following the seller of the item. In this world, you need to consider a variety of factors for every user and every event to determine whether the event should be added to that user’s feed.

The first bolt starts this process by generating a tuple of (user, event) for each user in the system every time an event is generated by the event-spout:

(defbolt active-user-bolt ["user" "event"] [{event "event" :as tuple} collector]
  (doseq [user [:jim :rob :karen :kaitlyn :emma :travis]]
    (emit-bolt! collector [user event]))
  (ack! collector tuple))

defbolt’s signature looks very similar to defspout. The second argument is a list of names that will be assigned to tuples generated by this bolt, and the third argument is a list of parameters. The first parameter will be bound to the input tuple, and may be destructured as a map or a vector.

The body of this bolt iterates through a list of users in the system and emits a tuple for each of them. The last line of the body calls ack! on this tuple, which allows Storm to track message processing and restart processing when appropriate.

follow-bolt

The next bolt is a prepared bolt; that is, one that maintains in-memory state. In many cases this would be a connection to a database or a queue, or a data structure aggregating some aspect of the tuples it processes, but this example simply keeps the system’s follow relationships in memory.

This bolt looks more like the spout definition. The second argument is a list of names, the third argument is a map of bolt configuration options (importantly, these set :prepare to true), and the fourth argument is the same set of operational arguments received in defspout:

(defbolt follow-bolt ["user" "event"] {:prepare true}
  [conf context collector]

The body of the bolt first defines the follow relationships, then provides the actual bolt definition inside a call to bolt:

  (let [follows {:jim #{:rob :emma}
                 :rob #{:karen :kaitlyn :jim}
                 :karen #{:kaitlyn :emma}
                 :kaitlyn #{:jim :rob :karen :kaitlyn :emma :travis}
                 :emma #{:karen}
                 :travis #{:kaitlyn :emma :karen :rob}}]
    (bolt
     (execute [{user "user" event "event" :as tuple}]
              (when ((follows user) (:user event))
                (emit-bolt! collector [user event]))
              (ack! collector tuple)))))

Note that the tuple argument is inside the bolt’s definition of execute in this case and may be destructured as usual. When the user in the tuple is not following the user who triggered the event, the bolt does not emit a new tuple and simply acknowledges that it received the input.

As noted earlier, this particular system could be implemented much more simply by querying whatever datastore tracks follows and simply adding a story to the feed of each follower. Anticipating a more complicated system, however, provides a massively extensible architecture. This bolt could easily be expanded to a collection of scoring bolts, each of which would evaluate a user/event pair based on its own criteria and emit a tuple of (user, event, score). A score aggregation bolt would receive scores from each scoring bolt and choose to emit a tuple once it received scores from each type of scoring bolt in the system. In this world, adjusting the factors determining the makeup of a user’s feed and their relative weights would be trivial—indeed, production experience with just such a system was, in the opinion of the authors, delightful (see the Rising Tide project page on GitHub).
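
As an illustrative sketch (not part of the recipe), one such scoring bolt might look like the following, scoring each (user, event) pair on a single criterion and leaving the final decision to a downstream aggregator:

(defbolt like-affinity-bolt ["user" "event" "score"]
  [{user "user" event "event" :as tuple} collector]
  ;; score on a single criterion; other scoring bolts would apply
  ;; their own rules to the same (user, event) pair
  (let [score (if (= :liked (:action event)) 1.0 0.0)]
    (emit-bolt! collector [user event score]))
  (ack! collector tuple))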

feed-bolt

The final bolt aggregates events into feeds. Since it only receives (user, event) tuples that the “scoring system” has approved, it need only add each event to the list of events it has already received for the given user:

  (let [feeds (atom {})]
    (bolt
     (execute [{user "user" event "event" :as tuple}]
              (swap! feeds #(update-in % [user] conj event))
              (println "Current feeds:")
              (clojure.pprint/pprint @feeds)
              (ack! collector tuple))))

This toy topology simply prints the current feeds every time it receives a new event, but in the real world it would persist feeds to a durable datastore or a cache that could efficiently serve the feeds to users.
Note that this design can be easily extended to support event digesting; rather than storing each event separately, it could aggregate an incoming event with other similar events for the user’s convenience.
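
As a sketch of what digesting might look like, feed-bolt could key each feed entry by listing and action, accumulating the set of users who performed the action instead of conj-ing raw events:

;; produces feeds shaped like
;; {:kaitlyn {[:green-hat :liked] #{:karen :rob}}}
(defn digest-event [feeds user event]
  (update-in feeds
             [user [(:listing event) (:action event)]]
             (fnil conj #{})
             (:user event)))

;; inside the bolt's execute, replacing the original swap!:
;; (swap! feeds digest-event user event)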

As described, this system has one enormous flaw: by default, Storm tuples are delivered to exactly one instance of each bolt, and the number of instances is not fixed by the bolt implementation. If the topology operator runs more than one instance of feed-bolt, events for the same user may be delivered to different instances, leaving each instance with a different, incomplete feed for the same user.

Happily, this flaw is addressed by Storm’s support for stream grouping, which is defined in the Storm topology definition.

Topology

The topology definition is where the rubber meets the road. Spouts are wired to bolts, which are wired to other bolts, and the flow of tuples between them can be configured to give useful properties to the computation.

This is also where you define the component-level parallelism of the topology, which provides a rough sketch of the true operational parallelism of the system.

A topology definition consists of spout specifications and bolt specifications, each of which is a map from names to specifications.

Spout specifications simply give a name to a spout implementation:

   {"events" (spout-spec event-spout)}

Multiple spouts can be configured, and the specification may define the parallelism of the spout:

   {"events" (spout-spec event-spout)
    "parallel-spout" (spout-spec a-different-more-parallel-spout :p 2)}

This definition means the topology will have one instance of event-spout and two instances of a-different-more-parallel-spout.

Bolt definitions get a bit more complicated:

"active users" (bolt-spec {"events" :shuffle} active-user-bolt :p 2)
"follows" (bolt-spec {"active users" :shuffle} follow-bolt :p 2)

As with the spout spec, you must provide a name for the bolt and specify its parallelism. In addition, bolts require specifying a stream grouping, which defines (a) from which component the bolt receives tuples and (b) how the system chooses which in-memory instance of the bolt to send tuples to. Both of these cases specify :shuffle, which means tuples from “events” will be sent to a random instance of active-user-bolt, and tuples from “active users” will be sent to a random instance of follow-bolt.

As noted, feed-bolt needs to be more careful:

"feeds" (bolt-spec {"follows" ["user"]} feed-bolt :p 2)

This bolt spec specifies a fields grouping on “user”. This means that all tuples with the same “user” value will be sent to the same instance of feed-bolt. This stream grouping is configured with a list of field names, so field groupings may consider the equality of multiple field values when determining which bolt instance should process a given tuple.
Storm also supports stream groupings that send tuples to all instances and groupings that let the bolt producing a tuple determine where to send it. Combined with the groupings already seen, these provide an enormous amount of flexibility in determining how data flows through your topology.
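
In the Clojure DSL these groupings are, as a sketch, just different grouping specs in the bolt-spec map (the bolt and stream names here are illustrative):

;; :all broadcasts every tuple to every instance of the bolt
"config" (bolt-spec {"config-events" :all} config-bolt)

;; :direct lets the emitting component choose the consuming instance
"router" (bolt-spec {"scores" :direct} router-bolt)

;; a fields grouping on several fields: tuples that agree on both
;; "user" and "listing" go to the same instance
"digests" (bolt-spec {"follows" ["user" "listing"]} digest-bolt :p 2)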

Each of these component specifications supports a parallelism option. Because the topology does not specify the physical hardware upon which it will run, these hints cannot be used to determine the true parallelism of the system, but they are used by the cluster to determine how many in-memory instances of the specified components to create.

Deployment

The real magic of Storm comes out in deployment. Storm gives you the tools to build small, independent components that make no assumptions about how many identical instances are running in the same topology. This means that the topology itself is essentially infinitely scalable. The edges of the system, which receive data from and send data to external components like queues and databases, are not necessarily as scalable, but in many cases, strategies for scaling these services are well understood.

A simple deployment strategy is built into the Storm library:

  (doto (LocalCluster.)
    (.submitTopology "my first topology"
                     {TOPOLOGY-DEBUG (Boolean/parseBoolean debug)
                      TOPOLOGY-WORKERS (Integer/parseInt workers)}
                     (storm-topology)))

LocalCluster is an in-memory implementation of a Storm cluster. You can specify the number of workers it will use to execute the components of your topology and submit the topology itself, at which point it begins polling the nextTuple methods of the topology’s spouts. As spouts emit tuples, they are propagated through the system to complete the topology’s computation.
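
Because a LocalCluster keeps running until it is shut down, a development entry point might, as a sketch, let the topology run for a while and then tear the cluster down (assuming LocalCluster and the config vars are imported as in the generated project):

(defn run-briefly! [millis]
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "feeds topology"
                     {TOPOLOGY-DEBUG true}
                     (storm-topology))
    (Thread/sleep millis)   ; let the spouts and bolts do some work
    (.shutdown cluster)))   ; tear down the in-memory cluster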

Submitting the topology to a configured cluster is nearly as simple, as you can see in src/feeds/TopologySubmitter.clj:

(defn -main [& {debug "debug" workers "workers" :or {debug "false" workers "4"}}]
  (StormSubmitter/submitTopology
   "feeds topology"
   {TOPOLOGY-DEBUG (Boolean/parseBoolean debug)
    TOPOLOGY-WORKERS (Integer/parseInt workers)}
   (storm-topology)))

This file uses Clojure’s Java interop to generate a Java class with a main method. Because the project.clj file specifies that this file should be ahead-of-time compiled, when you use lein uberjar to build a JAR suitable for submission to the cluster, this file will be compiled to look like a normal Java class file. You can upload this JAR to the machine running Storm’s Nimbus daemon and submit it for execution using the storm command:

$ storm jar path/to/the-jar-you-uploaded.jar feeds.TopologySubmitter "workers" 5

This command will tell the cluster to allocate five dedicated workers for this topology and begin polling nextTuple on all of its spouts, as it did when you used LocalCluster. A cluster may run any number of topologies simultaneously; each worker is a separate JVM process and may end up running instances of many different bolts and spouts.

The full details of setting up and running a Storm cluster are out of the scope of this recipe, but they are documented extensively on Storm’s wiki.

Conclusion

We’ve only touched on a fraction of the functionality Storm has to offer. Built-in distributed remote procedure calls allow users to harness the power of a Storm cluster to make synchronous requests that trigger a flurry of activity across hundreds or thousands of machines.
Guaranteed data-processing semantics allow users to build extremely robust systems. Trident, a higher-level abstraction over Storm’s primitives, provides breathtakingly simple solutions to complicated real-time computing problems. A detailed runtime console provides crucial insight into the runtime characteristics of a fully operational Storm cluster. The power provided by this system is truly remarkable.

Storm is also a fantastic example of Clojure’s ability to be extended to a problem domain. Its constructs idiomatically extend Clojure syntax and allow the programmer to stay within the domain of real-time processing, without needing to deal with low-level language formalities. This allows Storm to truly get out of the way. The majority of the code in a well-written Storm topology’s code base is focused on the problem at hand. The result is concise, maintainable code and happy programmers.

See Also

Storm’s website

The Storm project template

storm-deploy, a tool for easy Storm deployment

Rising Tide, the feed generation service on which this recipe is based
