Pipelining and Real-time Analytics with MapReduce Online

Most of the news related to the real-time web these days centers around the adoption of decentralized, push-oriented protocols (PubSubHubbub, rssCloud) designed to reduce latency in web publishing (†). Less discussed are the analytic tools capable of crunching through data in real time. As more of the web moves toward these types of publishing tools, data-driven organizations will demand low-latency analytic tools.

Some organizations build their own real-time analysis tools, while others turn to specialized solutions (††). The Huffington Post developed in-house tools that let editors optimize headlines in near real time. In some domains the need for real-time analytics isn't new, and companies have moved in with targeted products: SF-based Splunk is a popular real-time analytic tool for IT organizations.

In a previous post, I highlighted SQL-based real-time analytic tools that can handle large amounts of data. Tools like Truviso (based on the Postgres database) and StreamBase are attractive in that they require little adjustment from developers already familiar with SQL. In the same post, I noted that other big data management systems, such as MPP databases and MapReduce/Hadoop, were too batch-oriented (load all the data, then analyze) to deliver analysis in near real time.

At least for MapReduce/Hadoop systems, things may have changed slightly since my last post. A group of researchers from UC Berkeley and Yahoo recently modified MapReduce to support pipelining between operators: rather than waiting for a Map or Reduce operator to complete (or "materialize to stable storage") before kicking off a subsequent operation, intermediate data is pipelined between operators as it is produced (a toy code sketch follows the excerpts below). As they note in their paper, pipelining holds several advantages:

A downstream dataflow element can begin consuming data before a producer element has finished execution, which can increase opportunities for parallelism, improve utilization, and reduce response time.

Since reducers begin processing data as soon as it is produced by mappers, they can generate and refine an approximation of their final answer during the course of execution. This technique, known as online aggregation, can reduce the turnaround time for data analysis by several orders of magnitude.

Pipelining widens the domain of problems to which MapReduce can be applied. This allows MapReduce to be applied to domains such as system monitoring and stream processing.
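To make the mechanics concrete, here is a minimal, self-contained Java sketch. This is my own toy illustration, not code from the paper or from HOP: a "reduce" thread begins consuming intermediate records from a shared queue while the "map" thread is still producing them, so the two stages overlap instead of running back-to-back.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy illustration of pipelining between operators: the downstream
// ("reduce") thread starts work before the upstream ("map") thread
// finishes, instead of waiting for a materialized intermediate file.
public class PipelineDemo {
    private static final String EOF = "__DONE__"; // end-of-stream sentinel

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> intermediate = new LinkedBlockingQueue<>();

        Thread mapper = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    Thread.sleep(100); // simulate per-record map work
                    intermediate.put("record-" + i);
                    System.out.println("map emitted record-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            intermediate.add(EOF);
        });

        Thread reducer = new Thread(() -> {
            try {
                String rec;
                while (!(rec = intermediate.take()).equals(EOF)) {
                    // Consumes each record as soon as it is produced.
                    System.out.println("reduce consumed " + rec);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        mapper.start();
        reducer.start();
        mapper.join();
        reducer.join();
    }
}
```

Running this interleaves "map emitted" and "reduce consumed" lines, which is the point: in block mode, all the "map" output would have to land on disk before the first "reduce" line could appear.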

Much like the stream databases I described previously, their approach to pipelining allows MapReduce jobs to "run continuously" and analyze new data as it arrives, enabling MapReduce/Hadoop to handle real-time monitoring and analysis tasks. The kicker is that their method of pipelining preserves the fault tolerance and programming interfaces developers have come to associate with MapReduce frameworks. As an example, users of their Hadoop Online Prototype (or HOP) can continue using Hive or Pig.
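To make the "run continuously" idea concrete, here is another small self-contained Java sketch (again my own illustration, not HOP code) of a monitoring-style aggregation that refines its answer as new records arrive, rather than waiting for a complete batch to load:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Toy continuous aggregation: process events as they arrive and keep a
// running, ever-refined result -- the behavior a pipelined MapReduce job
// gets when its reducers consume mapper output continuously.
public class ContinuousMonitor {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> errorCounts = new HashMap<>();
        Random random = new Random(42);
        String[] hosts = {"web01", "web02", "web03"};

        // Stand-in for an unbounded input stream (e.g., a log feed).
        for (long event = 1; event <= 20; event++) {
            String host = hosts[random.nextInt(hosts.length)];
            errorCounts.merge(host, 1L, Long::sum);

            // Emit a snapshot every 5 events instead of only at the end:
            // the online-aggregation idea from the excerpt above.
            if (event % 5 == 0) {
                System.out.println("after " + event + " events: " + errorCounts);
            }
            Thread.sleep(50); // simulate arrival latency
        }
    }
}
```

Each snapshot is an approximation of the final answer that improves as more data flows through, which is what lets an analyst start acting on results long before the job "finishes" (or, for a monitoring job, never finishes at all).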

In a recent conversation with lead authors Tyson Condie and Neil Conway, they highlighted a few other features of HOP that should make it attractive to current Hadoop users. First, HOP not only preserves Hadoop's public interfaces, it also allows jobs to be co-scheduled and pipelined, reducing the need to write intermediate results to HDFS. Second, pipelining produces preliminary results and early feedback, resulting in faster debugging cycles: upon seeing early results, a developer can kill a task or toggle between pipeline and block mode (see the sketch below). Third, HOP does a better job of handling stragglers (slow-running tasks) by using previous results to kick off smart restarts. Finally, they are incorporating a continuous, adaptive optimizer that, for a given task, will let HOP converge to the optimal degree of parallelism; the optimizer will allow HOP to scale up or down, dynamically adding or dropping mappers and reducers based on the data being pipelined. In preliminary experiments, they found that the superior cluster utilization from pipelining can mean substantial reductions in job completion times.
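Because HOP preserves Hadoop's public interfaces, a pipeline/block toggle would presumably live in the job configuration. The property name below is a placeholder of my own invention, not a documented HOP or Hadoop setting (consult the HOP release for the real knobs); the sketch compiles against the standard Hadoop client library:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical sketch: "hop.pipeline.enabled" is a placeholder key I made
// up for illustration, not an actual HOP/Hadoop configuration property.
public class HopJobSetup {
    public static JobConf configure() {
        Configuration conf = new Configuration();
        // Enable pipelining of intermediate data between operators
        // (placeholder key; flip to false to fall back to classic
        // materialize-then-read block mode).
        conf.setBoolean("hop.pipeline.enabled", true);
        return new JobConf(conf, HopJobSetup.class);
    }
}
```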

For those interested in performing real-time analytics within Hadoop, Tyson and Neil informed us that they will make the HOP code publicly available within a month. When asked if HOP can handle large data sets, they confirmed that researchers inside Yahoo have ongoing (successful) experiments using HOP on "Hadoop scale" data. Over the long term, they predict some form of pipelining will become standard within Hadoop.

So how does HOP compare with the real-time SQL databases I described in an earlier post? For domains where the required latency is on the order of (sub)milliseconds (e.g., algorithmic trading), HOP probably won't help; solutions like Truviso and StreamBase, on the other hand, have shown they can handle those types of problems. But for the broader class of problems where a delay of a few seconds is acceptable, HOP should be a suitable analytic engine. In terms of usability, tools like Truviso and StreamBase look and work like standard SQL, making them fairly accessible to a broad class of users. To make HOP similarly accessible, Tyson and Neil noted that one interesting side project is to modify the equivalent MapReduce tools (Hive and Pig) to support "continuous and real-time queries".

UPDATE (11/12/2009): Neil Conway just announced that the source code for HOP (Hadoop Online Prototype) is now available.

(†) Traditional pull-oriented systems require subscribers to nag publishers regularly ("Do you have something new?"). Push models deliver content to clients automatically as soon as new content is published ("Don't call us, we'll call you.").

(††) For real-time structured data analysis, enterprises favor the term complex event processing (CEP). An example is TIBCO's CEP software.
