Map Reduce and YARN

This lesson describes Map Reduce and the YARN framework in detail.

Building Principles

•The individual concepts of functions called map and reduce are derived from functional programming languages (such as Lisp and Haskell), where they are applied to lists of input data.

•Another key underlying concept is that of “divide and conquer”, where a single problem is broken into multiple individual sub tasks. This approach becomes even more powerful when the sub tasks are executed in parallel.

•The developer focuses on expressing the transformation between source and result data sets, and the Hadoop framework manages all aspects of job execution, parallelization, and coordination.

•MapReduce is a programming model designed for processing large volumes of data in parallel by dividing the work into a set of independent tasks.

Some More Real-World Examples

•An address book relates a name (key) to contact information (value).

•A bank account uses an account number (key) to associate with the account details (value).

•The index of a book relates a word (key) to the pages on which it occurs (value).

•On a computer file system, filenames (keys) allow access to any sort of data, such as text, images, and sound (values).

•MapReduce sends code to the distributed data, instead of bringing the data to the code.

•Map Reduce works by breaking the processing into two phases:

•the map phase

•the reduce phase

•Each phase has key-value pairs as input and output, the types of which may be chosen by the programmer.

•The programmer also specifies two functions:

•the map function

•the reduce function

•There is a shuffle and sort phase in between.

Broad Steps

•The map phase takes input in the form of key-value pairs.

•It produces output in the form of key-value pairs.

•The output from the various map tasks is grouped together on the basis of the key.

•Each key and its associated set of values are sent to the reduce phase.

•The reduce method operates on a key and its associated list of values.

•The output of reduce is written to HDFS.

Example: Finding the Maximum Temperature

•To visualize the way the map works, consider the following sample lines of input data.

•(Each record contains a date in dd-mm-yyyy format, a zip code, and a temperature.)

•10-01-1990,123112,10
•14-02-1991,283901,11
•10-03-1990,381920,15
•10-01-1991,302918,22
•12-02-1990,384902,9

•These lines are presented to the map function as the key-value pairs:

•(0, 10-01-1990,123112,10)
•(22, 14-02-1991,283901,11)
•(44, 10-03-1990,381920,15)
•(66, 10-01-1991,302918,22)
•(88, 12-02-1990,384902,9)

•The keys are the line offsets within the file, which we ignore in our map function.

•The values are the whole lines themselves.

•The map function merely extracts the year and the temperature, and emits them as its output:

•(1990, 10)

•(1991, 11)

•(1990, 15)

•(1991, 22)

•(1990, 9)

•The output from the map function is processed by the MapReduce framework before being sent to the reduce function.

•This processing sorts and groups the key-value pairs by key.

•So, continuing the example, our reduce function sees the following input:

•(1990, [10, 15, 9])

•(1991, [11, 22])

•All that the reduce function has to do now is iterate through the list and pick up the maximum reading:

• (1990, 15)

• (1991, 22)

Pseudo Code

Map Phase

•Read (K, V) (by default, K is the byte offset of the line and V is the whole line)
•Extract the year and temperature from V
•Emit (year, temperature)

Reduce Phase

•Read (K, list of values) (here K is the year, as produced by the map output)
•Iterate through the list to find the maximum element
•Emit (K, max)
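The pseudo code above maps directly onto Hadoop's Java API (new API). Below is a minimal sketch assuming the comma-separated record layout shown earlier; the class names MaxTemperatureMapper and MaxTemperatureReducer are illustrative, and each public class would normally live in its own .java file.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map phase: read (byte offset, line), emit (year, temperature).
class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed record layout: dd-mm-yyyy,zipcode,temperature
        String[] fields = value.toString().split(",");
        String year = fields[0].substring(6);                 // last four characters of the date
        int temperature = Integer.parseInt(fields[2].trim());
        context.write(new Text(year), new IntWritable(temperature));
    }
}

// Reduce phase: read (year, [temperatures]), emit (year, maximum temperature).
class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable v : values) {
            max = Math.max(max, v.get());
        }
        context.write(key, new IntWritable(max));
    }
}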

MapReduce Architecture & Code

The data passed to the Mapper is specified by an InputFormat

– Specified in the driver code

– Defines the location of the input data

– Typically a file or directory

– Determines how to split the input data into input splits

– Each Mapper deals with a single input split

– Creates a RecordReader object

– RecordReader parses the input data into key/value pairs to pass to the Mapper

Hadoop runs Map tasks on the node storing the data (when possible)

– Minimizes"network"traffic"

– Many"Mappers"can"run"in"parallel"

Mapper Class

•This class extends the org.apache.hadoop.mapreduce.Mapper class (new API); the old API uses the classes in org.apache.hadoop.mapred.* instead.

•It carries out the work of the map phase.

For class, Mapper<K1, V1, K2, V2>

•K1 and V1 are the types of input key and value

•K2 and V2 are the types of output key and value

Methods:

void setup(org.apache.hadoop.mapreduce.Mapper.Context context)

•Called once at the beginning of the task.

Context Object

The Context object allows the Mapper/Reducer to interact with the rest of the Hadoop system. It includes configuration data for the job as well as interfaces that allow it to emit output. Applications can use the Context to report progress, set application-level status messages, update Counters, indicate that they are alive, and read values stored in the job configuration across the map/reduce phases.

void map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)

•Called once for each key/value pair in the input split.

void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)

•Called once at the end of the task.
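The sketch below shows where setup(), map(), and cleanup() fit in the Mapper lifecycle, and one way the Context can be used (here, a counter). The class name LifecycleMapper, the counter group/name, and the malformed-record handling are illustrative assumptions, not part of the Hadoop API.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrates the Mapper lifecycle: setup() once, map() once per record, cleanup() once.
public class LifecycleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private int badRecords;   // task-local state initialized in setup()

    @Override
    protected void setup(Context context) {
        badRecords = 0;
        // Values set in the job configuration by the driver are visible here, e.g.:
        // String sep = context.getConfiguration().get("records.separator", ",");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < 3) {      // skip malformed lines (assumes a 3-field record)
            badRecords++;
            return;
        }
        context.write(new Text(fields[0]), new IntWritable(Integer.parseInt(fields[2].trim())));
    }

    @Override
    protected void cleanup(Context context) {
        // Counters are one of the ways the Context reports status back to the framework.
        context.getCounter("quality", "bad_records").increment(badRecords);
    }
}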

Reducer Class

•This class carries out the work of the reduce phase.

•For class, Reducer<K1, V1, K2, V2>

•K1 and V1 are the types of input key and value

•K2 and V2 are the types of output key and value

Methods:

void setup(org.apache.hadoop.mapreduce.Reducer.Context context)

•Called once at the start of the task.

void reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

•This method is called once for each key.

void cleanup(org.apache.hadoop.mapreduce.Reducer.Context context)

•Called once at the end of the task.

Driver Code

The driver code controls the following parameters:

•input path
•output directory
•input format
•output format
•job name
•output key-value types, etc.

A Job object is used to define the above parameters; a complete driver sketch is shown after the method list below.

Job job= Job.getInstance(new Configuration());

•//getInstance is a static method defined in class Job.

•//Job object controls the attributes of the Map-Reduce job.

•//We can also initialize a Job object using other methods as well:

•//Job job= Job.getInstance();

•//Job job= new Job(); //this is deprecated.

•job.setJobName("");

•job.setInputFormatClass();

•job.setOutputFormatClass();

•job.setMapperClass();

•job.setReducerClass();

•job.setOutputKeyClass();

•job.setOutputValueClass();

•job.setMapOutputKeyClass();

•job.setMapOutputValueClass();

•FileInputFormat.addInputPath(job, <Path object for input file/directory>);

•FileOutputFormat.setOutputPath(job, <Path object for output directory>);

•job.waitForCompletion(true); //submits the MapReduce job and waits for its completion
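Putting these calls together, a minimal driver sketch might look as follows. MaxTemperatureDriver, MaxTemperatureMapper, and MaxTemperatureReducer are the illustrative classes sketched earlier; input and output paths are taken from the command line.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MaxTemperatureDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MaxTemperatureDriver.class);   // lets Hadoop ship the right jar
        job.setJobName("max-temperature");

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setMapOutputKeyClass(Text.class);            // types of the map output
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);               // types of the final output
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The compiled job would typically be launched with: hadoop jar <jar file> MaxTemperatureDriver <input path> <output path> (the jar name is hypothetical).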

Partitioner

The Partitioner controls the partitioning of the keys of the intermediate map outputs.

•The key is used to derive the partition, typically by a hash function.
•The total number of partitions is the same as the number of reduce tasks for the job.
•It therefore controls which reduce task each intermediate key (and hence record) is sent to for reduction.

Custom partitioner:

A custom partitioner for a Hadoop job can be written by following these steps:
•Create a new class that extends the Partitioner class.
•Override the getPartition method, which the framework calls for each map output record.
•Add the custom partitioner to the job with job.setPartitionerClass, or set it through the job configuration.
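As a sketch of those steps (the class name YearPartitioner and the key/value types, matching the temperature example, are assumptions):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sends every record with the same year to the same reduce task.
public class YearPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Mask the sign bit so the partition number is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It is registered in the driver with job.setPartitionerClass(YearPartitioner.class); the default is HashPartitioner, which does essentially the same thing.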

Combiner

A Combiner, also known as a semi-reducer, is an optional class that accepts the output of the Map class and passes its own output key-value pairs on to the Reducer class. It runs on the map side, reducing the volume of data transferred during the shuffle.
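Because taking a maximum is commutative and associative, the reducer from the earlier sketch can itself serve as the combiner; a single extra line in the driver (using the illustrative class from above) is enough:

// In the driver, alongside setMapperClass/setReducerClass:
job.setCombinerClass(MaxTemperatureReducer.class);   // pre-aggregates map output before the shuffle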


Configuration

•Provides access to configuration parameters, read from files such as core-site.xml.

InputFormat

•Validate the input-specification of the job.

•Split up the input file(s) into logical InputSplits.

•InputSplits determine number of mappers.

•Provide the RecordReader implementation to be used to read input records from the logical InputSplit for processing by the Mapper. RecordReader takes care of remote read.

•File formats are also used to store intermediate key-value pairs emitted from the mappers and to write the final key-value output from the reducers.

OutputFormat

•Validate the output-specification of the job. e.g. check that the output directory doesn’t already exist.

•Provide the RecordWriter implementation to be used to write out the output files of the job.

Writable and WritableComparable

•Serialization interfaces used by Hadoop.

•WritableComparable objects can be compared with each other; the WritableComparable interface extends both Writable and Comparable.
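As an illustration, a custom composite key might implement WritableComparable as below. YearZipKey is a hypothetical type, not part of Hadoop; write/readFields handle serialization, and compareTo drives the sort/shuffle ordering.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// A composite key holding a year and a zip code.
public class YearZipKey implements WritableComparable<YearZipKey> {
    private int year;
    private int zipCode;

    public YearZipKey() { }                        // no-arg constructor required by Hadoop

    public YearZipKey(int year, int zipCode) {
        this.year = year;
        this.zipCode = zipCode;
    }

    @Override
    public void write(DataOutput out) throws IOException {    // serialize
        out.writeInt(year);
        out.writeInt(zipCode);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialize
        year = in.readInt();
        zipCode = in.readInt();
    }

    @Override
    public int compareTo(YearZipKey other) {                   // ordering used during sort/shuffle
        int cmp = Integer.compare(year, other.year);
        return cmp != 0 ? cmp : Integer.compare(zipCode, other.zipCode);
    }
    // When used as a map output key, hashCode()/equals() should also be overridden consistently.
}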

InputSplit

•Data is not read directly from blocks.

•This is because records may cross boundaries of blocks.

•In the original illustration, records 8, 16, 23 and 31 cross block boundaries.

•To solve this problem, Hadoop uses a logical representation of the data stored in file blocks, known as input splits.

•When a MapReduce job client calculates the input splits, it figures out where the first whole record in a block begins and where the last record in the block ends.

•In cases where the last record in a block is incomplete, the input split includes location information for the next block and the byte offset of the data needed to complete the record.

InputSplit and Data Blocks: The Difference

•Split is a logical division of the input data while block is a physical division of data.

•The HDFS block size is the default split size if no input split size is specified.

•Split is user defined and user can control split size in his Map/Reduce program.

•One split can map to multiple blocks, and one block can be covered by multiple splits.

•The number of map tasks (Mappers) is equal to the number of splits.

•By default, number of mappers = number of blocks = number of input splits.

Why Is The Block Size 128 MB?

•By default, number of mappers = number of blocks = number of input splits.

•Having many splits means that the time taken to process each split is small compared to the time to process the whole input.

•So, if we are processing the splits in parallel, the processing is better load balanced when the splits are small.

•On the other hand, if splits are too small, the overhead of managing the splits and map task creation begins to dominate the total job execution time.

•For most jobs, a good split size tends to be the size of an HDFS block, which is 128 MB by default.

The total number of map tasks required to run a job depends on the size of the data to be processed, i.e., the total number of blocks of the input files. For example, if the input file is 1000 MB and the block size is 128 MB, the number of mappers will be 8 (⌈1000 / 128⌉ = 8: seven full 128 MB blocks plus one final 104 MB block).

The number of reduce tasks is typically set to approximately 0.95 or 1.75 × (number of nodes × mapred.tasktracker.reduce.tasks.maximum).

•Increasing the number of reduces increases the framework overhead, but improves load balancing.
•The scaling factor is kept slightly below a whole number to reserve slots for failed tasks.
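The reducer count itself is set explicitly in the driver; a one-line sketch (the value 4 is arbitrary):

// In the driver: the default is a single reducer, which serializes the whole reduce phase.
job.setNumReduceTasks(4);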

Serialization

•Serialization is the process of translating data structures or object state into a format that can be stored (for example, in a file or memory buffer, or transmitted across a network connection link).

Serialization Classes in Hadoop

•Hadoop uses its own set of serializable objects, which implement a simple, efficient serialization protocol based on DataInput and DataOutput.

•Any key or value type in the Hadoop MapReduce framework uses these objects.

•Hadoop serialization objects are mutable (unlike Java String objects) and are well suited to being sent across the network.

•Data needs to be transmitted between different nodes in a distributed computing environment.

•This requires serialization and deserialization of data to convert the data that is in structured format to byte stream and vice versa.


Serialization Classes in Hadoop

Java Class / Data Type – Similar Hadoop Class

Boolean – BooleanWritable
double – DoubleWritable
float – FloatWritable
Integer / int – IntWritable
long – LongWritable
null – NullWritable
String – Text
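A short sketch of how these wrapper types are used (the values are arbitrary):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        IntWritable temperature = new IntWritable(22);   // wraps an int
        Text year = new Text("1991");                    // mutable, unlike java.lang.String
        temperature.set(9);                              // Writables can be reused in place
        int plain = temperature.get();                   // unwrap back to a Java int
        String asString = year.toString();
        NullWritable nothing = NullWritable.get();       // singleton used when no value is needed
        System.out.println(asString + " -> " + plain + " " + nothing);
    }
}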

YARN

•In Hadoop 1.x, the JobTracker has three major functions:

1.Resource Management

2.Job Scheduling

3.Job Monitoring

•YARN separates these functions into separate daemons.

MRv2 daemons

– ResourceManager – one per cluster
– Starts ApplicationMasters, allocates resources on slave nodes

– ApplicationMaster – one per job
– Requests resources, manages individual Map and Reduce tasks

– NodeManager – one per slave node
– Manages resources on individual slave nodes

– JobHistory – one per cluster
– Archives jobs' metrics and metadata

Components of YARN:

1.Global Resource Manager

•Assigns resources among applications for optimal resource utilization.

•One cluster has one instance of Resource Manager.

2.Node Manager

•Runs on each node and communicates with the Resource Manager about resource usage on the machine.

•It receives requests from the Resource Manager about resource allocation to jobs and maintains the life cycle of containers.

3.Application-specific Application Master

•It is the actual instance that does the processing.

•It requests resources from the ResourceManager and works with the NodeManager to obtain those resources for task execution. The Application Master could be MapReduce or any other processing framework.

Scheduler

It is plugged into the Resource Manager to help with resource allocation. Different schedulers allocate resources using different algorithms.

Container

It is a set of allocated system resources (CPU cores and memory). Containers are allocated and managed by the NodeManager and are used by tasks.

•Resources are handled by the Resource Manager and the Node Manager.

•Processing is handled by the Application Master (MapReduce is one of many possible types of Application Master), so processing frameworks other than MapReduce are also possible.

Anatomy of YARN Request

•Step 1: The job/application (which can be MapReduce, a Java/Scala application, a DAG job such as Apache Spark, etc.) is submitted by the YARN client to the ResourceManager daemon, along with the command to start the ApplicationMaster in a container on some NodeManager.

•Step 2: The ApplicationsManager process on the master node validates the job submission request and hands it over to the Scheduler process for resource allocation.

•Step 3: The Scheduler process assigns a container for the ApplicationMaster on one slave node.

•Step 4: The NodeManager daemon starts the ApplicationMaster service within one of its containers using the command mentioned in Step 1; hence the ApplicationMaster is considered the first container of any application.

•Step 5: The ApplicationMaster negotiates the other containers from the ResourceManager by providing details such as the location of data on the slave nodes and the required CPU, memory, and cores.

•Step 6: The ResourceManager allocates the best-suited resources on the slave nodes and responds to the ApplicationMaster with node details and other information.

•Step 7: The ApplicationMaster then sends requests to the NodeManagers on the suggested slave nodes to start the containers.

•Step 8: The ApplicationMaster manages the resources of the requested containers during job execution and notifies the ResourceManager when execution is complete.

•Step 9: The NodeManagers periodically notify the ResourceManager of the current status of available resources on each node; this information is used by the Scheduler to schedule new applications on the cluster.

•Step 10: In case of a slave node failure, the ResourceManager will try to allocate a new container on another suitable node so that the ApplicationMaster can complete the work using the new container.