Hadoop Distributed File System: Version 2 – Part I

To recap, version 1 of Hadoop is made up of two basic components: the foundation is a fault-resilient distributed file system called the Hadoop Distributed File System (HDFS), upon which a framework for the parallel processing of that distributed data, called MapReduce, is built. In this post we’ll start to look into the components, architecture and key configuration options of version 2 of the Hadoop Distributed File System (HDFS-V2). This will set us up to learn about the makeup and workings of MapReduce version 2 (MRV2) and the new resource management system, YARN (Yet Another Resource Negotiator).
The Hadoop development community has rallied and worked diligently to make version 2 of HDFS a much more scalable, efficient and enterprise-friendly storage platform. This has been accomplished by focusing on and addressing the key functionality areas of high availability, namespace federation for performance and scaling, and snapshots for backup and recoverability.

Almost all of the significant changes between HDFS V1 and V2 have to do with the NameNode. With that in mind, this post will take a dive into the functionality of the NameNode and a look into the data on which it operates.

HDFS NameNode Availability

It is no secret that the high-availability Achilles heel of HDFS is the single point of failure represented by the NameNode. If the NameNode goes down, the entire associated Hadoop cluster is rendered unavailable. The worst case is the unrecoverable loss of the NameNode server, which would mean the loss of all the metadata necessary to rebuild access to the data stored on the cluster, resulting in potentially significant data loss. Because of this, a number of mechanisms exist to mitigate the risk.

NameNode MetaData and Operation

Before discussing the mechanisms available to enhance NameNode operation, we need to have an understanding of what it is the NameNode does, how it does it, and the data on which it operates.

NameNode Directory Structure

A newly initialized NameNode creates the following directory structure for persistent storage of filesystem state and metadata:

${dfs.name.dir}
  |— current/
       |— VERSION
       |— edits
       |— fsimage
       |— fstime

Note: ${dfs.name.dir} is a configuration variable used to specify a list of directories to be used for persistent storage. It is typical to configure multiple directories for the persistent storage of the NameNode so that, in the case of a hardware failure, a copy of the persistent data is saved and available to be used for recovery. A common configuration is to locate a directory on each of two independent local disks and one remote directory via a network share (for a total of three copies of the NameNode persistent data).
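A configuration along these lines would keep three copies of the NameNode metadata (a sketch; the paths are hypothetical, with two local disks and one NFS mount):

```xml
<!-- hdfs-site.xml: three persistent storage directories for the NameNode.
     The paths are illustrative only. -->
<property>
  <name>dfs.name.dir</name>
  <value>/disk1/hdfs/name,/disk2/hdfs/name,/mnt/nfs/hdfs/name</value>
</property>
```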

VERSION File

The VERSION file is a java properties file which contains various information about the version of Hadoop and HDFS being used. Contents of a typical file might look as follows:

#Tue Feb 25 08:35:40 GMT 2014
namespaceID=142527538
cTime=0
storageType=NAME_NODE
layoutVersion=-22
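Since VERSION is a plain Java-style properties file, it is easy to read programmatically. A minimal Python sketch (the parsing function is my own, not part of Hadoop):

```python
def parse_version(text):
    """Parse a Java-style properties file into a dict.

    Lines starting with '#' are comments; the rest are key=value pairs.
    """
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key] = value
    return props

sample = """\
#Tue Feb 25 08:35:40 GMT 2014
namespaceID=142527538
cTime=0
storageType=NAME_NODE
layoutVersion=-22
"""
props = parse_version(sample)
```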

namespaceID

The namespaceID is a unique identifier for the filesystem, created when the filesystem is initialized. The NameNode uses it to identify DataNodes belonging to the filesystem; new DataNodes do not know the namespaceID until they have registered with the NameNode.

cTime

The creation time of the NameNode’s storage directory is held in the cTime property. Newly initialized storage always has the value zero; it is updated to a timestamp whenever the filesystem is upgraded.

storageType

The storageType value (NAME_NODE) indicates that this directory contains data for a NameNode.

layoutVersion

The layoutVersion is a negative number that identifies the version of HDFS’s data structures. This version number has no relation to the release number of Hadoop. When file layouts change, the version number is decremented (for example, the version after −22 is −23) and HDFS must be upgraded. NameNodes and DataNodes will not operate unless the layoutVersion numbers match throughout the system.

fsimage and edits Files

The fsimage and edits files are binary files that use Hadoop Writable objects as their serialization format for persistent storage.

As the NameNode processes client requests to create, delete and move directories and files (essentially, write operations), it maintains the state and associated metadata of the directories and files in an in-memory version of the fsimage file (presumably a mnemonic for File System Image). The in-memory version of fsimage is not written to persistent storage each time a change is made, as it has the potential to be very large (hundreds of megabytes to gigabytes) and would take significant time to write.

fsimage Contents

The fsimage file contains a list of all the directory and file inodes in the filesystem. The data maintained for each file inode includes the file’s replication level, modification and access times, access permissions, block size, and the blocks the file is made up of. For directories, the modification time, permissions, and quota are stored.

The fsimage file does not record information about the datanodes on which the blocks are stored. Instead, the namenode keeps this mapping in memory only. This information is constructed by asking the datanodes for their block lists when they join the cluster and periodically afterward to ensure the namenode’s block mapping is up-to-date.
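As a rough illustration of how this memory-only mapping can be rebuilt from block reports, here is a toy sketch (class and method names are mine, not Hadoop's):

```python
from collections import defaultdict

class BlockMap:
    """Toy in-memory block-to-DataNode mapping, rebuilt from block reports."""

    def __init__(self):
        self.locations = defaultdict(set)  # block id -> set of datanode ids

    def block_report(self, datanode, blocks):
        """Process a block report from one DataNode (sent when the node
        joins the cluster and periodically afterward)."""
        # Drop any stale entries for this datanode, then re-add.
        for holders in self.locations.values():
            holders.discard(datanode)
        for b in blocks:
            self.locations[b].add(datanode)

    def where(self, block):
        """Return the DataNodes currently known to hold this block."""
        return sorted(self.locations[block])

bm = BlockMap()
bm.block_report("dn1", ["blk_1", "blk_2"])
bm.block_report("dn2", ["blk_2", "blk_3"])
```

Because the map is reconstructed from the DataNodes themselves, nothing about block locations needs to survive a NameNode restart.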

edits Contents

Prior to making any change to the in-memory version of fsimage, a write-ahead record of the changes to be made is written to the edits file, essentially a form of transaction logging. If the NameNode has been configured to maintain multiple persistent store directories, the writes to the edits files must be flushed and synced to all configured copies before a notice of success is returned. This is done to ensure that no data is lost due to a machine failure. Specifying multiple persistent store directories for the NameNode is one of the mechanisms available to mitigate the potential for data loss due to a catastrophic NameNode failure.
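The flush-and-sync discipline can be sketched in a few lines (a simplified illustration, not Hadoop's actual EditLog code): success is reported only once the record is durable in every configured edits file.

```python
import os
import tempfile

def log_edit(edit_record, edits_paths):
    """Append one edit record to every configured edits file, flushing and
    fsync-ing each before reporting success (write-ahead logging)."""
    for path in edits_paths:
        with open(path, "a") as f:
            f.write(edit_record + "\n")
            f.flush()
            os.fsync(f.fileno())   # durable on disk before we acknowledge
    return True  # only now may the in-memory fsimage be updated

# Hypothetical setup: three storage directories, as in the note above.
dirs = [tempfile.mkdtemp() for _ in range(3)]
paths = [os.path.join(d, "edits") for d in dirs]
ok = log_edit("OP_MKDIR /user/alice", paths)
```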

fstime File

The fstime file is also a binary file making use of the Hadoop Writable object for serialization for persistent storage. The primary use of the fstime file is to record the time of the last checkpoint operation of the fsimage and edits files.

NameNode Self-Checkpoint

NameNode Self-Checkpoint Process Flow

In the most basic of configurations, a checkpoint operation of the fsimage and edits files occurs at NameNode startup as part of its ‘safe mode’ sequence of operations. The NameNode self-checkpoint process is essentially the following sequence of events:

  1. Freeze the file system for any write operations (read-only mode)
  2. Load fsimage from persistent storage into memory
  3. Apply Edit Log items to fsimage
  4. Write updated, checkpointed fsimage to persistent storage (and replicate to multiple locations if so configured)
  5. Purge edits file (and replicate to multiple locations if so configured)
  6. Write timestamp of the checkpoint operation to the fstime file
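The sequence above can be sketched as a toy model (fsimage as a dict of paths, the Edit Log as a list of logged operations; none of this is Hadoop's actual code):

```python
import time

def self_checkpoint(storage):
    """Toy NameNode startup checkpoint.

    storage = {"fsimage": dict, "edits": list, "fstime": float}
    """
    # 1. Freeze writes (implicit here: we run single-threaded).
    # 2. Load fsimage from persistent storage into memory.
    fsimage = dict(storage["fsimage"])
    # 3. Apply each logged edit to the in-memory image.
    for op, path, value in storage["edits"]:
        if op == "create":
            fsimage[path] = value
        elif op == "delete":
            fsimage.pop(path, None)
    # 4. Write the updated, checkpointed fsimage back to persistent storage.
    storage["fsimage"] = fsimage
    # 5. Purge the edits file.
    storage["edits"] = []
    # 6. Write the timestamp of the checkpoint operation to fstime.
    storage["fstime"] = time.time()
    return storage

store = {
    "fsimage": {"/a": 1},
    "edits": [("create", "/b", 2), ("delete", "/a", None)],
    "fstime": 0.0,
}
store = self_checkpoint(store)
```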

Before the NameNode may exit ‘safe mode’, it must receive block mapping information from a minimum number of DataNodes.

The downside to this method of operation is that it can take a significant amount of time to roll the Edit Log forward and checkpoint the fsimage file. Depending on the amount of file system activity and the length of time since the last checkpoint, it is not unheard of for the checkpoint process to take several hours. An obvious way to reduce the time a NameNode checkpoint takes is to checkpoint the fsimage file on a periodic basis. This is exactly the purpose and operation of the Secondary NameNode.

Secondary NameNode Checkpointing

As mentioned, the Secondary NameNode facilitates the online checkpointing of the fsimage maintained by the Primary NameNode. It does not act as a second or a backup NameNode despite its name. Periodically (the default is once per hour, or whenever the size of the edits file exceeds 64MB), the Secondary NameNode will send a message to the Primary NameNode to initiate a checkpoint. This in turn sets off the following sequence of events:

  1. Primary NameNode: Freeze writing to the current Edit Log
  2. Primary NameNode: Initialize a new Edit Log
  3. Secondary NameNode: HTTP GET Frozen Edit Log
  4. Secondary NameNode: HTTP GET fsimage
  5. Secondary NameNode: Load fsimage into memory
  6. Secondary NameNode: Apply Edit Log items to fsimage
  7. Secondary NameNode: Write fsimage to local persistent storage
  8. Secondary NameNode: HTTP PUT checkpointed fsimage file
  9. Primary NameNode: Roll Edit Log forward
  10. Primary NameNode: Roll checkpointed fsimage file forward
  11. Primary NameNode: Write checkpoint timestamp to fstime file

A diagram of this sequence of events is as follows:
HDFS Secondary NameNode Component Process Flow
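The hand-off can also be modeled as two toy objects (invented names; real Hadoop transfers the files over HTTP, simulated here by plain function calls):

```python
import time

class Primary:
    """Toy Primary NameNode: a dict fsimage plus a list-based Edit Log."""

    def __init__(self):
        self.fsimage = {"/a": 1}
        self.edits = [("create", "/b", 2)]   # current Edit Log
        self.fstime = 0.0

    def start_checkpoint(self):
        # Steps 1-2: freeze the current Edit Log, start a new one.
        frozen, self.edits = self.edits, []
        return frozen

    def finish_checkpoint(self, new_fsimage):
        # Steps 10-11: roll the checkpointed fsimage forward, stamp fstime.
        self.fsimage = new_fsimage
        self.fstime = time.time()

def secondary_checkpoint(primary):
    """Steps 3-9, run on the Secondary: fetch the frozen Edit Log and
    fsimage (HTTP GET in real Hadoop), merge, and send the result back."""
    frozen = primary.start_checkpoint()
    fsimage = dict(primary.fsimage)
    for op, path, value in frozen:
        if op == "create":
            fsimage[path] = value
        elif op == "delete":
            fsimage.pop(path, None)
    primary.finish_checkpoint(fsimage)   # HTTP PUT in real Hadoop

p = Primary()
secondary_checkpoint(p)
```

Note that the Primary keeps accepting writes into the new Edit Log while the Secondary merges the frozen one, which is what makes the checkpoint "online".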
In Part 2 of this article, I’ll take a look into the following:

  • High Availability for the HDFS NameNode
    • By using Quorum Journal Manager (QJM)
    • By using Network File System (NFS)
  • Namespace Federation for Performance and Scaling
  • HDFS Snapshots to support Backup and Recoverability

Creative Commons License
Hadoop Distributed File System: Version 2 – Part I by Mike Pluta is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.

Hadoop V1 Architecture Overview

As we’ve covered in previous articles, Hadoop is an open source software project hosted by the Apache Software Foundation, focused on reliable, scalable, distributed computing.

Hadoop Architecture Description

The simplest description of the Hadoop architecture is:

Hadoop is a parallel processing system implemented as a MapReduce engine layered on top of a fault-resilient distributed file system.

Distributed File System

As we’ve discussed, the underpinning of Hadoop (or any MapReduce system) is a distributed file system. The basic functionality of the Hadoop Distributed File System (HDFS) is explained as follows:

  • Large files are split into blocks of equal size
  • These blocks are distributed across the cluster for storage
  • Because node failure is a reality to be considered in a larger cluster, each block is stored multiple times (typically three times) on different computers
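The three points above can be illustrated with a small sketch (block size and node names are invented; HDFS v1 defaults to 64 MB blocks and a replication factor of three, and real placement is rack-aware):

```python
import itertools

def split_into_blocks(data, block_size):
    """Split a byte string into fixed-size blocks (the last may be short)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

def place_replicas(num_blocks, nodes, replication=3):
    """Assign each block to `replication` distinct nodes, round-robin.

    A toy stand-in for HDFS's rack-aware placement policy.
    """
    ring = itertools.cycle(range(len(nodes)))
    placement = {}
    for b in range(num_blocks):
        start = next(ring)
        placement[b] = [nodes[(start + r) % len(nodes)]
                        for r in range(replication)]
    return placement

blocks = split_into_blocks(b"x" * 250, block_size=100)   # 100, 100, 50 bytes
placement = place_replicas(len(blocks), ["dn1", "dn2", "dn3", "dn4"])
```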

NameNode

When the cluster is started, one node is assigned to run the NameNode process. The NameNode is the centerpiece of HDFS. It maintains the directory of all files in the file system, and tracks where in the cluster file data is kept. It does not read or write the data of any of the files itself.

Client applications communicate with the NameNode when they wish to locate, add, copy, move or delete a file. A successful response from the NameNode consists of a list of the relevant DataNode servers on which the requested data is stored (or is to be stored).

The NameNode is a single point of failure (SPOF) for a Hadoop system. If the NameNode is not available, no data stored on the underlying HDFS may be read or written. Further, should the metadata maintained by the NameNode be lost or corrupted, it is likely that the data stored in the underlying HDFS will also be lost and/or corrupted. It is for these reasons that a BackupNameNode process exists as part of an optional high-availability configuration.

DataNode

When the Hadoop cluster is started, along with the NameNode process being started on one node, each node on which data is to be stored starts a DataNode process as a subordinate to the NameNode. The DataNode is responsible for reading and writing data blocks to and from the underlying HDFS as directed by the NameNode process and client applications. Client applications can, and often do, communicate directly with a DataNode. Once a client application has received from the NameNode a list of relevant DataNode servers, it is more efficient for the client application to communicate directly with the DataNode.

An Additional NameNode Note

While it is not uncommon on smaller clusters for the server running the NameNode process to also be configured to run a DataNode task, this should not be done in a production environment. Because it is a single point of failure, it is essential on a production cluster that the server running the NameNode task be particularly looked after1.

HDFS Component Process Flow

MapReduce Engine

The MapReduce Engine is the raison d’être for the Hadoop Distributed File System. A principal tenet of MapReduce is ‘data locality’, which is based on the assumption that it is less expensive to move processing to the data on which it is to act than to move data across a network to where processing resources are available. What this means is that, if at all possible, data is left in place and the processing that is to act on that data is brought to it. Because HDFS splits the data stored on it into blocks and provides a mechanism to locate the server in the cluster on which any given block is stored, the MapReduce engine is able to launch a process to act upon an arbitrary data block. Only if the underlying server on which a needed data block is stored is unavailable for processing is the movement of data considered2.

The implementation of MapReduce is an alternating application of Map and then Reduce functions against blocks of data. The complexities and difficulties of parallel execution of these functions are managed and hidden from the user automatically by the framework. A MapReduce iteration is comprised of three base phases: Map, Shuffle, and Reduce. The Shuffle phase is introduced and managed internally by the framework3.

Wikipedia explains the workflow of MapReduce as follows:

Another way to look at MapReduce is as a 5-step parallel and distributed computation:

  1. Prepare the Map() input – the “MapReduce system” designates Map processors, assigns the K1 input key value each processor would work on, and provides that processor with all the input data associated with that key value.
  2. Run the user-provided Map() code – Map() is run exactly once for each K1 key value, generating output organized by key values K2.
  3. “Shuffle” the Map output to the Reduce processors – the MapReduce system designates Reduce processors, assigns the K2 key value each processor would work on, and provides that processor with all the Map-generated data associated with that key value.
  4. Run the user-provided Reduce() code – Reduce() is run exactly once for each K2 key value produced by the Map step.
  5. Produce the final output – the MapReduce system collects all the Reduce output, and sorts it by K2 to produce the final outcome.

Logically these 5 steps can be thought of as running in sequence – each step starts only after the previous step is completed – though in practice, of course, they can be intertwined, as long as the final result is not affected.

In many situations the input data might already be distributed (“sharded”) among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as much as possible local to the Map-generated data they need to process.4
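The five steps above map directly onto a toy word count, the canonical MapReduce example (pure Python, no Hadoop involved; function names are mine):

```python
from collections import defaultdict

def map_fn(_, line):
    """Map: one (K1, line) record in, a list of (word, 1) pairs out."""
    return [(word, 1) for word in line.split()]

def reduce_fn(word, counts):
    """Reduce: all values for one K2 key in, one aggregated pair out."""
    return (word, sum(counts))

def mapreduce(records, map_fn, reduce_fn):
    # Steps 1-2: run Map once per input record.
    intermediate = []
    for k1, v1 in records:
        intermediate.extend(map_fn(k1, v1))
    # Step 3: shuffle - group all intermediate values by their K2 key.
    groups = defaultdict(list)
    for k2, v2 in intermediate:
        groups[k2].append(v2)
    # Steps 4-5: run Reduce once per K2 key, collect the sorted output.
    return sorted(reduce_fn(k2, vs) for k2, vs in groups.items())

lines = [(0, "the quick brown fox"), (1, "the lazy dog")]
result = mapreduce(lines, map_fn, reduce_fn)
```

In Hadoop, of course, the map and reduce calls run in parallel across the cluster; the single-process version only shows the data flow.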

In the Hadoop implementation of MapReduce, the coordination and management of a MapReduce job is handled by a JobTracker task and a suite of TaskTracker tasks.

MapReduce JobTracker – TaskTracker Interaction

JobTracker

The JobTracker is the interface between a client application and the Hadoop framework.

Once code is submitted to the Hadoop cluster, the JobTracker formulates and follows an execution plan by taking the following steps:

  • Determining where the data blocks of the input files reside
  • Assigning to nodes the different tasks to be executed as part of the MapReduce workflow and passing these instructions to a TaskTracker for execution (simplistically: Map, then shuffle, then reduce)
  • Monitoring all tasks as they are running by way of received heartbeats

If a task fails, the JobTracker will automatically relaunch the task, on a different node if necessary, up to a predefined limit of retries.

There is only one JobTracker task per Hadoop cluster. It is typically run on a server as a master node of the cluster. On smaller clusters (40 nodes or fewer), it is not uncommon for the JobTracker and the NameNode to coexist on the same server.

NameNode Re-Revisited

As mentioned previously, while it may not be uncommon to co-locate additional tasks on the server running the NameNode process, particularly on smaller clusters, this is a practice that is potentially fraught with peril due to the NameNode being an SPOF for the cluster. In a production environment the NameNode server should be considered fragile and cared for accordingly.

TaskTracker

When the Hadoop cluster is started, along with the DataNode processes, a TaskTracker process is started on each node of the cluster on which data is to be stored. The relationship between the TaskTracker and DataNode should be clear at this point: the DataNode process is responsible for reading data from the underlying HDFS and passing that data to a process designated by the co-existing TaskTracker5.

The TaskTracker gets its execution orders from the JobTracker. When a TaskTracker is started, it is configured with a set of execution slots. These indicate the number of simultaneous tasks the TaskTracker may accept. When the JobTracker is looking for the location of a data block against which processing is to be directed, the availability of a free execution slot is taken into consideration.
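A hypothetical sketch of how the JobTracker might weigh data locality against free execution slots (invented names; the real scheduler is considerably more involved):

```python
def pick_tasktracker(block_locations, free_slots):
    """Choose a node for a task: prefer a node that holds a replica of the
    data block AND has a free execution slot; otherwise fall back to any
    node with a free slot (the data must then move across the network).

    block_locations: node names holding replicas of the block
    free_slots: dict of node name -> number of free execution slots
    """
    for node in block_locations:
        if free_slots.get(node, 0) > 0:
            return node, True            # local execution
    for node, slots in sorted(free_slots.items()):
        if slots > 0:
            return node, False           # remote execution
    return None, False                   # no capacity anywhere

node, local = pick_tasktracker(["dn2", "dn3"],
                               {"dn1": 2, "dn2": 0, "dn3": 1})
```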

The TaskTracker is also responsible for communicating job execution status (both success and failure) back to the JobTracker along with housekeeping messages such as the number of available execution slots and periodic heartbeat messages to assure the JobTracker that the TaskTracker is alive and running. It is through this heartbeat that the JobTracker is able to identify TaskTracker nodes which have failed and reschedule execution on one of the other nodes containing a copy of that data.

When a client MapReduce program is submitted, the following sequence of events takes place:

  • JobTracker is passed parameters of the client job
  • JobTracker communicates with NameNode to get a list of nodes containing both:
    • Data blocks of the input to the MapReduce job
    • Available execution slots
  • For each node returned by the NameNode:
    • JobTracker formulates an execution plan for the MapReduce job
    • JobTracker communicates with the specified TaskTracker, passing to it steps to execute
      • Prepare and Read Input
      • Map Phase
      • Sort and Shuffle Phase
      • Reduce phase
      • Write Output

Hadoop MapReduce Sequence Diagram

Map Phase

As seen above, the TaskTracker gets its marching orders from the JobTracker. The first order of business for the TaskTracker is to communicate with the local DataNode to start reading the requested data block, breaking the data being read into key-value pairs that are fed as a sequential stream to the Map process6. The map function is called individually for each of these key-value pairs and in turn produces as output an arbitrarily large list of new key-value pairs.

Shuffle Phase

The shuffle phase begins by sorting the key-value pairs resulting from the map phase by their keys. If intermediate storage is needed for these results, disk local to the sorting node is used; intermediate data is not written to the distributed file system. After the sort, MapReduce assigns key-value pairs to a reducer according to their keys. The framework makes sure all pairs with the same key are assigned to the same reducer7. Because the output from the map phase can be distributed arbitrarily across the cluster, it must be transferred across the network to the correct reducers during the shuffle phase. It is therefore normal for large volumes of data to cross the network in this step.
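The "same key, same reducer" guarantee is typically achieved with a hash partitioner (see footnote 7). A minimal sketch, using CRC32 as a stand-in for Java's hashCode():

```python
import zlib

def partition(key, num_reducers):
    """Assign a key to a reducer bucket via a stable hash (CRC32 here).

    Hadoop's default HashPartitioner does the equivalent with the key's
    hashCode() modulo the number of reduce tasks.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers

pairs = [("apple", 1), ("banana", 1), ("apple", 1), ("cherry", 1)]
buckets = {}
for k, v in pairs:
    buckets.setdefault(partition(k, num_reducers=2), []).append((k, v))
```

Because the hash is a pure function of the key, every mapper routes a given key to the same reducer without any coordination.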

Reduce Phase

The reducer finally collates all the pairs with the same key and creates a sorted list from the values. The key and the sorted list of values provides the input for the reduce function.

The reduce function typically compresses the list of values to create a shorter list – for example, by aggregating the values. Commonly, it returns a single value as its output. Generally speaking, the reduce function creates an arbitrarily large list of key-value pairs, just like the map function.

The output from the reduce phase can, if needed, be used as the input for another map–reduce iteration.

MapReduce Data and Process Flow of Word Count

This article gives a fair overview of v1 Hadoop. In the articles to follow, I’ll go over what has changed in HDFS v2, discuss the architecture of YARN (the new resource management layer in Hadoop v2) and what changes have been made to MapReduce. I’ll also dip into what, in addition to MapReduce, can be and is now plugged into the YARN framework.


Creative Commons License
Hadoop V1 Architecture Review by Mike Pluta is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.


Footnotes

  1. There are a number of best practices and considerations for configuration of the NameNode task and the server upon which it is run. These are outside the scope of this article. 
  2. Given the default of each data block being replicated 3 times within a cluster, all 3 of those servers would have to be simultaneously occupied before data movement across the network is considered. 
  3. A default sort and shuffle class is provided and executed automatically by the framework. It can be overridden if necessary or desired. There are also a number of other default classes which are provided and automatically executed by the framework; classes to manage the input format of data, output format of data, intermediate form of data, etc. These are outside the scope of this article. 
  4. Wikipedia MapReduce Overview 
  5. Not to beat a dead horse, but I hope that at this point it is clear that a risk is being taken if the NameNode is co-mingled with other HDFS or MapReduce processes in a production environment. 
  6. There is a mechanism to apply custom processing to the input data (for binary data, etc). This is beyond the scope of this article. 
  7. This is typically done by applying a hashing function based on the key and the number of reducers being instantiated on the cluster.