What is Hadoop?

• Open source distributed data processing cluster

• Data processed in Hadoop Distributed File System (HDFS)

• Hadoop is a framework that helps solve the problem of data explosion.

• Hadoop is an open source, Java-based programming framework that supports the processing of large data sets in a distributed computing environment

• Hadoop provides: A reliable, scalable platform for storage and analysis

• Its design is based on the Google File System (GFS)

• Hadoop runs a number of applications on distributed systems with thousands of nodes involving petabytes of data

• It has a distributed file system, called the Hadoop Distributed File System or HDFS, which enables fast data transfer among the nodes

• It leverages a distributed computation framework called MapReduce

• Resource Management is performed by YARN

Problems with distributed processing:

• Hardware failure: can be solved by redundancy

• Coordinating the tasks and combining results from all machines

Hadoop takes care of the above complexities and the challenges of network/distributed programming through the following two frameworks:

• HDFS – storage (of data and results)

• MapReduce – processing (analysis of data); see the word-count sketch below
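
For illustration only, here is a minimal word-count sketch against the standard org.apache.hadoop.mapreduce API. The class names and the input/output paths passed on the command line are assumptions for the example, not part of the course material.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: emits (word, 1) for every token in an input line
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reducer: sums the counts emitted for each word
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);   // optional local aggregation on the map side
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // HDFS input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // HDFS output directory (must not already exist)
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The mapper emits (word, 1) pairs and the reducer sums the counts per word; Hadoop handles splitting the input across the cluster and shuffling the intermediate pairs to the reducers.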

Hadoop ecosystem

Hadoop HDFS – Distributed storage layer for Hadoop.

Hadoop YARN – Resource management layer introduced in Hadoop 2.x.

Hadoop Map-Reduce – Parallel processing layer for Hadoop.

HBase – A column-oriented NoSQL database that runs on top of HDFS. It does not understand structured (SQL) queries and is well suited to sparse data sets.

Hive – Apache Hive is a data warehousing infrastructure built on Hadoop; it enables easy data summarization and querying using SQL-like queries.

Pig – A high-level scripting language used with Hadoop. Pig enables writing complex data processing flows without Java programming.

Flume – It is a reliable system for efficiently collecting large amounts of log data from many different sources in real-time.

Sqoop – A tool designed to transfer huge volumes of data between Hadoop and relational databases (RDBMS).

Oozie – A Java web application used to schedule Apache Hadoop jobs. It combines multiple jobs sequentially into one logical unit of work.

Zookeeper – A centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

Setting up Hadoop Clusters and the Hadoop Ecosystem

On-premise cluster installation is usually performed by a system administrator – out of the scope of this course

Developers should understand how the components of a Hadoop cluster work together

Developers typically use Hadoop in pseudo-distributed mode – A single-machine “cluster” – All Hadoop daemons run on the same machine – Useful for testing

Steps for Installing A Hadoop Cluster

Difficult: Download, install, and integrate individual Hadoop components directly from Apache

Easier: Ambari/Cloudera Manager, etc. – Distributions including Apache Hadoop – Include many other components from the Hadoop ecosystem

Easiest: Spin up clusters in the cloud using AWS/EC2 – Wizard-based UI to install, configure and manage a Hadoop cluster

HDFS - Hadoop Distributed File System

• The file store in HDFS provides scalable, fault tolerant storage at low cost.

• The HDFS software detects and compensates for hardware issues, including disk problems and server failure.

• HDFS stores files across the collection of servers in a cluster.

• Files are decomposed into blocks, and each block is written to more than one of the servers.

• The replication provides both fault tolerance and performance.

• HDFS is a filesystem written in Java

• Sits on top of a native filesystem

• Such as ext3, ext4 or xfs

• Provides redundant storage for massive amounts of data

• Using readily-available, industry-standard computers

Design of HDFS

HDFS has been designed keeping in view the following features:

•Very large files: Files that are megabytes, gigabytes, terabytes or petabytes in size.

•Data access: HDFS is built around the idea that data is written once but read many times. A dataset is copied from source and then analysis is done on that dataset over time.

•Commodity hardware: Hadoop does not require expensive, highly reliable hardware as it is designed to run on clusters of commodity hardware.

•Growth of storage vs read/write performance: One hard drive in 1990 could store 1,370 MB of data and had a transfer speed of 4.4 MB/s, so the full drive could be read in about five minutes. Today a 1-terabyte drive has a transfer speed of around 100 MB/s, but it takes more than two and a half hours to read all the data off the disk (see the quick calculation below).

•Although the storage capacities of hard drives have increased enormously, access speeds have not kept pace.
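
A quick back-of-the-envelope check of these figures (approximate, assuming purely sequential reads at the quoted transfer rates):

  1990 drive:        1,370 MB ÷ 4.4 MB/s  ≈ 310 s  ≈ 5 minutes
  Modern 1 TB drive: 1,000,000 MB ÷ 100 MB/s = 10,000 s ≈ 2.8 hours
  100 drives in parallel (10,000 MB each): 10,000 MB ÷ 100 MB/s = 100 s, i.e. under 2 minutes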

HDFS Blocks

• If we had 100 drives, each holding one hundredth of the data and working in parallel, we could read the full data set in under two minutes (as the calculation above shows)

• A hard disk has concentric circles which form tracks.

• One file can span many blocks. In a local file system these blocks are typically about 512 bytes and are not necessarily contiguous.

• Since HDFS is designed for large files, its block size is 128 MB by default. Moreover, it stores its blocks contiguously on the local file system to minimise head seek time.
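
For reference, a minimal sketch of how a client can inspect the configured block size and the block locations of a file through the Java FileSystem API. The path below is hypothetical, and the Configuration is assumed to pick up the cluster's core-site.xml/hdfs-site.xml from the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();        // reads core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/user/demo/bigfile.dat");  // hypothetical path, for illustration only

    // Default block size configured for this filesystem (128 MB unless overridden)
    System.out.println("Default block size: " + fs.getDefaultBlockSize(file) + " bytes");

    // The blocks the file is split into, and the DataNodes holding each replica
    FileStatus status = fs.getFileStatus(file);
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation b : blocks) {
      System.out.println("offset=" + b.getOffset() + " length=" + b.getLength()
          + " hosts=" + String.join(",", b.getHosts()));
    }
  }
}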

Components of Hadoop 1.x

NameNode

•Contains the Hadoop filesystem tree and other metadata information about files and directories

•Contains an in-memory mapping of which blocks are stored on which DataNode

Secondary Namenode

•Performs housekeeping activities for the NameNode, such as periodic merging of the namespace image (fsimage) and edits

•It is not a backup for the NameNode

DataNode

•Stores the actual data blocks of HDFS files on its own local disk.

•Periodically sends signals (called Heartbeats) to the NameNode to show that it is alive.

•Sends block reports to the NameNode on cluster startup, as well as periodically at every 10th Heartbeat.

•The DataNodes are the workhorses of the system.

•They perform all block operations, including periodic checksums. They receive instructions from the NameNode about where and how to store the blocks.

Daemons of Hadoop 1.x

•JobTracker (not present in Hadoop 2.x) – Controls overall execution of MapReduce jobs.

•TaskTracker (not present in Hadoop 2.x) – Runs individual map and reduce tasks on DataNodes.

•Periodically communicates with the JobTracker to give updates and receive instructions.

NameNode contains two important files on its hard disk:

  1. fsimage (file system image)

It contains:

•all directory structure of HDFS

•replication level of file

•modification and access times of files

•access permissions of files and directories

•block size of files

•the blocks constituting a file

•A transaction log that records file creations, file deletions, etc.

  2. Edits

•When any write operation takes place in HDFS, the directory structure gets modified.

•These modifications are stored in memory as well as in edits files (edits files are stored on hard disk).

•When the existing fsimage file is merged with edits, we get an updated fsimage file.

•This process is called checkpointing and is carried out by the Secondary NameNode.

Safe Mode:

•During start up, the NameNode loads the file system state from the fsimage and the edits log file.

•It then waits for DataNodes to report their blocks. During this time, NameNode stays in Safemode

•Safemode for the NameNode is essentially a read-only mode for the HDFS cluster, in which it does not allow any modifications to the file system or blocks

Replica Placement

•How does the namenode choose which datanodes to store replicas on?

•Placing all replicas on a single node incurs the lowest write bandwidth penalty (since the replication pipeline runs on a single node)

•But this offers no real redundancy (if the node fails, the data for that block is lost).

•Also, the read bandwidth is high for off-rack reads.

•At the other extreme, placing replicas in different data centres may maximize redundancy, but at the cost of write bandwidth.

•Hadoop’s default strategy is to place the first replica on the same node as the client

•For clients running outside the cluster, a node is chosen at random.

•The system tries not to pick nodes that are too full or too busy.

•The second replica is placed on a different rack from the first (off-rack), chosen at random.

•The third replica is placed on the same rack as the second, but on a different node chosen at random.

•Further replicas are placed on random nodes in the cluster, although the system tries to avoid placing too many replicas on the same rack.

Benefits of Replica Placement and Rack Awareness

This strategy gives a good balance among:

• reliability (blocks are stored on two racks, so data is available even in case of node or rack failure)

• write bandwidth (writes only have to traverse a single network switch)

• read performance (there’s a choice of two racks to read from)

• block distribution across the cluster (clients only write a single block on the local rack)

Balancer:

A tool that analyzes block placement and re-balances data across the DataNodes.

Goal: disk usage across DataNodes should be similar

•Usually run when new DataNodes are added

•The cluster remains online while the rebalancer is active

•Rebalancer is throttled to avoid network congestion

•Command line tool

URI, URL, and URN

URI (Uniform Resource Identifier)

•Uniquely identifies a resource along with its scheme.

URL (Uniform Resource Locator)

•Defines the network location of a resource and the scheme for obtaining it.

URN (Uniform Resource Name)

•Identifies a resource by name in a given namespace.
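
As a small illustration, an HDFS location is itself a URI whose parts can be pulled apart with the standard java.net.URI class. The host, port and path below are hypothetical (8020 is a commonly used NameNode RPC port).

import java.net.URI;

public class UriDemo {
  public static void main(String[] args) throws Exception {
    // scheme + authority (NameNode address) + path
    URI uri = new URI("hdfs://namenode.example.com:8020/user/demo/input.txt");

    System.out.println("Scheme:    " + uri.getScheme());    // hdfs
    System.out.println("Authority: " + uri.getAuthority()); // namenode.example.com:8020
    System.out.println("Path:      " + uri.getPath());      // /user/demo/input.txt
  }
}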

HDFS Commands

Read Commands Demo

•cat •checksum •ls •text

Write Commands Demo

•appendToFile •copyFromLocal •put •moveFromLocal •copyToLocal •get •cp •mkdir •mv •rm
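
Most of these shell commands have direct equivalents in the Java FileSystem API. The following is a rough sketch under the assumption of a configured client; the paths are hypothetical and the hdfs dfs commands in the comments are only approximate equivalents.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShellEquivalents {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    Path dir = new Path("/user/demo/data");           // hypothetical HDFS directory
    Path local = new Path("file:///tmp/sample.txt");  // hypothetical local file

    fs.mkdirs(dir);                                   // roughly: hdfs dfs -mkdir -p /user/demo/data
    fs.copyFromLocalFile(local, dir);                 // roughly: hdfs dfs -put /tmp/sample.txt /user/demo/data
    fs.rename(new Path(dir, "sample.txt"),
              new Path(dir, "renamed.txt"));          // roughly: hdfs dfs -mv ...
    fs.copyToLocalFile(new Path(dir, "renamed.txt"),
                       new Path("file:///tmp/copy.txt")); // roughly: hdfs dfs -get ...
    fs.delete(new Path(dir, "renamed.txt"), false);   // roughly: hdfs dfs -rm ...
    fs.close();
  }
}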

Data Read Steps

  1. Client opens the file it wishes to read by calling open() on DistributedFileSystem object (which is an implementation of FileSystem class).

  2. DistributedFileSystem calls the namenode, to determine the locations of the first few blocks in the file.

  3. For each block, the namenode returns the addresses of the datanodes that have a copy of that block. Datanodes are sorted according to their proximity to the client. Proximity decreases as: different nodes on same rack > different racks > different data centres. If the client is itself a datanode, the client will read from the local datanode.

  4. The DistributedFileSystem returns an FSDataInputStream to the client for it to read data. FSDataInputStream uses DFSInputStream to manage the datanode and namenode I/O.

  5. The client then calls read() on the stream FSDataInputStream

  6. DFSInputStream connects to the closest datanode for the first block in the file.

  7. When the entire block has been read, the best datanode for the next block is found and read from.

  8. DFSInputStream calls the namenode to retrieve the datanode locations for the next batch of blocks.

  9. When the client has finished reading, it calls close() on the FSDataInputStream.
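
Putting these steps together, a minimal client-side read looks roughly like the sketch below. The file path is hypothetical; the comments map the code back to the numbered steps above.

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);  // a DistributedFileSystem when fs.defaultFS points at HDFS

    InputStream in = null;
    try {
      // Step 1: open() returns an FSDataInputStream wrapping a DFSInputStream
      in = fs.open(new Path("/user/demo/input.txt"));   // hypothetical path
      // Steps 5-8: read() pulls data from the closest datanode, block by block
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      // Step 9: close the stream when finished
      IOUtils.closeStream(in);
    }
  }
}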

Important Java Classes to Write Data to HDFS

FileSystem

•an abstract file system class.

•provides methods to work with the file system, like reading and writing files.

DistributedFileSystem

•an implementation of FileSystem for the distributed file system (HDFS).

FSDataOutputStream

•provides stream (channel) for writing data.

DFSOutputStream

•handles communication with the namenode and the various datanodes.

•called internally by FSDataOutputStream.

Writing File to HDFS: Steps

  1. The client calls create() on DistributedFileSystem.

  2. DistributedFileSystem contacts namenode to create a new file in the filesystem’s namespace, with no blocks associated with it. The namenode performs various checks to make sure the file doesn’t already exist and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file (in edits)

  3. DistributedFileSystem returns an FSDataOutputStream for the client to start writing data. FSDataOutputStream uses DFSOutputStream, which handles communication with the datanodes and namenode

  4. The client calls the write() method on FSDataOutputStream.

  5. DFSOutputStream splits the data into packets and writes them to an internal queue called the data queue.

• The data queue is consumed by the DataStreamer, which asks the namenode for a list of datanodes on which the blocks will be stored.

• The list of datanodes forms a pipeline whose length equals the replication factor.

• The DataStreamer streams the packets to the first datanode in the pipeline.

• First datanode stores each packet and forwards it to the second datanode in the pipeline.

• Similarly, the second datanode stores the packet and forwards it to the third datanode in the pipeline.

• The DFSOutputStream also maintains an internal queue called the ack queue.

• A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.

•So there are two queues: data queue (containing packets that are to be written) and ack queue (containing packets that are yet to be acknowledged)

  6. When the client has finished writing data, it calls close() on the stream. This flushes all the remaining packets to the datanode pipeline and waits for acknowledgments.

  7. DistributedFileSystem contacts the namenode to signal that the file write activity is complete.

•The namenode already knows which blocks the file is made up of (because DataStreamer had asked for block allocations), so it only has to wait for blocks to be minimally replicated before returning successfully.

•Closing a file in HDFS performs an implicit hflush().

•After a successful return from hflush(), HDFS guarantees that the data written up to that point in the file has reached all the datanodes in the write pipeline and is visible to all new readers.
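
A minimal client-side write mirroring the steps above; the path and the data written are hypothetical, and the comments map back to the numbered steps.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWrite {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // Steps 1-3: create() asks the namenode to record the new file and returns an FSDataOutputStream
    FSDataOutputStream out = fs.create(new Path("/user/demo/output.txt"));  // hypothetical path

    // Steps 4-5: write() hands bytes to DFSOutputStream, which packetises them
    // and streams them down the datanode pipeline
    out.write("hello, hdfs\n".getBytes(StandardCharsets.UTF_8));

    // hflush() guarantees that the data written so far has reached every
    // datanode in the pipeline and is visible to new readers
    out.hflush();

    // Steps 6-7: close() flushes remaining packets, waits for acknowledgments,
    // and signals the namenode that the write is complete
    out.close();
  }
}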
