Hadoop 101: Simplifying MapReduce Development

Print Friendly, PDF & Email

hadoop-101Learning a new development framework takes time, and, as is well known, the Hadoop platform is no exception. MapReduce developers face a steep learning curve when first deploying and configuring a Hadoop cluster and later when verifying program correctness. Compounded by long execution times (measured in minutes), this often frustrates data scientists, especially those who lack a systems administration background. Even experienced Hadoop users run into a well-known set of “gotchas” that hinder their progress.

Recognizing that these obstacles easily can create frustration and impede productivity, ScaleOut Software has focused on making it as easy as possible for developers to build high performance, distributed applications for the enterprise. ScaleOut hServer®, which provides an in-memory MapReduce execution engine, was specifically designed to overcome Hadoop’s complexity and make the entire development cycle as straightforward as possible, while also delivering real-time performance. This article explores some typical configuration roadblocks found in traditional Hadoop platforms and explains how they can be sidestepped using ScaleOut hServer using the WordCount application to illustrate these benefits.

Some Configuration and Development Obstacles in Hadoop

Before starting MapReduce development, Hadoop users must configure and deploy the Hadoop Distributed File System (HDFS) and the MapReduce infrastructure. Next, they have to tune Hadoop’s framework-specific configuration parameters, such as the number of task slots on each participating host, the memory allowance for MapReduce tasks, the size of temporary buffer files, and whether to use compression for intermediate data.

Once the user begins developing MapReduce applications, another set of challenges emerge. Users have to make several design choices which materially impact performance, such as determining the number of splits and partitions used by the application. In addition, verifying output correctness can be difficult and tedious since MapReduce applications usually output unstructured text files which may have to be searched for a specific value – a task that often requires yet another MapReduce application.

What is ScaleOut hServer?

ScaleOut hServer provides a MapReduce execution engine that can run standard MapReduce applications with a single-line code change (and zero code changes under YARN). It allows these applications to input and output either live, memory-based data or static data from HDFS. By eliminating Hadoop’s batch scheduling and using the grid for storing intermediate data with aggressive combining, it dramatically lowers MapReduce execution times.

ScaleOut hServer runs as an in-memory execution engine on top of the infrastructure of ScaleOut StateServer (SOSS), an in-memory data grid with a ten-year track record of handling mission-critical workloads. In-memory data grids host fast-changing data in memory distributed across a cluster of servers (called nodes). This enables fast access and dynamic updates, as well as data-parallel analysis with much lower latency than disk-based, batch-scheduled platforms, such as Hadoop. Spark embodies a similar, in-memory execution architecture that accelerates execution but, unlike an in-memory data grid, was not designed to provide high availability for production environments.

Avoiding Complexity with ScaleOut hServer

SOSS was designed from the ground up with the goal of making deployment and configuration of an in-memory data grid as simple as possible. Even for a new user, installing a multi-node cluster takes little time (on the order of minutes) since the nodes automatically aggregate and distribute the workload. When a node is added to the cluster to handle a larger workload, SOSS dynamically rebalances the workload to scale performance. It also employs built-in mechanisms for high availability so that it can detect and remove a failed node without disrupting processing.

ScaleOut hServer extends the ease of use baked into SOSS by eliminating the need for performance tuning when accessing memory-based data sets. It automatically sets the applicable Hadoop configuration parameters (e.g., the number of slots, splits, and partitions). Furthermore, directing a MapReduce application’s output to an in-memory collection of key/value pairs makes it straightforward to access and verify results, as illustrated below.

ScaleOut hServer can be downloaded from the ScaleOut Software website and installed on a laptop for development purposes or on a multi-server cluster for production use. Its small footprint and fast execution time makes MapReduce development straightforward and efficient.

Example: Porting WordCount to ScaleOut hServer

To demonstrate ScaleOut hServer’s ease of use, let’s take a look at how to port and run the Hadoop WordCount application – the “Hello world!” of Hadoop MapReduce applications and often the first example in Hadoop tutorials. This application’s goal is simple: process input file(s) and count the occurrences of each word. Although the application’s input can be very large, it is a good fit for in-memory execution using ScaleOut hServer because the amount of intermediate data flowing from mappers to reducers is limited by the total number of possible words and usually fits within the memory of an in-memory data grid.

Here’s a quick review of this application. The mappers process a space-delimited lines of text, converting each line into individual words and emitting key-value pairs in the form: <(word),1>. For each unique word, the reducers process the list of key-value pairs from the map phase, summing the occurrences and emitting key-value pairs of the form <(word),N> (where N is the occurrences of the word). The source code for WordCount is published in the Apache Hadoop tutorial.

With a single-line code change to the original WordCount sample, ScaleOut hServer’s MapReduce execution engine can execute this MapReduce job. It only requires that instantiation of the standard Hadoop Job class be replaced with HServerJob. This allows ScaleOut hServer’s client libraries to redirect the job to its in-memory compute engine. The code change is shown below.

