A new series of execution frameworks has been added to the Hadoop ecosystem, including Spark, Tez, Storm and Samza, each with its own strengths and weaknesses. As a result, choosing the best one for the problems you need to solve is increasingly becoming a challenge.
Traditional MapReduce is efficient and scales well for big data batch processing, but it does not perform well for small data or iterative calculations. Spark provides a nice API and is very fast for small-to-intermediate data sets; it has lower latency and handles iterative algorithms better, but its processing capabilities are limited by the available hardware (all intermediates have to fit in memory to get the necessary performance). You might argue that hardware is cheap nowadays, but as you accrue more data, buying more hardware is only a band-aid and will not solve the underlying problem. Also, as more users run jobs on a cluster, they have to compete for the available resources.
There are also some overlapping strengths and weaknesses, making the decision of which execution framework to use even more confusing. For example, Spark Streaming, Storm and Samza all deal well with streaming use cases, but which one is best suited for your needs? You can certainly compare them in terms of latency, performance and feature set, but you can only base your decisions on what is available today. Tomorrow, there will be more frameworks added to the ecosystem and existing ones will have evolved. As you prepare for the future, you should consider:
— How the data you have to analyze will change
— What future use cases you will have to implement
— Which execution engines (and with what feature sets and capabilities) will be available
Given this, you have to be prepared to switch execution frameworks in the future even though you may have made well-informed decisions today.
Datameer Integration of Execution Frameworks
Datameer traditionally compiled all workflows into MapReduce jobs because it was the best (and only) execution framework that scaled well with growing data volumes. Now that more execution frameworks are available, we decided not just to switch over to another framework, but to support multiple frameworks. To make this possible, we have created an internal plug-in extension point called “ExecutionFramework.” The API of that plug-in extension point will not be released publicly until it is considered stable.
The following picture shows the architecture changes that we have made to support multiple execution frameworks.
Previous Architecture (Datameer 4.0)
Datameer 4.0 compiled workflows directly into Hadoop MapReduce jobs. This approach generated MapReduce jobs efficiently but had a few drawbacks, including:
— High-level operations like various joins, aggregations, time series and especially data mining algorithms had to be translated directly to the MapReduce API, which is a complex translation. MapReduce is a very low-level API, almost like an assembly language.
— Supporting other execution frameworks would have meant entirely rewriting the complex compile logic.
New Architecture (Datameer 5.0)
To eliminate these drawbacks, we changed Datameer’s architecture and introduced an abstraction of a processing flow that supports operations like map, flatMap, sort, union, etc. We call this abstraction layer the “New graph API,” and the model that describes the operations is called JobFlow. The New graph API follows a few design principles:
— It does not use any Hadoop code
— It does not know about MapReduce
— It has enough general-purpose operations to make it easy to build JobFlows for complex processing logic
— It has only a few operations, so additional execution frameworks can be added easily
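To make these principles concrete, here is a minimal sketch of what such a framework-agnostic flow abstraction could look like: a flow is just a description of operations, and each execution framework is an interpreter of that description. All class and function names below are hypothetical illustrations, not Datameer's actual JobFlow API.

```python
# Sketch: a flow describes operations without executing them.
class Flow:
    def __init__(self, ops=None):
        self.ops = ops or []

    def map(self, fn):
        return Flow(self.ops + [("map", fn)])

    def flat_map(self, fn):
        return Flow(self.ops + [("flatMap", fn)])

    def sort(self, key=None):
        return Flow(self.ops + [("sort", key)])

# An execution framework is an interpreter of the operation list.
# This one is a local, single-process backend; a MapReduce or Tez
# backend would interpret the very same description differently.
def run_local(flow, data):
    for op, arg in flow.ops:
        if op == "map":
            data = [arg(x) for x in data]
        elif op == "flatMap":
            data = [y for x in data for y in arg(x)]
        elif op == "sort":
            data = sorted(data, key=arg)
    return data

flow = Flow().flat_map(lambda line: line.split()).map(str.upper).sort()
print(run_local(flow, ["b a", "c"]))  # → ['A', 'B', 'C']
```

Note that the flow itself knows nothing about Hadoop or MapReduce; only the interpreter does, which is exactly what makes swapping execution frameworks possible.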
Smart Execution™: Best Performance Without Manual Interaction
As I mentioned, selecting the right execution engine can be challenging and you eventually need to re-evaluate your decisions whenever the data size, processing flow or technology changes. Smart Execution solves this problem by automatically picking the execution engine that is best suited for your workflow and data size.
You don’t have to manually tweak processing engines when your workflow or data volumes change. Smart Execution will always select the most appropriate processing engine.
This even works within a single workflow. Very often, data sizes decrease significantly over a long chain of operations. For example, this frequently occurs after applying filters and aggregations, when all subsequent computations run on lower volumes of data. Smart Execution can detect this and switch engines in the middle of processing. This is shown in action in the following diagram (part of Datameer 5.0’s new job visualization features): the first two cluster jobs are executed as optimized MapReduce (orange) and the last one as a single-node execution (green).
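The selection logic can be pictured as a simple per-stage decision based on estimated input size. The thresholds and engine names below are invented for illustration; Datameer's actual heuristics are not public.

```python
# Hypothetical engine selector: pick a backend per stage based on
# the estimated input size of that stage.
SMALL_JOB_LIMIT = 256 * 1024 * 1024  # bytes; assumed threshold

def pick_engine(estimated_input_bytes, yarn_available=True):
    if not yarn_available:
        return "standard-mapreduce"   # MR1 clusters: only option
    if estimated_input_bytes <= SMALL_JOB_LIMIT:
        return "small-job"            # single node, one JVM
    return "tez"                      # optimized MapReduce DAG

# Data volume often shrinks along a workflow (filters, aggregations),
# so the engine can change between stages of the same workflow:
stage_bytes = [50 * 2**30, 800 * 2**20, 10 * 2**20]
print([pick_engine(s) for s in stage_bytes])
# → ['tez', 'tez', 'small-job']
```

The point is that the decision is re-evaluated per stage with actual size estimates, so no user ever has to pin a workflow to one engine.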
Supported Execution Frameworks
Datameer 5.0 comes with four execution frameworks:
Standard MapReduce
Workflows are executed as plain, old MapReduce jobs. This works on both MR1 and MR2 (YARN) Hadoop clusters.
Optimized MapReduce (Tez)
Workflows are compiled to Tez DAGs (directed acyclic graphs) and executed as such. Although the approach is similar to standard MapReduce, Tez has many features that MapReduce lacks, and they significantly improve performance.
For example, Tez allows adjusting parallelism of reduce tasks at run time depending on the actual data size coming out of the previous task. This information is usually not available before the job is submitted to the cluster.
For standard MapReduce, the user has to decide on the number of reduce tasks before the job is submitted to the cluster, and the only way to do that is to make a good guess. Tez can adjust to actual data sizes at run time and reduce resource utilization, which makes it easier to reuse resources, even across DAGs.
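A small sketch makes the difference concrete: with a runtime sizing rule, the reducer count can be derived from the bytes actually emitted by the previous stage instead of a pre-submission guess. The target size, cap, and numbers below are invented for illustration only.

```python
import math

# Assumed sizing rule: aim for roughly 1 GiB of shuffle data per
# reduce task, capped at a maximum task count.
TARGET_BYTES_PER_REDUCER = 1 * 2**30

def reducers_for(shuffle_bytes, max_reducers=500):
    return max(1, min(max_reducers,
                      math.ceil(shuffle_bytes / TARGET_BYTES_PER_REDUCER)))

# A guess made before submission assumed ~1 TiB of shuffle data:
planned = reducers_for(1024 * 2**30)   # capped at 500 tasks
# After a selective filter, only 3 GiB actually came out of the
# previous task, so a runtime decision needs just 3 reducers:
actual = reducers_for(3 * 2**30)
print(planned, actual)  # → 500 3
```

With standard MapReduce, the "planned" number is locked in at submission time; a Tez-style runtime adjustment avoids launching hundreds of nearly idle reduce tasks.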
This functionality requires a Hadoop 2.2-based YARN cluster (or higher).
Small Job Execution
Small job execution executes a workflow on a single node of the cluster instead of trying to distribute the work in parallel. This is very efficient for small data volumes since all overhead associated with distributed processing is entirely avoided.
For small data, the cost of coordinating distributed tasks outweighs the actual processing time. Small job execution does all processing on one node in one JVM, allowing intermediates to be kept in memory or on local disk instead of being exchanged via HDFS or the network. Up to a certain threshold of data volume or processing complexity, small job execution is much more efficient and uses resources far more effectively than distributed processing approaches. It also scales well with cluster size, since many small job executions can run concurrently. Lastly, it handles failures similarly to MapReduce: if the node running the small job dies, the task is executed again on a different node.
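The trade-off described above can be captured in a back-of-the-envelope cost model: distributed execution pays a fixed per-task coordination cost that dominates for small inputs, while a single node avoids it entirely. All constants here are invented for illustration, not measured Datameer numbers.

```python
# Toy cost model: fixed startup overhead per distributed job, plus
# processing time split across tasks, versus one node with no overhead.
def distributed_seconds(input_mb, tasks=20, startup=5.0,
                        mb_per_second_per_task=50.0):
    return startup + (input_mb / tasks) / mb_per_second_per_task

def single_node_seconds(input_mb, mb_per_second=50.0):
    return input_mb / mb_per_second

for mb in (100, 100000):
    winner = ("single-node"
              if single_node_seconds(mb) < distributed_seconds(mb)
              else "distributed")
    print(mb, "MB ->", winner)
# 100 MB is faster single-node; 100000 MB is faster distributed.
```

The crossover point is exactly the "certain threshold" mentioned above: below it, a single JVM wins; above it, distribution pays off.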
This functionality requires a Hadoop 2.2-based YARN cluster (or higher).
Local Job Execution
Local job execution is similar to small job execution, but it does not run the job on the Hadoop cluster; instead, it runs on the local machine.
Smart Execution Prerequisites
— A Hadoop 2.2 (or higher) based YARN distribution (e.g., CDH 5, CDH 5.1, HDP 2.0.6, HDP 2.1). If you have not upgraded yet, there is no need to worry: Datameer will continue to work and run all jobs as standard MapReduce.
— No additional software is required. Even Tez does not have to be installed on your cluster. Datameer is 100% self-contained.
Datameer has always integrated smoothly with all Hadoop distributions, with the available hardware and with any data volume that needs to be processed, and we want to continue this trend. We want to run on every Hadoop distribution and on all hardware configurations, from a desktop to a large Hadoop cluster, and to process data efficiently, from kilobytes to petabytes. Computations also need to be accurate and reliable in the face of hardware and software failures. Tez fulfills all of these requirements. It adds no hardware requirements or upgrades (users can simply make use of the available RAM), so all customers can use our solution without first buying new hardware. It also runs on all Hadoop 2.2-based distributions.
Why not Spark (yet)?
There is a lot of hype around Spark right now, but for all the people talking about it, few actually run it in production. As of now, it lacks proof points that it can handle the needs of enterprise companies.
Spark will eventually integrate well with our new architecture. However, we decided not to include it in our first release because it has very high hardware requirements (all intermediate results have to fit in memory), and there are currently not many use cases or customer hardware configurations for which Spark would be an optimal fit.
Our benchmarks show that our optimized MapReduce execution framework runs up to 5 times faster than standard MapReduce, and the small job execution framework runs up to 16 times faster.