Hive

Hive is a SQL-like query interface on Hadoop, built to process data ranging from simple to complex structures. This lesson covers some practical uses of Hive.

Introduction

•Hive is a data warehouse solution built on top of Hadoop by Facebook.

•An essential tool in the Hadoop ecosystem that provides an SQL (Structured Query Language) dialect, called Hive Query Language (HiveQL), for querying data stored in the Hadoop Distributed Filesystem (HDFS).

•Most data warehouse applications are implemented using relational databases that use SQL as the query language. Hive lowers the barrier for moving these applications to Hadoop. People who know SQL can learn Hive easily.

•Hive automatically uses HDFS for storage, but keeps all the metadata about databases and tables in a metastore database local to Hive.

•Hive is most suited for data warehouse applications, where relatively static data is analyzed, fast response times are not required, and the data is not changing rapidly.

Hive Architecture

Hive DDL

DATABASES

•Tables in a database are stored in subdirectories of the database directory.

•The exception is tables in the default database, which doesn’t have its own directory.

•The database directory is created under a top-level directory specified by the property hive.metastore.warehouse.dir

set hive.metastore.warehouse.dir;

•You can override this default location for the new directory as shown, and inspect the result with DESCRIBE:

CREATE DATABASE test_db
LOCATION '/user/test/mydb';

DESCRIBE DATABASE test_db;

•The USE command sets a database as your working database, analogous to changing working directories in a filesystem.

USE test_db;
SET hive.cli.print.current.db=true;
DROP DATABASE IF EXISTS test_db;

•The IF EXISTS is optional and suppresses warnings if test_db doesn’t exist.

•By default, Hive won’t permit you to drop a database if it contains tables.

•You can either drop the tables first or append the CASCADE keyword to the command, which causes Hive to drop the tables in the database first:

DROP DATABASE IF EXISTS test_db CASCADE;

•When a database is dropped, its directory is also deleted.

TABLES

The CREATE TABLE statement follows SQL conventions, but Hive’s version offers significant extensions for flexibility in where the data files for a table are stored, the formats used, and so on.

CREATE TABLE IF NOT EXISTS emp_details (emp_name STRING, unit STRING, exp INT, location STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

DROP TABLE IF EXISTS emp_details;

•Most table properties can be altered with ALTER TABLE statements, which change metadata about the table but not the data itself.

Renaming a Table

•Use this statement to rename the table emp_details to employee_details:

ALTER TABLE emp_details RENAME TO employee_details;

Changing Columns

•You can rename a column, change its position, type, or comment:

ALTER TABLE emp_details
CHANGE COLUMN emp_name emp_name STRING COMMENT 'Employee Name'
AFTER unit;

•You have to specify the old name, a new name, and the type, even if the name or type is not changing.

Hive Views

•A common use case for views is restricting the result rows based on the value of one or more columns.

•When a query becomes long or complicated, a view may be used to hide the complexity by dividing the query into smaller, more manageable pieces; similar to writing a function in a programming language or the concept of layered design in software.

CREATE VIEW joined_view AS
SELECT * FROM people JOIN cart ON (cart.people_id = people.id)
WHERE firstname = 'john';


•As part of Hive’s query optimization, the clauses of both the query and view may be combined together into a single actual query.

•The conceptual view still applies when the view and a query that uses it both contain an ORDER BY clause or a LIMIT clause.

•The view’s clauses are evaluated before the using query’s clauses.

•For example, if the view has a LIMIT 100 clause and the query has a LIMIT 200 clause, you’ll get at most 100 results.

•While defining a view doesn’t “materialize” any data, the view is frozen to any subsequent changes to any tables and columns that the view uses.

•Hence, a query using a view can fail if the referenced tables or columns no longer exist.

Numeric Data Types

•Integral types are TINYINT, SMALLINT, INT & BIGINT.

•These are equivalent to Java’s byte, short, int, and long primitive types.

•Floating-point types are FLOAT, DOUBLE & DECIMAL.

•These are equivalent to Java’s float and double, and SQL’s DECIMAL, respectively.

•DECIMAL(5,2) represents a total of 5 digits, of which 2 are decimal digits.
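•For example, a column holding such values could be declared as follows (table and column names are hypothetical):

CREATE TABLE prices (item STRING, amount DECIMAL(5,2)); -- amount can range from -999.99 to 999.99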

Date/Time Types

•Hive provides DATE and TIMESTAMP data types, in the traditional UNIX timestamp format, for date/time-related fields.

•DATE values are represented in the form YYYY-MM-DD. Example: DATE '2014-12-07'. The allowed range is 0000-01-01 to 9999-12-31.

•TIMESTAMP uses the format yyyy-mm-dd hh:mm:ss[.f...].

•We can also cast string and timestamp values to the DATE type if they match the format.
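•For example, a string literal in the matching format can be cast directly (a minimal sketch; SELECT without a FROM clause requires Hive 0.13+):

SELECT CAST('2014-12-07' AS DATE);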

•Hive supports two more primitive data types, BOOLEAN and BINARY. Similar to Java’s Boolean, BOOLEAN in Hive stores only true or false values.

•BINARY is an array of bytes, similar to VARBINARY in many RDBMSs.

Data Load

•Load operations are currently pure copy/move operations that move data files into locations corresponding to Hive tables.

LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2, ...)]

•filepath can refer to a file (in which case Hive will move the file into the table) or it can be a directory (in which case Hive will move all the files within that directory into the table).

•If the keyword LOCAL is specified, then:

•the load command will look for filepath in the local file system.

•If a relative path is specified, it will be interpreted relative to the user’s current working directory.

•The user can specify a full URI for local files as well, for example: file:///user/hive/project/data1

•the load command will try to copy all the files addressed by filepath to the target filesystem.


•If the OVERWRITE keyword is used, the contents of the target table (or partition) will be deleted and replaced by the files referred to by filepath; otherwise, the files referred to by filepath will be added to the table.
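•For example, a local file (path hypothetical) could be loaded into the emp_details table created earlier:

LOAD DATA LOCAL INPATH '/home/user/data/emp.csv' OVERWRITE INTO TABLE emp_details;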

•Query results can be inserted into filesystem directories.

INSERT OVERWRITE [LOCAL] DIRECTORY directory1
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...

(Note: ROW FORMAT and STORED AS are only available starting with Hive 0.11.0.)

Table Types

•When you drop an external table, Hive only drops the metadata.

•The data itself does not get deleted from HDFS.

•Thus, the external table is effectively just a pointer to data on HDFS.
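•As an illustration (table name and LOCATION path are hypothetical), an external table is declared with the EXTERNAL keyword and a LOCATION pointing at existing HDFS data:

CREATE EXTERNAL TABLE IF NOT EXISTS ext_emp_details (emp_name STRING, unit STRING, exp INT, location STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/test/emp_data';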

Use EXTERNAL tables when:

•The data is also used outside of Hive. For example, the data files are read and processed by an existing program that doesn’t lock the files.

•Data needs to remain in the underlying location even after a DROP TABLE. This can apply if you are pointing multiple schemas (tables or views) at a single data set or if you are iterating through various possible schemas.

•Hive should not own the data or control settings, directories, etc.; another program or process will handle those things.

•You are not creating the table based on an existing table (CREATE TABLE ... AS SELECT).

Use INTERNAL tables when:

•The data is temporary.

•You want Hive to completely manage the lifecycle of the table and data.

Complex Data Types

•Complex Types can be built up from primitive types and other composite types.

•The data types of the fields in a collection are specified using angle-bracket notation.

•Currently Hive supports four complex data types. They are:

ARRAY

•ARRAY<data_type>

•An ordered sequence of elements of the same type, indexable using zero-based integers.

•It is similar to arrays in Java.

•Example – array('siva', 'bala', 'praveen')

•The second element is accessed with array[1].


MAP

•MAP<primitive_type, data_type>

•Collection of key-value pairs.

•Fields are accessed using array notation with the key (e.g., ['key']).

STRUCT

•STRUCT<col_name : data_type [COMMENT col_comment], ...>

•It is similar to STRUCT in C language.

•It is a record type which encapsulates a set of named fields that can be any primitive data type.

•Elements in STRUCT type are accessed using the DOT (.) notation.

•Example – For a column c of type STRUCT {a INT; b INT}, the a field is accessed by the expression c.a.

UNIONTYPE

•UNIONTYPE<data_type, data_type, ...>

•It is similar to Unions in C.

•At any point of time, a UNIONTYPE can hold exactly one value, of one of its specified data types.
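•To illustrate, a hypothetical table using all four complex types, and a query accessing its fields, might look like this sketch:

CREATE TABLE complex_demo (
  names ARRAY<STRING>,
  deductions MAP<STRING, FLOAT>,
  address STRUCT<street:STRING, city:STRING>,
  misc UNIONTYPE<INT, STRING>
);

SELECT names[1], deductions['tax'], address.city FROM complex_demo;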

UDF

How do I use a Python UDF in Hive?

Hive exchanges data with Python using a streaming technique:

•Rows from Hive are passed to Python through STDIN.

•Processed rows from Python are passed back to Hive through STDOUT.

import sys
for line in sys.stdin:
    cols = line.strip().split('\t')  # Hive sends each row as tab-delimited text on STDIN
    print('\t'.join(cols))           # emit the processed row back to Hive via STDOUT

Use the Hive TRANSFORM statement to invoke the script, after registering it:

add file wasb:///scripts/myscript.py;

SELECT TRANSFORM (col1, col2, col3)
USING 'python myscript.py'
AS (col1 string, col2 int, col3 string)
FROM mytable
ORDER BY col1;

Hive Operations

Distribute By and Cluster By

DISTRIBUTE BY controls how map output is divided among reducers.

•All data that flows through a MapReduce job is organized into key-value pairs.

•To see why these clauses are useful, consider how MapReduce distributes data by default.

•By default, MapReduce computes a hash on the keys output by mappers and tries to evenly distribute the key-value pairs among the available reducers using the hash values.

•Unfortunately, this means that when we use SORT BY, the contents of one reducer’s output will overlap significantly with the output of the other reducers, as far as sorted order is concerned, even though the data is sorted within each reducer’s output.

•Suppose we are interested in sorting the contents of the users table on name for every unit.

•We have to make sure that records with the same unit go to a single reducer; to do this, we can DISTRIBUTE BY unit.


•DISTRIBUTE BY works similar to GROUP BY in the sense that it controls how reducers receive rows for processing, while SORT BY controls the sorting of data inside the reducer.

•Hive requires that the DISTRIBUTE BY clause come before the SORT BY clause.

•Using DISTRIBUTE BY … SORT BY is equivalent to CLUSTER BY.

•If we need to DISTRIBUTE and SORT (ascending) on the same column, we may choose to use CLUSTER BY.
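•For example, assuming the users table mentioned above, the following two queries are equivalent:

SELECT * FROM users DISTRIBUTE BY unit SORT BY unit;

SELECT * FROM users CLUSTER BY unit;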

Partitioning

•Hive organizes tables horizontally into partitions.

•It is a way of dividing a table into related parts based on the values of partitioned columns such as date, city, department etc.

•Using partition, it is easy to query a portion of the data.

•Partitioning can be done on more than one column, which imposes a multi-dimensional structure on directory storage.

•In Hive, partitioning is supported for both managed and external tables.

•The partition statement lets Hive alter the way it manages the underlying structures of the table’s data directory.

•In case of partitioned tables, subdirectories are created under the table’s data directory for each unique value of a partition column.

•When a partitioned table is queried with one or both partition columns in the WHERE clause, Hive effectively performs partition elimination, scanning only the data directories that are needed.

•If no partition columns are used in the query, all the directories are scanned (a full table scan) and partitioning has no effect.
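•A minimal sketch (table and partition values hypothetical) of a partitioned table and a query that benefits from partition elimination:

CREATE TABLE emp_part (emp_name STRING, exp INT)
PARTITIONED BY (location STRING);

SELECT * FROM emp_part WHERE location = 'Pune';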

Classification of partitioning

•Static partitioning

•Dynamic Partitioning

When to use static partitioning

•Static partitioning is applied when we know, at the time of loading, which partition the data to be inserted belongs to.

When to use dynamic partitioning

•In static partitioning, every partition needs to be backed by an individual Hive statement, which is not feasible for a large number of partitions, as it requires writing a lot of Hive statements.

•In that scenario, dynamic partitioning is suggested, because we can create any number of partitions with a single Hive statement.
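•A sketch of both styles, assuming the emp_part table from the previous example:

-- Static: the target partition is named explicitly
INSERT OVERWRITE TABLE emp_part PARTITION (location='Pune')
SELECT emp_name, exp FROM emp_details WHERE location='Pune';

-- Dynamic: Hive derives the partition from the last column of the SELECT
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE emp_part PARTITION (location)
SELECT emp_name, exp, location FROM emp_details;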

Bucketing

•Bucketing assigns a record to a bucket by computing (hash function of the bucketed column) mod (total number of buckets). The hash function depends on the type of the bucketing column.

•Records with the same bucketed column value will always be stored in the same bucket.

•We use the CLUSTERED BY clause to divide the table into buckets.

•Physically, each bucket is just a file in the table directory, and Bucket numbering is 1-based.

•Bucketing can be done along with Partitioning on Hive tables and even without partitioning.

•Bucketed tables will create almost equally distributed data file parts, unless there is skew in data.

•Bucketing is enabled by setting:

SET hive.enforce.bucketing=true;
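•For example, the buck_users table sampled later in this lesson could be declared as follows (the column list is an assumption):

CREATE TABLE buck_users (id INT, name STRING, unit STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;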

Advantages

•Bucketed tables offer more efficient sampling than non-bucketed tables. With sampling, we can try out queries on a fraction of the data for testing and debugging purposes when the original data sets are very huge.

•As the data files are equally sized parts, map-side joins are faster on bucketed tables than on non-bucketed tables.

•Bucketing concept also provides the flexibility to keep the records in each bucket to be sorted by one or more columns. This makes map-side joins even more efficient, since the join of each bucket becomes an efficient merge-sort.

Bucketing Vs Partitioning

•Partitioning helps in elimination of data when used in the WHERE clause, whereas bucketing helps in organizing the data in each partition into multiple files, so that the same set of data is always written to the same bucket.

•Bucketing helps a lot when joining on the bucketed columns.

•A Hive bucket is simply another technique for decomposing data into more manageable, roughly equal parts.

Sampling

•TABLESAMPLE() gives more disordered and random records from a table as compared to LIMIT.

•We can sample using the rand() function, which returns a random number.

SELECT * from users TABLESAMPLE(BUCKET 3 OUT OF 10 ON rand()) s;


•Here rand() is used in place of a column, providing a random value for each record.

•The denominator in the bucket clause represents the number of buckets into which data will be hashed.

•The numerator is the bucket number selected.

SELECT * from users TABLESAMPLE(BUCKET 2 OUT OF 4 ON name) s;

•If the columns specified in the TABLESAMPLE clause match the columns in the CLUSTERED BY clause, TABLESAMPLE queries only scan the required hash partitions of the table.

SELECT * FROM buck_users TABLESAMPLE(BUCKET 1 OUT OF 2 ON id) s LIMIT 1;

Joins and Types

Reduce-Side Join

•If the datasets are large, a reduce-side join takes place.

Map-Side Join

•If one of the datasets is small, a map-side join takes place.

•In a map-side join, a local job runs to create a hash table from the contents of the small table's HDFS file and sends it to every node.

SET hive.auto.convert.join=true;

Bucket Map Join

•The data must be bucketed on the keys used in the ON clause, and the number of buckets for one table must be a multiple of the number of buckets for the other table.

•When these conditions are met, Hive can join individual buckets between tables in the map phase, because it does not have to fetch the entire content of one table to match against each bucket in the other table.

SET hive.optimize.bucketmapjoin=true;
SET hive.auto.convert.join=true;

SMBM Join

•Sort-Merge-Bucket (SMB) joins can be converted to SMB map joins as well.

•SMB joins are used wherever the tables are sorted and bucketed.

•The join boils down to just merging the already sorted tables, allowing this operation to be faster than an ordinary map-join.

SET hive.enforce.sortmergebucketmapjoin=false;
SET hive.auto.convert.sortmerge.join=true;
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;

LEFT SEMI JOIN

•A left semi-join returns records from the lefthand table if records are found in the righthand table that satisfy the ON predicates.

•It’s a special, optimized case of the more general inner join.

•Most SQL dialects support an IN … EXISTS construct to do the same thing.

•SELECT and WHERE clauses can’t reference columns from the righthand table.

•Right semi-joins are not supported in Hive.

•The reason semi-joins are more efficient than the more general inner join is as follows:

•For a given record in the lefthand table, Hive can stop looking for matching records in the righthand table as soon as any match is found.

•At that point, the selected columns from the lefthand table record can be projected.
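•For example, using the people and cart tables from the views example, a semi-join returns each person who has at least one cart entry:

SELECT p.id, p.firstname FROM people p LEFT SEMI JOIN cart c ON (p.id = c.people_id);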

Input Formats in Hive

•A file format is a way in which information is stored or encoded in a computer file.

•In Hive it refers to how records are stored inside the file.

•InputFormat reads key-value pairs from files.

•As we are dealing with structured data, each record has to have its own structure.

•How records are encoded in a file defines a file format.

•These file formats mainly vary between data encoding, compression rate, usage of space and disk I/O.

•Hive does not verify whether the data that you are loading matches the schema for the table.

•However, it does verify that the file format matches the table definition.

SerDe in Hive

•The SerDe interface allows you to instruct Hive as to how a record should be processed.

•A SerDe is a combination of a Serializer and a Deserializer (hence, Ser-De).

•The Deserializer interface takes a string or binary representation of a record, and translates it into a Java object that Hive can manipulate.

•The Serializer, however, will take a Java object that Hive has been working with, and turn it into something that Hive can write to HDFS or another supported system.

•Commonly, Deserializers are used at query time to execute SELECT statements, and Serializers are used when writing data, such as through an INSERT-SELECT statement.

CSVSerDe

•Use ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'

•Define the following in SERDEPROPERTIES:

WITH SERDEPROPERTIES (
  "separatorChar" = "<value_of_separator>",
  "quoteChar" = "<value_of_quote_character>",
  "escapeChar" = "<value_of_escape_character>"
)
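•A complete sketch (table name, columns, and property values are illustrative):

CREATE TABLE csv_emp (emp_name STRING, unit STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ("separatorChar" = ",", "quoteChar" = "'", "escapeChar" = "\\")
STORED AS TEXTFILE;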

JSONSerDe

•Include hive-hcatalog-core-0.14.0.jar.

•Use ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
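•For example (table hypothetical), where each line of the input file holds one JSON object whose keys match the column names:

CREATE TABLE json_events (sessionid STRING, url STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE;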

RegexSerDe

•It is used in case of pattern matching.

•Use ROW FORMAT SERDE

'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'

•In SERDEPROPERTIES, define the input pattern and output fields.

For example:

"input.regex" = "(.*)/(.*)@(.*)"
"output.format.string" = "%1$s %2$s %3$s"

Analytic and Windowing in Hive

•Analytic functions (also known as window functions) are a special category of built-in functions. Like aggregate functions, they examine the contents of multiple input rows to compute each output value.

•However, rather than being limited to one result value per GROUP BY group, they operate on windows where the input rows are ordered and grouped using flexible conditions expressed through an OVER() clause.

•Some functions, such as LAG() and RANK(), can only be used in this analytic context.

•Some aggregate functions do double duty: when you call the aggregation functions such as MAX(), SUM(), AVG(), and so on with an OVER() clause, they produce an output value for each row, based on computations across other rows in the window.

•Although analytic functions often compute the same value you would see from an aggregate function in a GROUP BY query, the analytic functions produce a value for each row in the result set rather than a single value for each group.

•This flexibility lets you include additional columns in the SELECT list, offering more opportunities for organizing and filtering the result set.
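•For example, using the emp_details table from earlier, the following ranks employees within each unit by experience while still returning every row:

SELECT unit, emp_name, exp,
       RANK() OVER (PARTITION BY unit ORDER BY exp DESC) AS exp_rank
FROM emp_details;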

HCatalog

•HCatalog is a table and storage management layer for Hadoop that enables users with different data processing tools (Pig, MapReduce, etc.) to more easily read and write data on the grid.

•HCatalog's table abstraction presents users with a relational view of data in the Hadoop distributed file system (HDFS) and ensures that users need not worry about where or in what format their data is stored: RCFile format, text files, SequenceFiles, or ORC files.

•HCatalog supports reading and writing files in any format for which a SerDe (serializer-deserializer) can be written. By default, HCatalog supports RCFile, CSV, JSON, SequenceFile, and ORC file formats. To use a custom format, you must provide the InputFormat, OutputFormat, and SerDe.


•HCatalog is built on top of the Hive metastore and incorporates Hive’s DDL. HCatalog provides read and write interfaces for Pig and MapReduce and uses Hive’s command line interface for issuing data definition and metadata exploration commands.

UDF in Hive

•To write a UDF, start by extending the UDF class and implementing the evaluate() function.

•During query processing, an instance of the class is instantiated for each usage of the function in a query.

•The evaluate() method is called for each input row.

•The result of evaluate() is returned to Hive.

•It is legal to overload the evaluate method.

•Hive will pick the method that matches in a similar way to Java method overloading.

•Finally, to use the UDF, create a JAR and register the class as a temporary function.

ADD JAR <jar_file_path>;

ADD JAR /home/beingdatum/hive/hive-udf.jar;

CREATE TEMPORARY FUNCTION <symbolic_name> AS '<full_class_name>';

CREATE TEMPORARY FUNCTION up AS 'udf.ToUpper';
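•The registered function can then be called like any built-in function (a sketch, assuming the ToUpper UDF accepts a string):

SELECT up(emp_name) FROM emp_details;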

USE PARTITIONING AND BUCKETING

•Partitioning a table stores data in subdirectories under the table location, which allows Hive to exclude unnecessary data from queries instead of reading all the data every time a new query is made.

•Hive also supports Dynamic Partitioning (DP), where partition column values are only known at EXECUTION TIME. To enable dynamic partitioning:

SET hive.exec.dynamic.partition=true;

•Another situation we want to protect against with dynamic partition inserts is a user accidentally specifying all partitions to be dynamic, without specifying one static partition, when the original intention was just to overwrite the sub-partitions of one root partition:

SET hive.exec.dynamic.partition.mode=strict;

To enable bucketing:

SET hive.enforce.bucketing=true;

Optimizations in Hive

•Use denormalisation, filtering, and projection as early as possible to reduce the data before a join.

•A join is a costly affair and requires an extra map-reduce phase to accomplish the query job. With denormalisation, the data is present in the same table, so there is no need for any joins, and the selects are very fast.

•As a join requires data to be shuffled across nodes, filtering and projecting early reduces the amount of data shuffled.

TUNE CONFIGURATIONS

•To increase the number of mappers, reduce the split size:

SET mapred.max.split.size=1000000; -- ~1 MB

•Compress map/reduce output

SET mapred.compress.map.output=true;
SET mapred.output.compress=true;

•Parallel execution

•Applies to MapReduce jobs that can run in parallel, for example jobs processing different source tables before a join.

SET hive.exec.parallel=true;

USE ORCFILE

•Hive supports ORCFile, a new table storage format that sports fantastic speed improvements through techniques like predicate push-down, compression, and more.

•Using ORCFile for every Hive table is extremely beneficial for getting fast response times for your Hive queries.

USE TEZ

•With Hadoop 2 and Tez, the cost of job submission and scheduling is minimized.

•Also, Tez does not restrict the job to a Map followed by a Reduce; this implies that all the query execution can be done in a single job, without having to cross job boundaries.

•Let’s look at an example. Consider a click-stream event table:

CREATE TABLE clicks (
  timestamp date,
  sessionID string,
  url string,
  source_ip string
)
STORED AS ORC
TBLPROPERTIES ("orc.compress" = "SNAPPY");

•Each record represents a click event, and we would like to find the latest URL for each sessionID.

•One might consider the following approach:

SELECT clicks.sessionID, clicks.url
FROM clicks
INNER JOIN (
  SELECT sessionID, MAX(timestamp) AS max_ts
  FROM clicks
  GROUP BY sessionID
) latest
ON clicks.sessionID = latest.sessionID
AND clicks.timestamp = latest.max_ts;

•In the above query, we build a sub-query to collect the timestamp of the latest event in each session, and then use an inner join to filter out the rest.

•While the query is a reasonable solution from a functional point of view, it turns out there's a better way to re-write it:

SELECT ranked_clicks.sessionID, ranked_clicks.url
FROM (
  SELECT sessionID, url,
         RANK() OVER (PARTITION BY sessionID ORDER BY timestamp DESC) AS rank
  FROM clicks
) ranked_clicks
WHERE ranked_clicks.rank = 1;

•Here, we use Hive’s OLAP functionality (OVER and RANK) to achieve the same thing, but without a Join.

•Clearly, removing an unnecessary join will almost always result in better performance, and when using big data this is more important than ever.

MAKING MULTIPLE PASSES OVER THE SAME DATA

•Hive has a special syntax for producing multiple aggregations from a single pass through a source of data, rather than rescanning it for each aggregation.

•This change can save considerable processing time for large input data sets.

•For example, each of the following two queries creates a table from the same source table, history:

INSERT OVERWRITE TABLE sales
SELECT * FROM history WHERE action='purchased';

INSERT OVERWRITE TABLE credits
SELECT * FROM history WHERE action='returned';


•This syntax is correct, but inefficient.

•The following rewrite achieves the same thing, but using a single pass through the source history table:

FROM history
INSERT OVERWRITE TABLE sales SELECT * WHERE action='purchased'
INSERT OVERWRITE TABLE credits SELECT * WHERE action='returned';