Before:

Job job = new Job(conf, “wordcount”);

After:

Job job = new HServerJob(conf, “wordcount”);

With this change the application can be run from the command line as a standard Java application:

java -cp “./wordcount.jar:/usr/local/soss/java_api/*:/usr/local/soss/java_api/lib/*:/usr/local/soss/java_api/hslib/hadoop-2.4.1/*” org.myorg.WordCount in.txt out/

Note that all necessary library JARs shown above are located in ScaleOut hServer’s installation directory under the java_api folder, with third-party libraries in the lib subfolder and Hadoop distribution-specific libraries in the hslib subfolder. ScaleOut hServer automatically ships application code and libraries to all nodes in the cluster and runs the distributed MapReduce Job without any further configuration of either the application or cluster.

Running this command generates the following output:

[demo@demo8-centos6 ~]$ java -cp “./wordcount.jar:/usr/local/soss/java_api/*:/usr/local/soss/java_api/lib/*:/usr/local/soss/java_api/hslib/hadoop-2.4.1/*” org.myorg.WordCount in.txt out/

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/usr/local/soss5/java_api/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/local/soss5/java_api/hslib/hadoop-2.4.1/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

Splits created:

Host /10.0.3.28 has 1 splits.

Job initialization completed in 558 ms.

Map invocation done in 559 ms.

Reduce invocation done in 92 ms.

From start to finish, the sample job with a small input data set completes in about a second. The same job would take Hadoop up to a minute to run, with most of its time spent in JVM startup and disk I/O. ScaleOut StateServer optionally can maintain the application’s execution environment on the nodes, making subsequent runs of the same job (or other jobs with the same dependencies) even faster – typically returning results in milliseconds.

Simplifying Program Verification

ScaleOut hServer lets applications store output results within its in-memory data grid, which provides a highly convenient place to access output data either for debugging, reporting, or validating application correctness. Instead of post-processing an HDFS file (possibly with another MapReduce job), output data can be retrieved from the in-memory data grid using simple, key-based lookup.

ScaleOut hServer provides in-memory data storage for key/value pairs using an API called the NamedMap, which is a distributed implementation of the Java ConcurrentMap. Each instance of a NamedMap holds a collection of key/value pairs distributed across all cluster nodes and accessible with straightforward CRUD (Create/Read/Update/Delete) operations.

MapReduce applications direct output data to a NamedMap using ScaleOut hServer’s GridOutputFormat class, an implementation of Hadoop’s OutputFormat. The output for the WordCount sample is a key-value map of words (Text objects) associated with the count of each word (IntWritable objects). As shown below, the application substitutes the GridOutputFormat for the FileOutputFormat used with HDFS:

Before:

conf.setOutputFormat(FileOutputFormat);

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

After:

// First, create the NamedMap (named “wordcount_output”).

NamedMap<text, intwritable=””> outputMap = NamedMapFactory.getMap(“wordcount_output”, new WritableSerializer(Text.class), new WritableSerializer(IntWritable.class));

// Tell the job to use ScaleOut hServer’s GridOutputFormat output formatter.

conf.setOutputFormat(GridOutputFormat.class);

// Set the newly-created outputMap NamedMap as the job output destination.

GridOutputFormat.setNamedMap(conf, outputMap);

After running the WordCount application, the output stored in the NamedMap can be retrieved by using a simple key/value lookup:

System.out.println(“Word ‘the’ occurred ” + outputMap.get(new Text(“the”)) + ” times.”);

The NamedMap also can be used as an input for a MapReduce jobs via the NamedMapInputFormat, allowing MapReduce jobs can be chained together to run entirely in-memory without ever touching the disk. This removes a major bottleneck to performance and enables MapReduce applications to input live data.

Summing Up

ScaleOut hServer’s in-memory execution engine for MapReduce applications opens the door to real-time analytics with its ability to analyze live data and its fast execution time for memory-based data sets. It also offers a highly streamlined development platform, eliminating much of Hadoop’s well known complexity and enabling fast development cycles. In fact, ScaleOut hServer can be installed on a laptop and run MapReduce jobs in under a second.

Porting MapReduce applications to run on ScaleOut hServer requires only a one-line code change (or no change at all under YARN), and applications easily can input from an in-memory NamedMap and output to a NamedMap instead of HDFS. This eliminates the need to set and tune Hadoop configuration parameters, such as splits and partitions, while delivering maximum performance at all times. With ScaleOut hServer, MapReduce development has never been easier.

Michael Sobolev, Aaron Burke, Mark Waterman, and William Bain – ScaleOut Software

 

Sign up for the free insideBIGDATA newsletter.

 

Speak Your Mind

*