As we’ve covered in previous articles, Hadoop is an open source software project hosted by the Apache Software Foundation. It is focused on reliable, scalable, distributed computing.
Hadoop Architecture Description
The simplest description of the Hadoop architecture is:
Hadoop is a parallel processing system implemented as a MapReduce engine layered on top of a fault-resilient distributed file system.
Distributed File System
As we’ve discussed, the underpinning of Hadoop (or any MapReduce system) is a distributed file system. The basic functionality of the Hadoop Distributed File System (HDFS) is explained as follows:
- Large files are split into blocks of equal size
- These blocks are distributed across the cluster for storage
- Because node failure is a reality to be considered in a larger cluster, each block is stored multiple times (typically three times) on different computers
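The three points above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function names, the node names, and the round-robin placement are hypothetical stand-ins, and real HDFS uses a more sophisticated placement policy. Note that the final block of a file may be shorter than the others.

```python
def split_into_blocks(data: bytes, block_size: int) -> list[bytes]:
    """Cut data into fixed-size blocks; the last block may be shorter."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_replicas(num_blocks: int, nodes: list[str], replication: int = 3) -> dict[int, list[str]]:
    """Assign each block to `replication` distinct nodes (simple round-robin, an assumption)."""
    if replication > len(nodes):
        raise ValueError("need at least as many nodes as replicas")
    placement = {}
    for block_id in range(num_blocks):
        # Consecutive positions modulo len(nodes) are always distinct nodes.
        placement[block_id] = [nodes[(block_id + r) % len(nodes)] for r in range(replication)]
    return placement


# Hypothetical 1000-byte file on a hypothetical four-node cluster.
blocks = split_into_blocks(b"x" * 1000, block_size=256)
placement = place_replicas(len(blocks), ["node1", "node2", "node3", "node4"])
```

With four nodes and three replicas, the loss of any single node still leaves two copies of every block available.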
When the cluster is started, one node is assigned to run the NameNode process. The NameNode is the centerpiece of HDFS. It maintains the directory of all files in the file system, and tracks where in the cluster file data is kept. It does not read or write the data of any of the files itself.
Client applications communicate with the NameNode when they wish to locate, add, copy, move or delete a file. A successful response from the NameNode consists of a list of the relevant DataNode servers on which the requested data is stored (or is to be stored).
The NameNode is a single point of failure (SPOF) for a Hadoop system. If the NameNode is not available, no data stored on the underlying HDFS can be read or written. Further, should the metadata maintained by the NameNode be lost or corrupted, it is likely that the data stored in the underlying HDFS will also be lost and/or corrupted. It is for these reasons that a BackupNameNode process is available as an optional high-availability feature.
When the Hadoop cluster is started, along with the NameNode process being started on one node, each node on which data is to be stored starts a DataNode process as a subordinate to the NameNode. The DataNode is responsible for reading and writing data blocks to and from the underlying HDFS as directed by the NameNode process and client applications. Client applications can, and often do, communicate directly with a DataNode. Once a client application has received from the NameNode a list of relevant DataNode servers, it is more efficient for the client application to communicate directly with the DataNode.
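The division of labour between the NameNode and the DataNodes can be illustrated with a toy model. Everything here (`ToyNameNode`, `ToyDataNode`, `read_file`, the path and node names) is hypothetical and is not the Hadoop API; the point is only that the NameNode answers "where?" while the file data itself flows directly between client and DataNode.

```python
class ToyNameNode:
    """Holds only metadata: file path -> ordered block ids -> DataNode names."""

    def __init__(self):
        self.file_blocks = {}      # path -> [block_id, ...]
        self.block_locations = {}  # block_id -> [datanode_name, ...]

    def add_file(self, path, blocks_with_locations):
        self.file_blocks[path] = [block_id for block_id, _ in blocks_with_locations]
        for block_id, locations in blocks_with_locations:
            self.block_locations[block_id] = list(locations)

    def locate(self, path):
        """Return [(block_id, [datanodes])]; no file data passes through here."""
        return [(b, self.block_locations[b]) for b in self.file_blocks[path]]


class ToyDataNode:
    """Stores the actual block contents."""

    def __init__(self, name):
        self.name = name
        self.blocks = {}  # block_id -> bytes

    def write_block(self, block_id, data):
        self.blocks[block_id] = data

    def read_block(self, block_id):
        return self.blocks[block_id]


def read_file(path, namenode, datanodes):
    """Client-side read: ask the NameNode where blocks live, then fetch from DataNodes directly."""
    chunks = []
    for block_id, locations in namenode.locate(path):
        # Use the first listed replica; a real client would pick a close or healthy one.
        chunks.append(datanodes[locations[0]].read_block(block_id))
    return b"".join(chunks)


# Hypothetical two-node cluster holding one two-block file.
datanodes = {name: ToyDataNode(name) for name in ("dn1", "dn2")}
datanodes["dn1"].write_block("blk_0", b"hello ")
datanodes["dn2"].write_block("blk_1", b"world")
namenode = ToyNameNode()
namenode.add_file("/demo.txt", [("blk_0", ["dn1"]), ("blk_1", ["dn2"])])
content = read_file("/demo.txt", namenode, datanodes)
```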
An Additional NameNode Note
On smaller clusters it is not uncommon for the server running the NameNode process to also run a DataNode process, but this should not be done in a production environment. Because the NameNode is a single point of failure, it is essential that the server running it in a production cluster be particularly well looked after [1].
The MapReduce Engine is the raison d’être for the Hadoop Distributed File System. A principal tenet of MapReduce is ‘data locality’. ‘Data locality’ is based on the assumption that it is less expensive to move processing to the data on which it is to act than it is to move data across a network to where processing resources are available. What this means is that, if at all possible, data is left in place and the processing that is to act on that data is brought to it. Because HDFS splits the data stored on it into blocks and provides a mechanism to locate the server on which any given block is stored, the MapReduce engine is able to launch a process that acts upon arbitrary data blocks where they reside. Movement of data is considered only if the server on which a needed data block is stored is unavailable for processing [2].
The implementation of MapReduce is an alternating application of Map and then Reduce functions against blocks of data. The complexities and difficulties of executing these functions in parallel are managed automatically by the framework and hidden from the user. A MapReduce iteration comprises three base phases: Map, Shuffle, and Reduce. The Shuffle phase is introduced and managed internally by the framework [3].
Wikipedia explains the workflow of MapReduce as follows:
Another way to look at MapReduce is as a 5-step parallel and distributed computation:
1. Prepare the Map() input – the “MapReduce system” designates Map processors, assigns the K1 input key value each processor would work on, and provides that processor with all the input data associated with that key value.
2. Run the user-provided Map() code – Map() is run exactly once for each K1 key value, generating output organized by key values K2.
3. “Shuffle” the Map output to the Reduce processors – the MapReduce system designates Reduce processors, assigns the K2 key value each processor would work on, and provides that processor with all the Map-generated data associated with that key value.
4. Run the user-provided Reduce() code – Reduce() is run exactly once for each K2 key value produced by the Map step.
5. Produce the final output – the MapReduce system collects all the Reduce output, and sorts it by K2 to produce the final outcome.
Logically these 5 steps can be thought of as running in sequence – each step starts only after the previous step is completed – though in practice, of course, they can be intertwined, as long as the final result is not affected.
In many situations the input data might already be distributed (“sharded”) among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as much as possible local to the Map-generated data they need to process. [4]
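To make the five steps concrete, here is a minimal single-process sketch of a word count in plain Python. It is not Hadoop code: `run_mapreduce`, `map_fn` and `reduce_fn` are hypothetical names, and everything runs sequentially in memory, whereas the real framework distributes each step across the cluster.

```python
from collections import defaultdict


def map_fn(_key, line):
    """Map: emit (word, 1) for every word in one input line."""
    for word in line.split():
        yield word.lower(), 1


def reduce_fn(word, counts):
    """Reduce: collapse all counts for one word into a single total."""
    yield word, sum(counts)


def run_mapreduce(records, map_fn, reduce_fn):
    # Steps 1-2: feed each (K1, V1) input record to the map function.
    intermediate = []
    for k1, v1 in records:
        intermediate.extend(map_fn(k1, v1))

    # Step 3: shuffle, i.e. group all intermediate values by their K2 key.
    groups = defaultdict(list)
    for k2, v2 in intermediate:
        groups[k2].append(v2)

    # Steps 4-5: run reduce once per K2 key and collect output sorted by key.
    output = []
    for k2 in sorted(groups):
        output.extend(reduce_fn(k2, groups[k2]))
    return output


records = [(0, "the quick brown fox"), (1, "the lazy dog"), (2, "The fox")]
result = run_mapreduce(records, map_fn, reduce_fn)
# result == [("brown", 1), ("dog", 1), ("fox", 2), ("lazy", 1), ("quick", 1), ("the", 3)]
```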
In the Hadoop implementation of MapReduce, the coordination and management of a MapReduce job is handled by a JobTracker task and a suite of TaskTracker tasks.
The JobTracker is the interface between a client application and the Hadoop framework.
Once code is submitted to the Hadoop cluster, the JobTracker formulates and follows an execution plan by taking the following steps:
- Determining where the data blocks of the input files reside
- Assigning to nodes the different tasks to be executed as part of the MapReduce workflow and passing these instructions to a TaskTracker for execution (simplistically: Map, then shuffle, then reduce)
- Monitoring all tasks as they are running by way of received heartbeats
If a task fails, the JobTracker will automatically relaunch the task, on a different node if necessary, up to a predefined limit of retries.
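The retry behaviour can be pictured roughly as follows. This is a simplified sketch, not the JobTracker's actual logic: `run_with_retries`, `flaky_task`, the node names and the default of four attempts are all assumptions made for illustration.

```python
def run_with_retries(task, nodes, max_attempts=4):
    """Try `task(node)` on successive nodes until it succeeds or attempts run out.

    Returns (result, failures), where failures lists the (node, error) pairs seen.
    """
    failures = []
    for attempt in range(max_attempts):
        # Move on to the next candidate node on every new attempt.
        node = nodes[attempt % len(nodes)]
        try:
            return task(node), failures
        except RuntimeError as err:
            failures.append((node, str(err)))
    raise RuntimeError(f"task failed after {max_attempts} attempts: {failures}")


def flaky_task(node):
    """Hypothetical task that cannot run on node1."""
    if node == "node1":
        raise RuntimeError("disk error")
    return f"done on {node}"


result, failures = run_with_retries(flaky_task, ["node1", "node2", "node3"])
# result == "done on node2"; failures == [("node1", "disk error")]
```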
There is only one JobTracker task per Hadoop cluster. It is typically run on a server acting as a master node of the cluster. On smaller clusters (40 nodes or fewer), it is not uncommon for the JobTracker and the NameNode to coexist on the same server.
As mentioned previously, while it may not be uncommon to co-locate additional tasks on the server running the NameNode process, particularly on smaller clusters, this is a practice that is potentially fraught with peril due to the NameNode being an SPOF for the cluster. In a production environment the NameNode server should be considered fragile and cared for accordingly.
When the Hadoop cluster is started, a TaskTracker process is started, along with the DataNode process, on each node of the cluster on which data is to be stored. The relationship between the TaskTracker and the DataNode should be clear at this point: the DataNode process is responsible for reading data from the underlying HDFS and passing that data to a process designated by the co-located TaskTracker [5].
The TaskTracker gets its execution orders from the JobTracker. When a TaskTracker is started, it is configured with a set of execution slots. These indicate the number of simultaneous tasks the TaskTracker may accept. When the JobTracker is looking for the location of a data block against which processing is to be directed, the availability of a free execution slot is taken into consideration.
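The interplay between data locality and free execution slots might be sketched like this. The function `pick_node` and its inputs are hypothetical; the real scheduler weighs more factors, but the preference order (a node holding a replica first, any other node only as a fallback) is the idea described above.

```python
def pick_node(block_replicas, free_slots):
    """Choose where to run a task for one data block.

    block_replicas: nodes holding a copy of the block.
    free_slots: node name -> number of currently free execution slots.
    Returns (node, is_data_local); node is None if the cluster is fully busy.
    """
    # First choice: a node that already stores the block and has a free slot.
    for node in block_replicas:
        if free_slots.get(node, 0) > 0:
            return node, True
    # Fallback: any node with a free slot; the block must then cross the network.
    for node, slots in free_slots.items():
        if slots > 0:
            return node, False
    return None, False


free_slots = {"node1": 0, "node2": 1, "node3": 0, "node4": 2}
local_choice = pick_node(["node1", "node2", "node3"], free_slots)
remote_choice = pick_node(["node1", "node3"], free_slots)
# local_choice == ("node2", True); remote_choice == ("node2", False)
```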
The TaskTracker is also responsible for communicating job execution status (both success and failure) back to the JobTracker along with housekeeping messages such as the number of available execution slots and periodic heartbeat messages to assure the JobTracker that the TaskTracker is alive and running. It is through this heartbeat that the JobTracker is able to identify TaskTracker nodes which have failed and reschedule execution on one of the other nodes containing a copy of that data.
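A heartbeat check of this kind can be reduced to a table of "last seen" timestamps. The sketch below is an assumption-laden illustration: `HeartbeatMonitor`, the tracker names and the ten-second timeout are invented for the example, and timestamps are passed in explicitly so the behaviour is easy to follow.

```python
class HeartbeatMonitor:
    """Tracks the last heartbeat time of each TaskTracker."""

    def __init__(self, timeout_seconds):
        self.timeout_seconds = timeout_seconds
        self.last_seen = {}  # tracker name -> time of most recent heartbeat

    def record_heartbeat(self, tracker, now):
        self.last_seen[tracker] = now

    def failed_trackers(self, now):
        """Return trackers whose last heartbeat is older than the timeout."""
        return sorted(
            tracker
            for tracker, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )


monitor = HeartbeatMonitor(timeout_seconds=10)
monitor.record_heartbeat("tt1", now=0)
monitor.record_heartbeat("tt2", now=0)
monitor.record_heartbeat("tt1", now=8)    # tt1 keeps reporting, tt2 goes silent
failed = monitor.failed_trackers(now=12)  # failed == ["tt2"]
```

Tasks that were assigned to a tracker in `failed` would then be candidates for rescheduling on another node holding a copy of the same data.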
When a client MapReduce program is submitted, the following sequence of events takes place:
- JobTracker is passed parameters of the client job
- JobTracker communicates with NameNode to get a list of nodes containing both:
  - Data blocks of the input to the MapReduce job
  - Available execution slots
- For each node returned by the NameNode:
  - JobTracker formulates an execution plan for the MapReduce job
  - JobTracker communicates with the specified TaskTracker, passing to it steps to execute:
    - Prepare and Read Input
    - Map Phase
    - Sort and Shuffle Phase
    - Reduce Phase
    - Write Output
As seen above, the TaskTracker gets its marching orders from the JobTracker. The first order of business for the TaskTracker is to communicate with the local DataNode to start reading the requested data block and to break the data being read into key-value pairs that will be fed as a sequential stream to the Map process [6]. The map function is called individually for each of these key-value pairs and in turn produces as output an arbitrarily large list of new key-value pairs.
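As a rough illustration of this step, the sketch below turns a block of text into (byte offset, line) pairs, which is how plain text input is typically presented to a map function, and then calls a map function once per pair. The names `read_records` and `map_bytes_per_user`, and the `user,bytes` line layout, are assumptions made up for the example.

```python
def read_records(block_text):
    """Yield (byte_offset, line) pairs, one per line of the block."""
    offset = 0
    for line in block_text.splitlines(keepends=True):
        yield offset, line.rstrip("\n")
        offset += len(line.encode("utf-8"))


def map_bytes_per_user(_offset, line):
    """Map: parse a hypothetical 'user,bytes' line into a (user, bytes) pair."""
    user, byte_count = line.split(",")
    yield user, int(byte_count)


block_text = "alice,10\nbob,5\nalice,7\n"
records = list(read_records(block_text))
# records == [(0, "alice,10"), (9, "bob,5"), (15, "alice,7")]

map_output = []
for offset, line in records:
    # The map function is invoked once for each key-value pair.
    map_output.extend(map_bytes_per_user(offset, line))
# map_output == [("alice", 10), ("bob", 5), ("alice", 7)]
```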
The shuffle phase begins by sorting the key-value pairs resulting from the map phase by their keys. If intermediate storage is needed for these results, disk on the node local to the sort is used; intermediate data is not written to the distributed file system. After the sort, MapReduce assigns key-value pairs to a reducer according to their keys. The framework makes sure all pairs with the same key are assigned to the same reducer [7]. Because the output from the map phase can be distributed arbitrarily across the cluster, it needs to be transferred across the network to the correct reducers in the shuffle phase. Because of this, it is normal for large volumes of data to cross the network in this step.
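One common way to guarantee that equal keys meet at the same reducer is to hash the key and take the result modulo the number of reducers. The sketch below shows that idea in Python; `partition` and `shuffle` are hypothetical names, and `zlib.crc32` is used only as a convenient deterministic hash for the illustration, not because Hadoop uses it.

```python
import zlib


def partition(key, num_reducers):
    """Map a key to a reducer index; equal keys always get the same index."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(map_output, num_reducers):
    """Route every (key, value) pair to its reducer, then sort each reducer's input."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in map_output:
        buckets[partition(key, num_reducers)].append((key, value))
    # Each reducer sees its pairs ordered by key (ties broken by value).
    return [sorted(bucket) for bucket in buckets]


map_output = [("alice", 10), ("bob", 5), ("alice", 7), ("carol", 3), ("bob", 1)]
reducer_inputs = shuffle(map_output, num_reducers=2)
```

Whatever the hash values turn out to be, both `("alice", 10)` and `("alice", 7)` land in the same entry of `reducer_inputs`, which is the property the reduce phase depends on.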
The reducer finally collates all the pairs with the same key and creates a sorted list from the values. The key and the sorted list of values provide the input for the reduce function.
The reduce function typically compresses the list of values to create a shorter list, for example by aggregating the values. Commonly, it returns a single value as its output. In the general case, however, the reduce function may emit an arbitrarily large list of key-value pairs, just like the map function.
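The two cases can be contrasted with a pair of small, hypothetical reduce functions: one that aggregates a key's values into a single result, and one that emits several pairs for the same key.

```python
def reduce_sum(key, values):
    """Common case: aggregate all values for a key into one output pair."""
    yield key, sum(values)


def reduce_top_two(key, values):
    """General case: a reduce function may emit more than one pair per key."""
    for value in sorted(values, reverse=True)[:2]:
        yield key, value


single = list(reduce_sum("alice", [7, 10]))          # [("alice", 17)]
several = list(reduce_top_two("alice", [3, 10, 7]))  # [("alice", 10), ("alice", 7)]
```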
The output from the reduce phase can, if needed, be used as the input for another map–reduce iteration.
This article gives a fair overview of v1 Hadoop. In the articles to follow, I’ll go over what has changed in HDFS v2, discuss the architecture of YARN (the new resource management layer in Hadoop v2) and what changes have been made to MapReduce. I’ll also dip into what, in addition to MapReduce, can be and is now plugged into the YARN framework.
Hadoop V1 Architecture Review by Mike Pluta is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.
1. There are a number of best practices and considerations for configuration of the NameNode task and the server upon which it is run. These are outside the scope of this article.
2. Given the default of each data block being replicated 3 times within a cluster, all 3 of those servers would have to be simultaneously occupied before data movement across the network is considered.
3. A default sort and shuffle class is provided and executed automatically by the framework. It can be overridden if necessary or desired. There are also a number of other default classes which are provided and automatically executed by the framework: classes to manage the input format of data, output format of data, intermediate form of data, etc. These are outside the scope of this article.
4. Wikipedia MapReduce Overview
5. Not to beat a dead horse, but I hope that at this point it is clear that a risk is being taken if the NameNode is co-mingled with other HDFS or MapReduce processes in a production environment.
6. There is a mechanism to apply custom processing to the input data (for binary data, etc). This is beyond the scope of this article.
7. This is typically done by applying a hashing function based on the key and the number of reducers being instantiated on the cluster.