Questioning the Lambda Architecture

The Lambda Architecture has its merits, but alternatives are worth exploring.

Nathan Marz wrote a popular blog post describing an idea he called the Lambda Architecture (“How to beat the CAP theorem”). The Lambda Architecture is an approach to building stream processing applications on top of MapReduce and Storm or similar systems. This has proven to be a surprisingly popular idea, with a dedicated website and an upcoming book. Since I’ve been involved in building out the real-time data processing infrastructure at LinkedIn using Kafka and Samza, I often get asked about the Lambda Architecture. I thought I would describe my thoughts and experiences.

What is a Lambda Architecture and how do I become one?

The Lambda Architecture looks something like this:

[Figure: the Lambda Architecture]

The way this works is that an immutable sequence of records is captured and fed into a batch system and a stream processing system in parallel. You implement your transformation logic twice, once in the batch system and once in the stream processing system. You stitch together the results from both systems at query time to produce a complete answer.
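
To make “stitch together the results at query time” concrete, here is a minimal sketch in Java. The two views, their types, and the additive page-view metric are illustrative assumptions, not part of any particular Lambda implementation:

    import java.util.Map;

    // A minimal sketch of the Lambda query-time merge. batchView is assumed
    // to be complete but hours stale (e.g. loaded from Hadoop output), while
    // realtimeView covers only events since the last batch run (e.g.
    // maintained by Storm). Both names are hypothetical.
    public class LambdaQuery {
        private final Map<String, Long> batchView;
        private final Map<String, Long> realtimeView;

        public LambdaQuery(Map<String, Long> batchView, Map<String, Long> realtimeView) {
            this.batchView = batchView;
            this.realtimeView = realtimeView;
        }

        // For an additive metric like a page-view count, stitching is a sum:
        // the batch view covers history, the realtime view covers the tail.
        public long pageViews(String pageId) {
            return batchView.getOrDefault(pageId, 0L)
                 + realtimeView.getOrDefault(pageId, 0L);
        }
    }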

There are a lot of variations on this, and I’m intentionally simplifying a bit. For example, you can swap in various similar systems for Kafka, Storm, and Hadoop, and people often use two different databases to store the output tables, one optimized for real time and the other optimized for batch updates.

The Lambda Architecture is aimed at applications built around complex asynchronous transformations that need to run with low latency (say, a few seconds to a few hours). A good example would be a news recommendation system that needs to crawl various news sources, process and normalize all the input, and then index, rank, and store it for serving.

I have been involved in building a number of real-time data systems and pipelines at LinkedIn. Some of these worked in this style, and upon reflection, it is not my favorite approach. I thought it would be worthwhile to describe what I see as the pros and cons of this architecture, and also give an alternative I prefer.

What’s good about this?

I like that the Lambda Architecture emphasizes retaining the input data unchanged. I think the discipline of modeling data transformation as a series of materialized stages from an original input has a lot of merit. This is one of the things that makes large MapReduce workflows tractable, as it enables you to debug each stage independently. I think this lesson translates well to the stream processing domain. I’ve written some of my thoughts about capturing and transforming immutable data streams here.

I also like that this architecture highlights the problem of reprocessing data. Reprocessing is one of the key challenges of stream processing but is very often ignored. By “reprocessing,” I mean processing input data over again to re-derive output. This is a completely obvious but often ignored requirement. Code will always change. So, if you have code that derives output data from an input stream, whenever the code changes, you will need to recompute your output to see the effect of the change.

Why does code change? It might change because your application evolves and you want to compute new output fields that you didn’t previously need. Or it might change because you found a bug and need to fix it. Regardless, when it does, you need to regenerate your output. I have found that many people who attempt to build real-time data processing systems don’t put much thought into this problem and end up with a system that simply cannot evolve quickly because it has no convenient way to handle reprocessing. The Lambda Architecture deserves a lot of credit for highlighting this problem.

There are a number of other motivations proposed for the Lambda Architecture, but I don’t think they make much sense. One is that real-time processing is inherently approximate, less powerful, and more lossy than batch processing. I actually do not think this is true. It is true that the existing set of stream processing frameworks is less mature than MapReduce, but there is no reason that a stream processing system can’t give as strong a semantic guarantee as a batch system.

Another explanation I have heard is that the Lambda Architecture somehow “beats the CAP theorem” by allowing a mixture of different data systems with different trade-offs. Long story short, although there are definitely latency/availability trade-offs in stream processing, this is an architecture for asynchronous processing, so the results being computed are not kept immediately consistent with the incoming data. The CAP theorem, sadly, remains intact.

And the bad…

The problem with the Lambda Architecture is that maintaining code that needs to produce the same result in two complex distributed systems is exactly as painful as it seems like it would be. I don’t think this problem is fixable.

Programming in distributed frameworks like Storm and Hadoop is complex. Inevitably, code ends up being specifically engineered toward the framework it runs on. The resulting operational complexity of systems implementing the Lambda Architecture is the one thing that seems to be universally agreed on by everyone doing it.

One proposed approach to fixing this is to have a language or framework that abstracts over both the real-time and batch framework. You write your code using this higher level framework and then it “compiles down” to stream processing or MapReduce under the covers. Summingbird is a framework that does this. This definitely makes things a little better, but I don’t think it solves the problem.

Ultimately, even if you can avoid coding your application twice, the operational burden of running and debugging two systems is going to be very high. And any new abstraction can only provide the features supported by the intersection of the two systems. Worse, committing to this new uber-framework walls off the rich ecosystem of tools and languages that makes Hadoop so powerful (Hive, Pig, Crunch, Cascading, Oozie, etc).

By way of analogy, consider the notorious difficulties in making cross-database ORM really transparent. And consider that this is just a matter of abstracting over very similar systems providing virtually identical capabilities with a (nearly) standardized interface language. The problem of abstracting over totally divergent programming paradigms built on top of barely stable distributed systems is much harder.

We have done this experiment

We have actually been through a number of rounds of this at LinkedIn. We have built various hybrid-Hadoop architectures and even a domain-specific API that would allow code to be “transparently” run either in real time or in Hadoop. These approaches worked, but none were very pleasant or productive. Keeping code written in two different systems perfectly in sync was really, really hard. The API meant to hide the underlying frameworks proved to be the leakiest of abstractions. It ended up requiring deep Hadoop knowledge as well as deep knowledge of the real-time layer — and adding the new requirement that you understand enough about how the API would translate to these underlying systems whenever you were debugging problems or trying to reason about performance.

These days, my advice is to use a batch processing framework like MapReduce if you aren’t latency sensitive, and use a stream processing framework if you are, but not to try to do both at the same time unless you absolutely must.

So, why the excitement about the Lambda Architecture? I think the reason is because people increasingly need to build complex, low-latency processing systems. What they have at their disposal are two things that don’t quite solve their problem: a scalable high-latency batch system that can process historical data and a low-latency stream processing system that can’t reprocess results. By duct taping these two things together, they can actually build a working solution.

In this sense, even though it can be painful, I think the Lambda Architecture solves an important problem that was otherwise generally ignored. But I don’t think this is a new paradigm or the future of big data. It is just a temporary state driven by the current limitation of off-the-shelf tools. I also think there are better alternatives.

An alternative

As someone who designs infrastructure, I think the glaring question is this: why can’t the stream processing system just be improved to handle the full problem set in its target domain? Why do you need to glue on another system? Why can’t you do both real-time processing and also handle the reprocessing when code changes? Stream processing systems already have a notion of parallelism; why not just handle reprocessing by increasing the parallelism and replaying history very, very fast? The answer is that you can, and I think it is actually a reasonable alternative architecture if you are building this type of system today.

When I’ve discussed this with people, they sometimes tell me that stream processing feels inappropriate for high-throughput processing of historical data. But I think this is an intuition based mostly on the limitations of systems they have used, which either scale poorly or can’t save historical data. This leaves them with a sense that a stream processing system is inherently something that computes results off some ephemeral streams and then throws all the underlying data away. But there is no reason this should be true. The fundamental abstraction in stream processing is data flow DAGs, which are exactly the same underlying abstraction in a traditional data warehouse (a la Volcano) as well as being the fundamental abstraction in the MapReduce successor Tez. Stream processing is just a generalization of this data-flow model that exposes checkpointing of intermediate results and continual output to the end user.

So, how can we do the reprocessing directly from our stream processing job? My preferred approach is actually stupidly simple:

  1. Use Kafka or some other system that will let you retain the full log of the data you want to be able to reprocess and that allows for multiple subscribers. For example, if you want to reprocess up to 30 days of data, set your retention in Kafka to 30 days.
  2. When you want to do the reprocessing, start a second instance of your stream processing job that starts processing from the beginning of the retained data, but direct this output data to a new output table.
  3. When the second job has caught up, switch the application to read from the new table.
  4. Stop the old version of the job, and delete the old output table.

This architecture looks something like this:
[Figure: the Kappa Architecture]

Unlike the Lambda Architecture, in this approach you only do reprocessing when your processing code changes and you actually need to recompute your results. And, of course, the job doing the recomputation is just an improved version of the same code, running on the same framework, taking the same input data. Naturally, you will want to bump up the parallelism on your reprocessing job so it completes very quickly.
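
As a concrete sketch of steps 1 and 2 above, here is roughly what the reprocessing instance could look like using the modern Kafka Java consumer (an API newer than this post, so treat it as illustrative rather than prescriptive); the topic name, group id, and output table names are assumptions:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReprocessingJob {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // A fresh group id gives this instance its own offsets,
            // independent of the live job's position.
            props.put("group.id", "pageviews-job-v2");
            // With no committed offsets yet, start from the oldest retained
            // record. This assumes the topic's retention (retention.ms)
            // covers the window you want to reprocess, e.g. 30 days.
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("pageviews"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // Same transformation logic as the live job; only the
                        // destination differs: write to results_v2 instead of
                        // results_v1. (The transform and output store are
                        // hypothetical and omitted here.)
                    }
                }
            }
        }
    }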

Maybe we could call this the Kappa Architecture, though it may be too simple of an idea to merit a Greek letter.

Of course, you can optimize this further. In many cases, you could combine the two output tables. However, I think there are some benefits to having both for a short period of time. This allows you to revert back instantaneously to the old logic by just having a button that redirects the application to the old table. And in cases that are particularly important (your ad targeting criteria, say), you can control the cut-over with an automatic A/B test or bandit algorithm to ensure whatever bug fix or code improvement you are rolling out hasn’t accidentally degraded things in comparison to the prior version.
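
A minimal sketch of that revert button, assuming hypothetical table names; in a real deployment the active-table pointer would live in shared configuration (ZooKeeper, say) rather than in process memory:

    import java.util.concurrent.atomic.AtomicReference;

    // The serving application resolves the active output table through a
    // single mutable reference, so cut-over and rollback are each one flip.
    public class ActiveOutputTable {
        private static final AtomicReference<String> ACTIVE =
                new AtomicReference<>("results_v1");

        public static String current()         { return ACTIVE.get(); }
        public static void cutOverTo(String t) { ACTIVE.set(t); }            // e.g. "results_v2"
        public static void revertToOld()       { ACTIVE.set("results_v1"); } // the "button"
    }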

Note that this doesn’t mean your data can’t go to HDFS; it just means that you don’t run your reprocessing there. Kafka has good integration with Hadoop, so mirroring any Kafka topic into HDFS is easy. It is often useful for the output or even intermediate streams from a stream processing job to be available in Hadoop for analysis in tools like Hive or for use as input for other, offline data processing flows.

We have documented implementing this approach as well as other variations on reprocessing architectures using Samza.

Some background

For those less familiar with Kafka, what I just described may not make sense. A quick refresher will hopefully straighten things out. Kafka maintains ordered logs like this:

[Figure: an ordered, append-only Kafka log]

A Kafka “topic” is a collection of these logs:

[Figure: a topic made up of multiple partitioned logs]

A stream processor consuming this data just maintains an “offset,” which is the log entry number for the last record it has processed on each of these partitions. So, changing the consumer’s position to go back and reprocess data is as simple as restarting the job with a different offset. Adding a second consumer for the same data is just another reader pointing to a different position in the log.
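
In code, that rewind is literally a single seek. Here is a sketch using the modern Kafka Java consumer (again, a newer API than anything discussed in this post, and the topic name and offset are made up):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RewindExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition p0 = new TopicPartition("pageviews", 0);
                consumer.assign(Collections.singletonList(p0));
                // The consumer's position is just a log entry number; moving
                // it back means every poll() from here re-reads history from
                // offset 42 onward. The log itself is never modified.
                consumer.seek(p0, 42L);
            }
        }
    }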

Kafka supports replication and fault tolerance, runs on cheap commodity hardware, and is glad to store many TBs of data per machine. So, retaining large amounts of data is a perfectly natural and economical thing to do and won’t hurt performance. LinkedIn keeps more than a petabyte of Kafka storage online, and a number of applications make good use of this long retention pattern for exactly this purpose.

Cheap consumers and the ability to retain large amounts of data make adding the second “reprocessing” job just a matter of firing up a second instance of your code but starting from a different position in the log.

This design is not an accident. We built Kafka with the intent of using it as a substrate for stream processing, and we had in mind exactly this model for handling reprocessing data. For the curious, you can find more information on Kafka here.

Fundamentally, though, there is nothing that ties this idea to Kafka. You could substitute any system that supports long retention of ordered data (for example HDFS, or some kind of database). Indeed, a lot of people are familiar with similar patterns that go by the name Event Sourcing or CQRS. And, of course, the distributed database people will tell you this is just a slight rebranding of materialized view maintenance, which, as they will gladly remind you, they figured out a long long time ago, sonny.

Comparison

I know this approach works well using Samza as the stream processing system because we do it at LinkedIn. But I am not aware of any reason it shouldn’t work equally well in Storm or other stream processing systems. I’m not familiar enough with Storm to work through the practicalities, so I’d be glad to hear if others are doing this already. In any case, I think the general ideas are fairly system independent.

The efficiency and resource trade-offs between the two approaches are somewhat of a wash. The Lambda Architecture requires running both reprocessing and live processing all the time, whereas what I have proposed only requires running the second copy of the job when you need reprocessing. However, my proposal requires temporarily having 2x the storage space in the output database and requires a database that supports high-volume writes for the re-load. In both cases, the extra load of the reprocessing would likely average out. If you had many such jobs, they wouldn’t all reprocess at once, so on a shared cluster with several dozen such jobs you might budget an extra few percent of capacity for the few jobs that would be actively reprocessing at any given time.
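
To put illustrative numbers on that budgeting (the figures below are assumptions, not measurements): if each of N jobs spends d days reprocessing out of every T days, the expected number of jobs reprocessing at any given moment is

    expected jobs reprocessing at once = N × (d / T)
                                       = 36 × (2 days / 90 days)
                                       ≈ 0.8 jobs

which on a 36-job cluster works out to an extra few percent of capacity on average.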

The real advantage isn’t about efficiency at all, but rather about allowing people to develop, test, debug, and operate their systems on top of a single processing framework. So, in cases where simplicity is important, consider this approach as an alternative to the Lambda Architecture.


  • http://mhausenblas.info/ Michael Hausenblas

    Thanks a lot for the write-up, Jay! I would be more than happy having a summary of this as a contribution to http://lambda-architecture.net/ — what do you think?

    Background: IMHO we need many more people like you who share their experience in building large-scale distributed (production) systems and reflect on them on an architectural level. One thing not to forget: in my experience there are a lot of people out there who are not as advanced users of the Hadoop ecosystem as you and your team (or Nathan Marz et al) are. We all, as a community of practitioners, benefit from discussing different architectures, their pros and cons and learning from each other as we build these systems. KUTGW!

    Cheers,
    Michael

  • lmm

    I think the lambda architecture is intended for the case where you want to deliberately allow the stream-processing side to run different code, for performance reasons. Perhaps you want it to sample rather than processing all entries. Perhaps you can’t achieve the performance and availability that you want for your homepage with a fully-consistent datastore, but you want historical data to be correct and consistent.

    But yeah, this is a good overview of some of the tradeoffs; for many use cases, simply using a streaming system and “replaying” it when you want to deploy new code gets you the same accuracy of results for less complexity than full lambda architecture.

    (Fun fact: my first company was doing this in 2009, on MySQL and Java. You don’t need the latest technologies for this architecture to be useful, though they can certainly make it easier)

  • Paul Ingles

    Very interesting. The stream reprocessing architecture is exactly what we’ve ended up building to simplify our pipeline. Even though we’ve been using Kafka for a while, the majority of our consumption/processing was batch. But these jobs became ever slower and more complex, and the processing code became inextricable from the framework libs we used.

    Given our input data is time-ordered, immutable and stored forever (we archive to S3 from Kafka periodically), we built our system to just take events in: normally it just tracks the live stream, but if we make changes that require us to re-derive the output, we trigger reprocessing of the transaction log.

    The result is we have a much smaller, simpler codebase that requires far fewer frameworks to get stuff done. We can increase parallelisation by starting more instances of the app on EC2 and rebuild the data in a few hours.

  • http://people.apache.org/~hemapani/ Srinath Perera

    I also think one big use case for the Lambda Architecture is when you run two sets of logic on the realtime and batch layers. For example, I might run logic to detect suspicious activities in the realtime layer, while at the batch layer I would run a more complicated algorithm that detects fraud (doing multiple passes over the data).

    This gets more interesting when the realtime layer chooses to use results from the batch layer, which combines both models. For example, the realtime layer can use a model calculated by the batch layer to detect fraud.

    I think in those cases, Lambda architecture is useful.

  • http://www.michael-noll.com/ Michael G. Noll

    Thanks for sharing your thoughts, Jay! I enjoyed the read.

    I just have one additional comment on your alternative suggestion: You can also replay historical data from Hadoop/HDFS back into Kafka, and then perform the reprocessing you mentioned. That is, you don’t necessarily need to keep your (potentially big) sliding window of relevant data exclusively in Kafka or a similar messaging system.

    Maybe you implied this in your article, though from what I understand you talked about Hadoop primarily as a one-directional sink of Kafka data (Kafka -> HDFS). What I tried to highlight in my comment above is that one can also leverage an existing investment into Hadoop to archive historical data, and then use it to feed such archived data back into Kafka whenever it’s needed (HDFS -> Kafka). Depending on the data volume and the required retention period, this helps for example to lower the footprint of your Kafka infrastructure. It might even be required in general if you often need to replay “really old” data back into Kafka for (say) testing purposes.

    Again, thanks for sharing your thoughts and notably your practical experiences at LinkedIn.

  • Sean McNamara

    Use Apache Spark. It allows for both batch and stream processing using a single API.

  • D Cohen

    I’m surprised that you did not mention Apache Spark and Google Dataflow architectures.

    How do you see Apache Spark solving this problem? Spark claims that it solves the problem of batch/streaming code sharing; what’s your view on this?

    Also, how about Google’s new Dataflow architecture, which is based on FlumeJava and MillWheel? They also support code sharing.

  • rad_g

    Great write-up. I was always wondering why one would want to maintain two versions of the code for two completely different systems without the ability to share code.
    I can only speak from the perspective of Storm. What I’m going to say here relates to some comments above. There’s nothing stopping one from running different topologies with different levels of data introspection/analysis.
    I think the problem with Storm is that it was immediately positioned as a real-time stream processing system. In fact, as you mention, Storm is perfectly usable for data reprocessing.
    Now Spark, I can only speak about from the observer’s perspective. The entry level for Storm is much lower than for Spark. Maybe I’m being ignorant, but it is also possible to write a short sliding-window, ad-hoc job processing engine with Storm to achieve the same results as with Spark. Those two do the same thing, essentially.
    I like your “stupidly simple” alternative. I’d add one more thing to this architecture. The input for Storm is Kafka. Let Kafka fill up the disks on its instances and delete nothing from it. When storage is running out, “extend” the partition by redirecting it onto another instance. Snapshot the drives and terminate the full instances. Not sure if this is possible today, but one can imagine some ZooKeeper magic could make it possible. The result could potentially be a queue of indefinite length. What do you think?

    • http://stackoverflow.com/users/298389/om-nom-nom Kostya Golikov

      The entry level for Storm is much lower than for Spark. Maybe I’m being ignorant, but it is also possible to write a short sliding-window, ad-hoc job processing engine with Storm to achieve the same results as with Spark.

      I beg to differ! Some of the things that are important at entry level:
      * Spark concepts are somewhat simpler (it’s opinionated, yes)
      * Spark source code is easier to dive through for an average person
      * Spark has a REPL
      * and finally, Spark embraces types.

      (having said this, we’re using Storm mainly)

  • http://nvquanghuy.com Huy Nguyen

    Interesting observations. That said, could you explain more about why maintaining code that performs transformations on both the batch tier and the stream tier is painful? Couldn’t it be abstracted into a transform-as-a-service API that both tiers call?

    We’ve read both the Lambda Architecture and your Log blog post and drew a great deal of inspiration from both.

    We’re improving our data pipeline and reprocessing ability is something we’re working on at the moment.

    When we looked into Kafka (or any real-time log-like system in that manner) as a log-like layer of storage for (re)processing, we ended up choosing S3 as our log-like storage instead of Kafka because:

    1/ When it comes to batch (re)processing, we do it in a time-period manner (e.g. process 1 hour of data). Kafka uses natural numbers to order messages, thus we’ll have to build another service to translate [beg_timestamp, end_timestamp) into [beg_index, end_index). This is not difficult, but it is extra work.

    2/ Kafka can only retain up to X days of data due to disk-space limitations. As we want the ability to reprocess data further back, employing Kafka means we need another strategy to cater to those cases.

    Data is stored in hourly buckets in S3, so retrieving it hour by hour is simple: we just have a process that downloads one hour of data from S3, runs it through a series of transformations, then performs an insert-overwrite into our Hadoop cluster.

    • http://stackoverflow.com/users/298389/om-nom-nom Kostya Golikov

      we’ll have to build another service to translate [beg_timestamp, end_timestamp) into [beg_index, end_index).

      I see this pattern over and over in services based on Kafka :-)

  • Rubén Casado

    Really nice article! Thanks for sharing your thoughts and experience!

    In my opinion both approaches (Lambda and Kappa) are interesting and useful in different scenarios.

    Since Kafka is not a good option for storing data for long periods, the Lambda approach is more suitable for scenarios where you need to take the whole historical dataset into account to get your results. Likewise, if the data acquisition frequency of your system is high, an incremental strategy (as the Kappa Architecture suggests) could end up being a bottleneck.

    But if your acquisition frequency is not a problem, or you don’t need the whole dataset to obtain your results, the Kappa Architecture is more interesting and easier to develop and maintain.

    Regarding the development of business application logic in two different systems (e.g. Hadoop and Storm), there are other alternatives. Summingbird is interesting, but you have to execute exactly the same logic in batch and streaming, which is not always the right solution. Spark provides extended MapReduce-like logic for batch and Spark Streaming (a similar programming model) for streaming. But a better alternative is Lambdoop, which already integrates the combination of results (hybrid computation) into the framework, including synchronization and aggregation issues.

  • Quinton Anderson

    I think the danger with any architectural pattern is that people assume it can simply be applied to their situation, and this is almost never the case. There are cases where your proposed alternative should be used, there are other cases where Lambda should be used, and indeed some that call for a combination of the two. There are also many cases where these should be combined with an actor system or BSP, which can then be served in various ways.

    The key issue is realising the trade-offs and understanding your problem domain and the nature of your data. I think you have done a good job of identifying some other approaches and listing some of the trade-offs, but as with any architectural pattern, it isn’t right or wrong.

    Thanks for the article.

  • Javi Roman

    The Srinath Perera comment fits fully with our experience deploying a Lambda Architecture in the banking field. Most practical use of the Lambda Architecture, from our point of view, means maintaining different developer teams: one team for analysis of stored batch data (complex machine learning and historical pattern prediction), and another team developing real-time analysis (online alerts and real-time decisions). The speed layer often uses the batch layer’s analysis for its decisions.

  • shankar ganesh pj

    Can we replace Storm with Kinesis? Also, can we have Hadoop running on OpenStack (Sahara)?

    Thanks,
    Shankar.

    • danosipov

      Kinesis is in the same class as Kafka.

      You have to be careful about running Hadoop on OpenStack. It’s easy to create a cluster that underperforms because it runs on shared resources.

      • shankar ganesh pj

        Thanks for info.

  • Eran Chinthaka Withana

    As usual, another good write-up, Jay. We actually had a similar requirement where we often needed to either calculate new fields or recalculate older fields to make corrections. What we ended up doing was creating special one-off Storm topologies to process all events in a Kafka queue by replaying them. Since Kafka was scalable and we were writing to Cassandra, we had the opportunity to scale the topology enough to suit our latency requirements. The only difference from what you have is that we were writing the new values to the same tables in Cassandra. We had to run special compaction and repair processes on the Cassandra nodes after this run, but we were in control throughout the whole operation.
    Also, we had certain Storm topologies running at a much lower speed (higher latency) while others were running at a higher speed (lower latency). This helped us do away with a separate batch-processing layer and use Storm itself for that by changing the worker parallelism.


  • Patrick Wendell

    Hey Jay,

    This is a great topic. I can’t help but chime in and mention Spark Streaming here. Using the same API/system for both streaming and offline batch processing was the primary design motivation for Spark Streaming. This has been the goal of the project over the last few years as it’s been developed. Many people use it in tandem with Kafka in exactly the way you are describing here to avoid having to synchronize two parallel architectures.

    It would be worth mentioning next time you do a survey piece like this – can’t promise I won’t steal “Kappa Architecture” (with attribution of course) :P

    - Patrick