The SMAQ stack for big data

Storage, MapReduce and Query are ushering in data-driven products and services.

“Big data” is data that becomes large enough that it cannot be processed using conventional methods. Creators of web search engines were among the first to confront this problem. Today, social networks, mobile phones, sensors and science contribute to petabytes of data created daily.

To meet the challenge of processing such large data sets, Google created MapReduce. Google’s work, along with Yahoo’s creation of the Hadoop MapReduce implementation, has spawned an ecosystem of big data processing tools.

As MapReduce has grown in popularity, a stack for big data systems has emerged, comprising layers of Storage, MapReduce and Query (SMAQ). SMAQ systems are typically open source, distributed, and run on commodity hardware.

[Figure: The SMAQ stack]

In the same way the commodity LAMP stack of Linux, Apache, MySQL and PHP changed the landscape of web applications, SMAQ systems are bringing commodity big data processing to a broad audience. SMAQ systems underpin a new era of innovative data-driven products and services, in the same way that LAMP was a critical enabler for Web 2.0.

Though dominated by Hadoop-based architectures, SMAQ encompasses a variety of systems, including leading NoSQL databases. This paper describes the SMAQ stack and where today’s big data tools fit into the picture.


MapReduce

Created at Google in response to the problem of creating web search indexes, the MapReduce framework is the powerhouse behind most of today’s big data processing. The key innovation of MapReduce is the ability to take a query over a data set, divide it, and run it in parallel over many nodes. This distribution solves the issue of data too large to fit onto a single machine.

[Figure: The SMAQ stack - MapReduce]

To understand how MapReduce works, look at the two phases suggested by its name. In the map phase, input data is processed, item by item, and transformed into an intermediate data set. In the reduce phase, these intermediate results are reduced to a summarized data set, which is the desired end result.

[Figure: MapReduce example]

A simple example of MapReduce is the task of counting how many times each word occurs in a document. In the map phase, each word is identified and given the count of 1. In the reduce phase, the counts are added together for each distinct word.
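To make the data flow concrete, the following single-machine sketch (illustrative only, not Hadoop code) walks through the same word count: the map step emits a (word, 1) pair for every word, an intermediate grouping step collects the pairs by word, and the reduce step sums each group.

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalWordCount {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("the quick brown fox", "the lazy dog");

        // Map phase: emit an intermediate (word, 1) pair for every word seen.
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<Map.Entry<String, Integer>>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                intermediate.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
            }
        }

        // Shuffle: group the intermediate values by key (the word).
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> pair : intermediate) {
            if (!grouped.containsKey(pair.getKey())) {
                grouped.put(pair.getKey(), new ArrayList<Integer>());
            }
            grouped.get(pair.getKey()).add(pair.getValue());
        }

        // Reduce phase: sum the grouped counts for each word.
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int count : entry.getValue()) {
                sum += count;
            }
            System.out.println(entry.getKey() + "\t" + sum);
        }
    }
}

In a real MapReduce system, the map and reduce steps run on many machines in parallel, and the framework performs the grouping between them.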

If that seems like an obscure way of doing a simple task, that’s because it is. In order for MapReduce to do its job, the map and reduce phases must obey certain constraints that allow the work to be parallelized. Translating queries into one or more MapReduce steps is not an intuitive process. Higher-level abstractions have been developed to ease this; they are discussed in the Query section below.

An important way in which MapReduce-based systems differ from conventional databases is that they process data in a batch-oriented fashion. Work must be queued for execution, and may take minutes or hours to process.

Using MapReduce to solve problems entails three distinct operations:

  • Loading the data — This operation is more properly called Extract, Transform, Load (ETL) in data warehousing terminology. Data must be extracted from its source, structured to make it ready for processing, and loaded into the storage layer for MapReduce to operate on it.
  • MapReduce — This phase retrieves data from storage, processes it, and returns the results to storage.
  • Extracting the result — Once processing is complete, the result must be retrieved from storage and presented in a form useful to humans.

Many SMAQ systems have features designed to simplify the operation of each of these stages.

Hadoop MapReduce

Hadoop is the dominant open source MapReduce implementation. Funded by Yahoo, it emerged in 2006 and, according to its creator Doug Cutting, reached “web scale” capability in early 2008.

The Hadoop project is now hosted by Apache. It has grown into a large endeavor, with multiple subprojects that together comprise a full SMAQ stack.

Since it is implemented in Java, Hadoop’s MapReduce implementation is accessible from the Java programming language. Creating MapReduce jobs involves writing functions to encapsulate the map and reduce stages of the computation. The data to be processed must be loaded into the Hadoop Distributed Filesystem.

Taking the word-count example from above, a suitable map function might look like the following (adapted from the Hadoop MapReduce tutorial).

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // Emit an intermediate (word, 1) pair for each token in the line.
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

The corresponding reduce function sums the counts for each word.

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted for this word across all map outputs.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
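To complete the program, these map and reduce classes are wired together by a driver that configures and submits the job. The sketch below follows the standard Hadoop WordCount tutorial for the org.apache.hadoop.mapreduce API; the enclosing class name and the command-line input/output paths are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // The Map and Reduce classes from the listings above go here.

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);   // the reducer doubles as a combiner
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output live in HDFS; the paths are passed on the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}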

The process of running a MapReduce job with Hadoop involves the following steps:

  • Defining the MapReduce stages in a Java program
  • Loading the data into the filesystem
  • Submitting the job for execution
  • Retrieving the results from the filesystem

Written directly against the standalone Java API, Hadoop MapReduce jobs can be complex to create and require programmer involvement. A broad ecosystem has grown up around Hadoop to make the task of loading and processing data more straightforward.

Other implementations

MapReduce has been implemented in a variety of other programming languages and systems, a list of which may be found in Wikipedia’s entry for MapReduce. Notably, several NoSQL database systems have integrated MapReduce, and are described later in this paper.


Storage

MapReduce requires storage from which to fetch data and in which to store the results of the computation. The data expected by MapReduce is not relational data, as used by conventional databases. Instead, data is consumed in chunks, which are then divided among nodes and fed to the map phase as key-value pairs. This data does not require a schema, and may be unstructured. However, the data must be available in a distributed fashion, to serve each processing node.

[Figure: The SMAQ stack - Storage]

The design and features of the storage layer are important not just because of the interface with MapReduce, but also because they affect the ease with which data can be loaded and the results of computation extracted and searched.

Hadoop Distributed File System

The standard storage mechanism used by Hadoop is the Hadoop Distributed File System, HDFS. A core part of Hadoop, HDFS has the following features, as detailed in the HDFS design document.

  • Fault tolerance — Assuming that failure will happen allows HDFS to run on commodity hardware.
  • Streaming data access — HDFS is written with batch processing in mind, and emphasizes high throughput rather than random access to data.
  • Extreme scalability — HDFS will scale to petabytes; such an installation is in production use at Facebook.
  • Portability — HDFS is portable across operating systems.
  • Write once — By assuming a file will remain unchanged after it is written, HDFS simplifies replication and speeds up data throughput.
  • Locality of computation — Due to data volume, it is often much faster to move the program near to the data, and HDFS has features to facilitate this.

HDFS provides an interface similar to that of regular filesystems. Unlike a database, HDFS can only store and retrieve data, not index it. Simple random access to data is not possible. However, higher-level layers have been created to provide finer-grained functionality to Hadoop deployments, such as HBase.
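In practice, loading data for MapReduce and extracting results means copying files in and out of HDFS. A rough sketch using the Java FileSystem API follows; the paths and filenames are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCopy {
    public static void main(String[] args) throws Exception {
        // Picks up the cluster configuration (core-site.xml etc.) from the classpath.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Load raw input into HDFS ahead of a MapReduce job.
        fs.copyFromLocalFile(new Path("/tmp/sentences.txt"),
                             new Path("/user/analyst/input/sentences.txt"));

        // Retrieve a result file written by the job.
        fs.copyToLocalFile(new Path("/user/analyst/output/part-r-00000"),
                           new Path("/tmp/wordcounts.txt"));

        fs.close();
    }
}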

HBase, the Hadoop Database

One approach to making HDFS more usable is HBase. Modeled after Google’s BigTable database, HBase is a column-oriented database designed to store massive amounts of data. It belongs to the NoSQL universe of databases, and is similar to Cassandra and Hypertable.

[Figure: HBase and MapReduce]

HBase uses HDFS as a storage system, and thus is capable of storing a large volume of data through fault-tolerant, distributed nodes. Like similar column-store databases, HBase provides REST- and Thrift-based API access.

Because it creates indexes, HBase offers fast, random access to its contents, though only for simple queries. For complex operations, HBase acts as both a source and a sink (destination for computed data) for Hadoop MapReduce. HBase thus allows systems to interface with Hadoop as a database, rather than at the lower level of HDFS.
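As an illustration of what interfacing as a database means, the sketch below uses the HBase client API of the 0.90-era releases to write and then randomly read a single cell; the table, column family, and column names are hypothetical, and later HBase versions changed these class names.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "users");

        // Write one cell: row "user-123", column family "profile", column "name".
        Put put = new Put(Bytes.toBytes("user-123"));
        put.add(Bytes.toBytes("profile"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
        table.put(put);

        // Random read of the same cell, something HDFS alone cannot offer.
        Get get = new Get(Bytes.toBytes("user-123"));
        Result result = table.get(get);
        System.out.println(Bytes.toString(
                result.getValue(Bytes.toBytes("profile"), Bytes.toBytes("name"))));

        table.close();
    }
}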

Hive

Data warehousing, or storing data in such a way as to make reporting and analysis easier, is an important application area for SMAQ systems. Developed originally at Facebook, Hive is a data warehouse framework built on top of Hadoop. Similar to HBase, Hive provides a table-based abstraction over HDFS and makes it easy to load structured data. In contrast to HBase, Hive can only run MapReduce jobs and is suited for batch data analysis. Hive provides a SQL-like query language to execute MapReduce jobs, described in the Query section below.

Cassandra and Hypertable

Cassandra and Hypertable are both scalable column-store databases that follow the pattern of BigTable, similar to HBase.

An Apache project, Cassandra originated at Facebook and is now in production in many large-scale websites, including Twitter, Facebook, Reddit and Digg. Hypertable was created at Zvents and spun out as an open source project.

[Figure: Cassandra and MapReduce]

Both databases offer interfaces to the Hadoop API that allow them to act as a source and a sink for MapReduce. At a higher level, Cassandra offers integration with the Pig query language (see the Query section below), and Hypertable has been integrated with Hive.

NoSQL database implementations of MapReduce

The storage solutions examined so far have all depended on Hadoop for MapReduce. Other NoSQL databases have built-in MapReduce features that allow computation to be parallelized over their data stores. In contrast with the multi-component SMAQ architectures of Hadoop-based systems, they offer a self-contained system comprising storage, MapReduce and query all in one.

Whereas Hadoop-based systems are most often used for batch-oriented analytical purposes, the usual function of NoSQL stores is to back live applications. The MapReduce functionality in these databases tends to be a secondary feature, augmenting other primary query mechanisms. Riak, for example, has a default timeout of 60 seconds on a MapReduce job, in contrast to the expectation of Hadoop that such a process may run for minutes or hours.

These prominent NoSQL databases contain MapReduce functionality:

  • CouchDB is a distributed database, offering semi-structured document-based storage. Its key features include strong replication support and the ability to make distributed updates. Queries in CouchDB are implemented using JavaScript to define the map and reduce phases of a MapReduce process.
  • MongoDB is very similar to CouchDB in nature, but with a stronger emphasis on performance, and less suitability for distributed updates, replication, and versioning. MongoDB MapReduce operations are specified using JavaScript; a sketch of invoking one through the Java driver follows this list.
  • Riak is another database similar to CouchDB and MongoDB, but places its emphasis on high availability. MapReduce operations in Riak may be specified with JavaScript or Erlang.
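As an example of the MongoDB case above, the sketch below runs a word-count MapReduce through the pre-3.0 MongoDB Java driver, passing the map and reduce phases as JavaScript strings that execute inside the database; the collection name and the text field are hypothetical.

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MapReduceOutput;
import com.mongodb.Mongo;

public class MongoWordCount {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost");
        DB db = mongo.getDB("test");
        DBCollection sentences = db.getCollection("sentences");

        // The map and reduce phases are JavaScript, executed inside MongoDB.
        String map = "function() {"
                   + "  this.text.split(' ').forEach(function(word) { emit(word, 1); });"
                   + "}";
        String reduce = "function(key, values) {"
                      + "  return Array.sum(values);"
                      + "}";

        // Run the job and write the results to the 'word_counts' collection.
        MapReduceOutput out = sentences.mapReduce(map, reduce, "word_counts", null);
        for (DBObject result : out.results()) {
            System.out.println(result);
        }
        mongo.close();
    }
}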

Integration with SQL databases

In many applications, the primary source of data is in a relational database using platforms such as MySQL or Oracle. MapReduce is typically used with this data in two ways:

  • Using relational data as a source (for example, a list of your friends in a social network).
  • Re-injecting the results of a MapReduce operation into the database (for example, a list of product recommendations based on friends’ interests).

It is therefore important to understand how MapReduce can interface with relational database systems. At the most basic level, delimited text files serve as an import and export format between relational databases and Hadoop systems, using a combination of SQL export commands and HDFS operations. More sophisticated tools do, however, exist.

The Sqoop tool is designed to import data from relational databases into Hadoop. It was developed by Cloudera, an enterprise-focused distributor of Hadoop platforms. Sqoop is database-agnostic, as it uses the Java JDBC database API. Tables can be imported either wholesale, or using queries to restrict the data import.

Sqoop also offers the ability to re-inject the results of MapReduce from HDFS back into a relational database. As HDFS is a filesystem, Sqoop expects delimited text files and transforms them into the SQL commands required to insert data into the database.

For Hadoop systems that utilize the Cascading API (see the Query section below) the cascading.jdbc and cascading-dbmigrate tools offer similar source and sink functionality.

Integration with streaming data sources

Alongside relational databases, streaming data sources, such as web server log files or sensor output, are among the most common inputs to big data systems. The Cloudera Flume project aims to provide convenient integration between Hadoop and streaming data sources. Flume aggregates data from both network and file sources, spread over a cluster of machines, and continuously pipes these into HDFS. The Scribe server, developed at Facebook, offers similar functionality.

Commercial SMAQ solutions

Several massively parallel processing (MPP) database products have MapReduce functionality built in. MPP databases have a distributed architecture with independent nodes that run in parallel. Their primary application is in data warehousing and analytics, and they are commonly accessed using SQL.

  • The Greenplum database is based on the open source PostgreSQL DBMS, and runs on clusters of distributed hardware. The addition of MapReduce to the regular SQL interface enables fast, large-scale analytics over Greenplum databases, reducing query times by several orders of magnitude. Greenplum MapReduce permits the mixing of external data sources with the database storage. MapReduce operations can be expressed as functions in Perl or Python.
  • Aster Data’s nCluster data warehouse system also offers MapReduce functionality. MapReduce operations are invoked using Aster Data’s SQL-MapReduce technology. SQL-MapReduce enables the intermingling of SQL queries with MapReduce jobs defined using code, which may be written in languages including C#, C++, Java, R or Python.

Other data warehousing solutions have opted to provide connectors with Hadoop, rather than integrating their own MapReduce functionality.

  • Vertica, famously used by Farmville creator Zynga, is an MPP column-oriented database that offers a connector for Hadoop.
  • Netezza is an established manufacturer of hardware data warehousing and analytical appliances. Recently acquired by IBM, Netezza is working with Hadoop distributor Cloudera to enhance the interoperation between their appliances and Hadoop. While it solves similar problems, Netezza falls outside of our SMAQ definition, lacking both the open source and commodity hardware aspects.

Although a Hadoop-based system can be built entirely from open source components, integrating them takes significant effort. Cloudera aims to make Hadoop enterprise-ready, and has packaged a unified distribution, the Cloudera Distribution for Hadoop (CDH). In this, CDH parallels the work of Red Hat or Ubuntu in creating Linux distributions. CDH comes in both a free edition and an Enterprise edition with additional proprietary components and support. CDH is an integrated and polished SMAQ environment, complete with user interfaces for operation and query. Cloudera’s work has resulted in some significant contributions to the Hadoop open source ecosystem.


Query

Specifying MapReduce jobs in terms of defining distinct map and reduce functions in a programming language is unintuitive and inconvenient, as is evident from the Java code listings shown above. To mitigate this, SMAQ systems incorporate a higher-level query layer to simplify both the specification of the MapReduce operations and the retrieval of the result.

[Figure: The SMAQ stack - Query]

Many organizations using Hadoop will have already written in-house layers on top of the MapReduce API to make its operation more convenient. Several of these have emerged either as open source projects or commercial products.

Query layers typically offer features that handle not only the specification of the computation, but the loading and saving of data and the orchestration of the processing on the MapReduce cluster. Search technology is often used to implement the final step in presenting the computed result back to the user.

Pig

Developed by Yahoo and now part of the Hadoop project, Pig provides a new high-level language, Pig Latin, for describing and running Hadoop MapReduce jobs. It is intended to make Hadoop accessible for developers familiar with data manipulation using SQL, and provides an interactive interface as well as a Java API. Pig integration is available for the Cassandra and HBase databases.

The word-count example in Pig is shown below, including both the data loading and storing phases (the notation $0 refers to the first field in a record).

input   = LOAD 'input/sentences.txt' USING TextLoader();
words   = FOREACH input GENERATE FLATTEN(TOKENIZE($0));
grouped = GROUP words BY $0;
counts  = FOREACH grouped GENERATE group, COUNT(words);
ordered = ORDER counts BY $0;
STORE ordered INTO 'output/wordCount' USING PigStorage();

Expressive as Pig is, developers can also write custom processing steps as User Defined Functions (UDFs), in the same way that many SQL databases support the addition of custom functions. These UDFs are written in Java against the Pig API.
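As a minimal sketch of such a UDF against the Pig EvalFunc API (the class name and behavior here are hypothetical), a function that upper-cases a chararray field might look like this:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// A simple eval function: takes one chararray argument and returns it upper-cased.
public class ToUpper extends EvalFunc<String> {
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        String value = (String) input.get(0);
        return value == null ? null : value.toUpperCase();
    }
}

Once compiled and packaged into a jar, the function is made available to a Pig script with the REGISTER statement and then called like any built-in function.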

Though much simpler to understand and use than the MapReduce API, Pig suffers from the drawback of being yet another language to learn. It is SQL-like in some ways, but it is sufficiently different from SQL that it is difficult for users familiar with SQL to reuse their knowledge.

Hive

As introduced above, Hive is an open source data warehousing solution built on top of Hadoop. Created by Facebook, it offers a query language very similar to SQL, as well as a web interface that offers simple query-building functionality. As such, it is suited for non-developer users, who may have some familiarity with SQL.

Hive’s particular strength is in offering ad-hoc querying of data, in contrast to the compilation requirement of Pig and Cascading. Hive is a natural starting point for more full-featured business intelligence systems, which offer a user-friendly interface for non-technical users.
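To give a flavor of the query language, the sketch below runs a simple aggregation over a hypothetical words table through Hive's JDBC driver. The driver class and connection URL shown are those of the original HiveServer of the Hive 0.x era; details vary between Hive versions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQueryExample {
    public static void main(String[] args) throws Exception {
        // JDBC driver and URL for the original HiveServer (Hive 0.x).
        Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection(
                "jdbc:hive://localhost:10000/default", "", "");

        // A SQL-like query that Hive compiles into one or more MapReduce jobs.
        Statement stmt = con.createStatement();
        ResultSet rs = stmt.executeQuery(
                "SELECT word, COUNT(1) AS total FROM words GROUP BY word");

        while (rs.next()) {
            System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
        }
        con.close();
    }
}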

The Cloudera Distribution for Hadoop integrates Hive, and provides a higher-level user interface through the HUE project, enabling users to submit queries and monitor the execution of Hadoop jobs.

Cascading, the API Approach

The Cascading project provides a wrapper around Hadoop’s MapReduce API to make it more convenient to use from Java applications. It is an intentionally thin layer that makes the integration of MapReduce into a larger system more convenient. Cascading’s features include:

  • A data processing API that aids the simple definition of MapReduce jobs.
  • An API that controls the execution of MapReduce jobs on a Hadoop cluster.
  • Access via JVM-based scripting languages such as Jython, Groovy, or JRuby.
  • Integration with data sources other than HDFS, including Amazon S3 and web servers.
  • Validation mechanisms to enable the testing of MapReduce processes.

Cascading’s key feature is that it lets developers assemble MapReduce operations as a flow, joining together a selection of “pipes”. It is well suited for integrating Hadoop into a larger system within an organization.
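A word-count assembly gives a sense of this pipe-based style. The sketch below follows the word-count example from the Cascading 1.x user guide; class and package names changed in later Cascading releases, and the input and output paths are illustrative.

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public class CascadingWordCount {
    public static void main(String[] args) {
        // Source and sink taps: where data is read from and written to in HDFS.
        Tap source = new Hfs(new TextLine(new Fields("line")), "input/sentences.txt");
        Tap sink = new Hfs(new TextLine(new Fields("word", "count")),
                "output/wordcount", SinkMode.REPLACE);

        // The pipe assembly: split each line into words, group by word, count each group.
        Pipe assembly = new Pipe("wordcount");
        assembly = new Each(assembly, new Fields("line"),
                new RegexGenerator(new Fields("word"), "\\S+"));
        assembly = new GroupBy(assembly, new Fields("word"));
        assembly = new Every(assembly, new Count(new Fields("count")));

        // Plan the assembly into one or more MapReduce jobs and run them.
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass(properties, CascadingWordCount.class);
        Flow flow = new FlowConnector(properties).connect("word-count", source, sink, assembly);
        flow.complete();
    }
}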

While Cascading itself doesn’t provide a higher-level query language, a derivative open source project called Cascalog does just that. Using the Clojure JVM language, Cascalog implements a query language similar to that of Datalog. Though powerful and expressive, Cascalog is likely to remain a niche query language, as it offers neither the ready familiarity of Hive’s SQL-like approach nor Pig’s procedural expression. The listing below shows the word-count example in Cascalog: it is significantly terser, if less transparent.

(defmapcatop split [sentence]
  (seq (.split sentence "\\s+")))

(?<- (stdout) [?word ?count]
  (sentence ?s) (split ?s :> ?word)
  (c/count ?count))

Search with Solr

An important component of large-scale data deployments is retrieving and summarizing data. The addition of database layers such as HBase provides easier access to data, but does not provide sophisticated search capabilities.

To solve the search problem, the open source search and indexing platform Solr is often used alongside NoSQL database systems. Solr uses Lucene search technology to provide a self-contained search server product.

For example, consider a social network database in which MapReduce is used to compute the influence of each person, according to some suitable metric. This ranking would then be re-injected into the database. Indexing the result with Solr allows operations on the social network, such as finding the most influential people whose interest profiles mention mobile phones.
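A sketch of that query using the SolrJ client library of the Solr 1.4 era might look like the following; the core URL and the field names interests, influence and name are hypothetical, standing in for whatever schema the deployment defines.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class InfluenceSearch {
    public static void main(String[] args) throws Exception {
        SolrServer solr = new CommonsHttpSolrServer("http://localhost:8983/solr");

        // Find people whose interests mention mobile phones,
        // ranked by the influence score computed in MapReduce.
        SolrQuery query = new SolrQuery("interests:\"mobile phones\"");
        query.addSortField("influence", SolrQuery.ORDER.desc);
        query.setRows(10);

        QueryResponse response = solr.query(query);
        for (SolrDocument doc : response.getResults()) {
            System.out.println(doc.getFieldValue("name") + "\t"
                    + doc.getFieldValue("influence"));
        }
    }
}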

Originally developed at CNET and now an Apache project, Solr has evolved from being just a text search engine to supporting faceted navigation and results clustering. Additionally, Solr can manage large data volumes over distributed servers. This makes it an ideal solution for result retrieval over big data sets, and a useful component for constructing business intelligence dashboards.


Conclusion

MapReduce, and Hadoop in particular, offers a powerful means of distributing computation among commodity servers. Combined with distributed storage and increasingly user-friendly query mechanisms, the resulting SMAQ architecture brings big data processing within reach of even small and solo development teams.

It is now economical to conduct extensive investigation into data, or to create data products that rely on complex computations. The resulting explosion in capability has forever altered the landscape of analytics and data warehousing systems, lowering the barrier to entry and fostering a new generation of products, services and organizational attitudes, a trend explored more broadly in Mike Loukides’ “What is Data Science?” report.

The emergence of Linux gave power to the innovative developer with merely a small Linux server at their desk: SMAQ has the same potential to streamline data centers, foster innovation at the edges of an organization, and enable new startups to cheaply create data-driven businesses.
