Saturday, May 31, 2014

5 Tips for efficient Hive queries

Hive on Hadoop makes data processing so straightforward and scalable that we can easily forget to optimize our Hive queries. Well-designed tables and queries can greatly improve query speed and reduce processing cost. This article covers five tips that are valuable for ad-hoc queries, where they save time, as well as for regular ETL (Extract, Transform, Load) workloads, where they save money. The three areas in which we can optimize our Hive utilization are:
  • Data Layout (Partitions and Buckets)
  • Data Sampling (Bucket and Block sampling)
  • Data Processing (Bucket Map Join and Parallel execution)

We will discuss these areas in detail in this article. If you like, you can also watch our webinar on the topic, given by Ashish Thusoo, co-founder of Apache Hive, and Sadiq Sid Shaik, Director of Product at Qubole.

Example Data Set

We can best illustrate the improvements with an example data set we use at Qubole. The data consists of three tables. The table Airline Bookings All contains 276 million records of complete air travel trips from an origin to a destination, with an itinerary identifier as key. The second table, Airline Bookings Origin Only, contains the data for only the first leg of each itinerary and also has the itinerary identifier as key. The last table is ‘Census’, containing population information for each US state.
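As a rough sketch of how these tables might be declared in Hive (the column names and types below are illustrative assumptions, not the actual Qubole schema):

  CREATE TABLE airline_bookings_all (
    itinerary_id STRING,
    origin_state STRING,
    destination_state STRING,
    distance INT
  );

  CREATE TABLE airline_bookings_origin_only (
    itinerary_id STRING,
    origin_state STRING
  );

  CREATE TABLE census (
    state STRING,
    population BIGINT
  );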
The example data set to demonstrate Hive optimization
Tip 1: Partitioning Hive Tables

Hive is a powerful tool for performing queries on large data sets, and it is particularly good at queries that require full table scans. Yet many queries run on Hive have filtering WHERE clauses that limit the data to be retrieved and processed, e.g. SELECT * WHERE state='CA'. Hive users tend to have or develop domain knowledge: they understand the data they work with and the queries that are commonly executed or scheduled. With this knowledge we can identify common data structures that surface in queries and pick out columns with a (relatively) low cardinality, such as geographies or dates, that are highly relevant to key queries. For example, a common way to slice the airline data for reporting purposes is by origin state. We can organise our data by this information and tell Hive about it, and Hive can then use this knowledge to exclude data from queries before even reading it.

Hive tables are linked to directories on HDFS or S3, with the files in them interpreted according to the metadata stored with Hive. Without partitioning, Hive reads all the data in the directory and applies the query filters to it. This is slow and expensive, since all the data has to be read. In our example, reports and queries are commonly generated on an origin-state basis, so we can declare the state column to be a partition column when we create the table. Consequently, when we write data to the table, it is written into sub-directories named by state (abbreviation). Subsequent queries filtering by origin state, e.g. SELECT * FROM Airline_Bookings_All WHERE origin_state = 'CA', allow Hive to skip all but the relevant sub-directories and data files. This can lead to a tremendous reduction in the data that must be read and filtered in the initial map stage, which reduces the number of mappers, IO operations, and time to answer the query.
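A minimal sketch of a partitioned variant of the bookings table and a query that benefits from partition pruning (the non-partition columns are assumptions carried over from the schema sketch above):

  -- The partition column is declared separately from the regular columns
  CREATE TABLE airline_bookings_all (
    itinerary_id STRING,
    destination_state STRING,
    distance INT
  )
  PARTITIONED BY (origin_state STRING);

  -- Hive only reads the sub-directory for origin_state='CA'
  SELECT * FROM airline_bookings_all WHERE origin_state = 'CA';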
Example Hive table partitioning

It is important to consider the cardinality of a potential partition column and to avoid fragmenting the data too much. Itinerary ID would be a very poor choice for partitioning: queries for single itineraries by ID would be very fast, but any other query would have to traverse a huge number of directories and files, incurring serious overhead. Additionally, HDFS uses a very large block size of usually 64 MB or more, which means that each file, even with only a few bytes of data, will have to allocate that block size on HDFS. This can potentially fill the file system with a large number of files carrying barely any actual data.
Tip 2: Bucketing Hive Tables

Itinerary ID is unsuitable for partitioning, as we learned, but it is used frequently for join operations. We can optimize joins by bucketing 'similar' IDs so that Hive can minimise the processing steps and reduce the data that needs to be parsed and compared for join operations. Itinerary IDs, of course, have no real similarity; we only need to ensure that the same itinerary IDs from the two tables end up in the same processing bucket. A simple trick to achieve this is to hash the IDs and store the data by hash result, which is exactly what bucketing does.
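A sketch of the two tables bucketed on the join column, under the same illustrative schema as above; 32 buckets is an arbitrary choice, and the bucket counts of the two tables only need to be multiples of each other:

  CREATE TABLE airline_bookings_all (
    itinerary_id STRING,
    origin_state STRING,
    destination_state STRING,
    distance INT
  )
  CLUSTERED BY (itinerary_id) INTO 32 BUCKETS;

  CREATE TABLE airline_bookings_origin_only (
    itinerary_id STRING,
    origin_state STRING
  )
  CLUSTERED BY (itinerary_id) INTO 32 BUCKETS;

  -- Must be set before every write so Hive actually hashes rows into buckets
  SET hive.enforce.bucketing = true;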
Example Hive table bucketing

Bucketing requires us to tell Hive at table creation time which column to cluster by and how many buckets to use. We also have to ensure the bucketing flag is set (SET hive.enforce.bucketing=true;) every time before we write data to the bucketed table. Importantly, the corresponding tables we want to join have to be set up in the same manner, with the join columns bucketed and the bucket counts being multiples of each other. The second part is the optimized query, for which we set a flag to hint to Hive that we want to take advantage of the bucketing in the join (SET hive.optimize.bucketmapjoin=true;). The SELECT statement can then include a MAPJOIN hint to ensure that the join is executed in the map stage, combining only the few relevant files from the two tables in each mapper task in a distributed fashion instead of parsing the full tables.

Example Hive MAPJOIN with bucketing
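A sketch of such a bucket map join on the two bucketed tables (the table aliases and selected columns are illustrative):

  SET hive.optimize.bucketmapjoin = true;
  SELECT /*+ MAPJOIN(o) */ a.itinerary_id, o.origin_state, a.destination_state
  FROM airline_bookings_all a
  JOIN airline_bookings_origin_only o
    ON a.itinerary_id = o.itinerary_id;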
Tip 3: Bucket Sampling

Once our tables are set up with these buckets, we can address another important use case: querying a sample of a large table join. We may want to try out complex queries or explore the data, and we want to do this iteratively and swiftly without processing the whole data set. This is particularly difficult when joining tables, because independent samples from two tables may have very little overlap. Ideally, we want to sample the relevant data on both tables and join it, i.e. ensure that we sample the same itinerary IDs from both tables rather than sets with little or no overlap. Bucketing on the join column enables us to join specific buckets from two tables whose data overlaps on the join column. Effectively, we execute exactly one part of the complete join operation and only incur the cost of that part. The hashing function on the ID has the additional benefit of a (somewhat) random nature, providing a representative sample.
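A sketch of sampling one matching bucket from each table and joining only those buckets (bucket 1 of 32 is an arbitrary choice; the schema is the illustrative one from above):

  SELECT a.itinerary_id, o.origin_state, a.destination_state
  FROM airline_bookings_all TABLESAMPLE(BUCKET 1 OUT OF 32 ON itinerary_id) a
  JOIN airline_bookings_origin_only TABLESAMPLE(BUCKET 1 OUT OF 32 ON itinerary_id) o
    ON a.itinerary_id = o.itinerary_id;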
Example Hive TABLESAMPLE on bucketed tables
Tip 4: Block Sampling

Similarly to the previous tip, we often want to sample data from only one table to explore queries and data. In these cases we may not want to go to the trouble of bucketing the table, or we may need to sample the data more randomly (independently of the hashing of a bucketing column) or at varying granularity. Block sampling provides a powerful syntax to define various ways of sampling the data in a table with the TABLESAMPLE statement. We can use it to sample a certain percentage, number of bytes, or number of rows of the data. For example, we can use sampling to approximate a metric like the average distance between the origin and destination of our itineraries. A query using 1% of the data via TABLESAMPLE(1 PERCENT) on a large table will give us a near-perfect answer, use as little as a hundredth of the resources, and return the result one to two orders of magnitude faster. In exploratory work or for metrics, this approach can be an extremely efficient and effective alternative to processing all of the data. The beauty of this solution is that we can scale the sample size with our data size. If we were exploring tera- or petabytes of data, we could sample a fraction of a percent and get the same actionable information in minutes or less, where the full computation would otherwise take hours.
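A sketch of such a block-sampled aggregation, assuming a distance column on the bookings table (the column is an assumption from the schema sketch above):

  -- Approximate the average trip distance from roughly 1% of the table
  SELECT AVG(distance)
  FROM airline_bookings_all TABLESAMPLE(1 PERCENT);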
Tip 5: Parallel Execution

Hadoop can execute MapReduce jobs in parallel, and several queries executed on Hive automatically make use of this parallelism. However, a single, complex Hive query is commonly translated into a number of MapReduce jobs that are executed sequentially by default. Often, though, some of a query's MapReduce stages are not interdependent and could be executed in parallel. They can then take advantage of spare capacity on a cluster and improve cluster utilization while reducing the overall query execution time. The configuration to change this behaviour in Hive is merely a single flag: SET hive.exec.parallel=true;.
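A sketch of a query with two independent sub-queries that Hive can run as parallel stages once the flag is set (the counts chosen here are only for illustration):

  SET hive.exec.parallel = true;
  SELECT leg, trips
  FROM (
    SELECT 'all_legs' AS leg, COUNT(*) AS trips FROM airline_bookings_all
    UNION ALL
    SELECT 'first_leg_only' AS leg, COUNT(*) AS trips FROM airline_bookings_origin_only
  ) t;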
Example of Hive parallel stage execution of a query

In the example above, the two sub-queries are independent, and when we enable parallel execution they are processed at the same time. In our example this reduced the execution time by 50%!

Conclusion

The five tips presented in this article can easily be applied by anyone using Hive to improve processing and query speed and reduce resource consumption. You can, of course, use them with the Qubole platform and are welcome to try it out.
source: http://www.qubole.com/5-tips-for-efficient-hive-queries/

Screencast for submitting a job to a cluster

Here is a screencast on submitting a word count MapReduce program written in Python. For some reason Windows Media Player is not able to play the file, but VLC is able to.


Here is the code for the mapper and the reducer. Note that Hadoop provides a Streaming feature for writing MapReduce programs in non-Java languages.
https://dl.dropboxusercontent.com/u/3182023/Screencasts/Execute-WordCount-PythonMR-In-VirtualMachine.mp4
As observed in the screencast, the Virtual Machine (VM) has all the necessary pieces to easily get started with Big Data and is provided as part of the Big Data training. The VM is updated regularly to add new frameworks and update the existing ones.

Hadoop tries to hide the underlying infrastructure details, so the MapReduce code and the command to submit a job are the same for a single-node cluster and a thousand-node cluster.

Tuesday, May 20, 2014

24 Interview Questions & Answers for Hadoop MapReduce developers

             
A good understanding of the Hadoop architecture is required to understand and leverage the power of Hadoop. Here are a few important practical questions that can be asked of a senior, experienced Hadoop developer in an interview. The list primarily covers questions related to the Hadoop architecture, MapReduce, the Hadoop API, and the Hadoop Distributed File System (HDFS).




What is a JobTracker in Hadoop? How many instances of JobTracker run on a Hadoop Cluster?
The JobTracker is the daemon service for submitting and tracking MapReduce jobs in Hadoop. There is only one JobTracker process running on any Hadoop cluster. The JobTracker runs in its own JVM process, and in a typical production cluster it runs on a separate machine. Each slave node is configured with the JobTracker node location. The JobTracker is a single point of failure for the Hadoop MapReduce service: if it goes down, all running jobs are halted. The JobTracker performs the following actions (from the Hadoop wiki):
  • Client applications submit jobs to the Job tracker.
  • The JobTracker talks to the NameNode to determine the location of the data.
  • The JobTracker locates TaskTracker nodes with available slots at or near the data.
  • The JobTracker submits the work to the chosen TaskTracker nodes.
  • The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are deemed to have failed and the work is scheduled on a different TaskTracker.
  • A TaskTracker will notify the JobTracker when a task fails. The JobTracker decides what to do then: it may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may even blacklist the TaskTracker as unreliable.
  • When the work is completed, the JobTracker updates its status.
  • Client applications can poll the JobTracker for information.

How does the JobTracker schedule a task?

The TaskTrackers send heartbeat messages to the JobTracker, usually every few seconds, to reassure the JobTracker that they are still alive. These messages also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated. When the JobTracker tries to find somewhere to schedule a task within the MapReduce operations, it first looks for an empty slot on the same server that hosts the DataNode containing the data, and if it cannot find one, it looks for an empty slot on a machine in the same rack.

What is a TaskTracker in Hadoop? How many instances of TaskTracker run on a Hadoop cluster?

A TaskTracker is a slave node daemon in the cluster that accepts tasks (Map, Reduce and Shuffle operations) from a JobTracker. There is only one TaskTracker process running on any Hadoop slave node. The TaskTracker runs in its own JVM process. Every TaskTracker is configured with a set of slots, which indicate the number of tasks it can accept. The TaskTracker starts a separate JVM process to do the actual work (called a Task Instance); this is to ensure that a process failure does not take down the TaskTracker. The TaskTracker monitors these task instances, capturing their output and exit codes. When a Task Instance finishes, successfully or not, the TaskTracker notifies the JobTracker. The TaskTrackers also send heartbeat messages to the JobTracker, usually every few seconds, to reassure the JobTracker that they are still alive. These messages also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated.

What is a Task instance in Hadoop? Where does it run?

Task Instances are the actual MapReduce tasks that run on each slave node. The TaskTracker starts a separate JVM process for each one (called a Task Instance); this is to ensure that a process failure does not take down the TaskTracker. Each Task Instance runs in its own JVM process, and there can be multiple Task Instance processes running on a slave node, based on the number of slots configured on the TaskTracker. By default, a new JVM process is spawned for each task.

How many Daemon processes run on a Hadoop system?

 

Hadoop comprises five separate daemons, each running in its own JVM. The following three daemons run on master nodes:
  • NameNode - Stores and maintains the metadata for HDFS.
  • Secondary NameNode - Performs housekeeping functions for the NameNode.
  • JobTracker - Manages MapReduce jobs, distributes individual tasks to machines running the TaskTracker.
The following two daemons run on each slave node:
  • DataNode - Stores actual HDFS data blocks.
  • TaskTracker - Responsible for instantiating and monitoring individual Map and Reduce tasks.

What is configuration of a typical slave node on Hadoop cluster? How many JVMs run on a slave node?

  • A single instance of a TaskTracker runs on each slave node. The TaskTracker runs as a separate JVM process.
  • A single instance of a DataNode daemon runs on each slave node. The DataNode daemon runs as a separate JVM process.
  • One or more Task Instances run on each slave node. Each Task Instance runs as a separate JVM process. The number of Task Instances can be controlled by configuration; typically a high-end machine is configured to run more Task Instances.

What is the difference between HDFS and NAS?

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems, but the differences from other distributed file systems are significant. The following are the differences between HDFS and NAS:
  • In HDFS, data blocks are distributed across the local drives of all machines in the cluster, whereas in NAS data is stored on dedicated hardware.
  • HDFS is designed to work with the MapReduce system, since computation is moved to the data. NAS is not suitable for MapReduce, since data is stored separately from the computation.
  • HDFS runs on a cluster of machines and provides redundancy using a replication protocol, whereas NAS is provided by a single machine and therefore does not provide data redundancy.
               

How does the NameNode handle DataNode failures?

The NameNode periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode. When the NameNode notices that it has not received a heartbeat message from a DataNode after a certain amount of time, the DataNode is marked as dead. Since its blocks are then under-replicated, the system begins replicating the blocks that were stored on the dead DataNode. The NameNode orchestrates the replication of data blocks from one DataNode to another. The replication data transfer happens directly between DataNodes; the data never passes through the NameNode.

Does the MapReduce programming model provide a way for reducers to communicate with each other? In a MapReduce job, can a reducer communicate with another reducer?

No, the MapReduce programming model does not allow reducers to communicate with each other. Reducers run in isolation.

Can I set the number of reducers to zero?

Yes, setting the number of reducers to zero is a valid configuration in Hadoop. When you set the number of reducers to zero, no reducers are executed and the output of each mapper is stored in a separate file on HDFS. [This is different from the case where the number of reducers is greater than zero, when the mappers' output (intermediate data) is written to the local file system (NOT HDFS) of each slave node running a mapper.]
Note: if you do not configure a Reducer class in the driver class, the IdentityReducer is used by default, which counts internally as one reducer, and the output is written to a single HDFS file. If you want the map output written directly to multiple HDFS files, you have to set the number of reducers to zero.

Where is the Mapper output (intermediate key-value data) stored?

The mapper output (intermediate data) is stored on the local file system (NOT HDFS) of each individual mapper node. This is typically a temporary directory location that can be set up in the configuration by the Hadoop administrator. The intermediate data is cleaned up after the Hadoop job completes.

What are combiners? When should I use a combiner in my MapReduce Job?

          
Combiners are used to increase the efficiency of a MapReduce program. They aggregate intermediate map output locally, on the individual mapper nodes, and can help you reduce the amount of data that needs to be transferred across to the reducers. You can use your reducer code as a combiner if the operation performed is commutative and associative. The execution of the combiner is not guaranteed: Hadoop may or may not execute it, and may execute it more than once if required. Therefore your MapReduce jobs should not depend on the combiner's execution.

What are the Writable and WritableComparable interfaces?

  • org.apache.hadoop.io.Writable is a Java interface. Any key or value type in the Hadoop Map-Reduce framework implements this interface. Implementations typically implement a static read(DataInput) method which constructs a new instance, calls readFields(DataInput) and returns the instance.
  • org.apache.hadoop.io.WritableComparable is a Java interface. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface. WritableComparable objects can be compared to each other using Comparators.

What is the Hadoop MapReduce API contract for a key and value Class?

  • The Key must implement the org.apache.hadoop.io.WritableComparable interface.
  • The value must implement the org.apache.hadoop.io.Writable interface.

What are IdentityMapper and IdentityReducer in MapReduce?

  • org.apache.hadoop.mapred.lib.IdentityMapper implements the identity function, mapping inputs directly to outputs. If the MapReduce programmer does not set a Mapper class using JobConf.setMapperClass, then IdentityMapper.class is used as the default.
  • org.apache.hadoop.mapred.lib.IdentityReducer performs no reduction, writing all input values directly to the output. If the MapReduce programmer does not set a Reducer class using JobConf.setReducerClass, then IdentityReducer.class is used as the default.

What is the meaning of speculative execution in Hadoop? Why is it important?

Speculative execution is a way of coping with variation in individual machine performance. In large clusters with hundreds or thousands of machines, some machines may not perform as fast as the others, and a whole job can be delayed by a single machine performing poorly. To avoid this, speculative execution in Hadoop can run multiple copies of the same map or reduce task on different slave nodes. The results from the first node to finish are used.

When are the reducers started in a MapReduce job?

In a MapReduce job, the reducers do not start executing the reduce method until all map tasks have completed. Reducers start copying intermediate key-value pairs from the mappers as soon as they are available, but the programmer-defined reduce method is called only after all the mappers have finished.

If reducers do not start before all mappers finish, why does the progress of a MapReduce job show something like Map(50%) Reduce(10%)? Why is a reducer progress percentage displayed when the mappers are not finished yet?

Reducers start copying intermediate key-value pairs from the mappers as soon as they are available. The progress calculation also takes into account this data transfer, which is done by the reduce process, so the reduce progress starts showing up as soon as any intermediate key-value pair from a mapper is available to be transferred to a reducer. Although the reducer progress is updated, the programmer-defined reduce method is called only after all the mappers have finished.

What is HDFS? How is it different from traditional file systems?

HDFS, the Hadoop Distributed File System, is responsible for storing very large data sets on the cluster. It is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems, but the differences from other distributed file systems are significant.
  • HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware.
  • HDFS provides high throughput access to application data and is suitable for applications that have large data sets.
  • HDFS is designed to support very large files. Applications that are compatible with HDFS are those that deal with large data sets. These applications write their data only once but they read it one or more times and require these reads to be satisfied at streaming speeds. HDFS supports write-once-read-many semantics on files.

What is HDFS Block size? How is it different from traditional file system block size?


In HDFS, data is split into blocks and distributed across multiple nodes in the cluster. Each block is typically 64 MB or 128 MB in size, and each block is replicated multiple times; the default is to replicate each block three times, with the replicas stored on different nodes. HDFS uses the local file system to store each HDFS block as a separate file. The HDFS block size is not comparable to a traditional file system block size; it is orders of magnitude larger than the few kilobytes typical of local file systems.

What is a NameNode? How many instances of NameNode run on a Hadoop Cluster?

The NameNode is the centerpiece of an HDFS file system. It keeps the directory tree of all files in the file system and tracks where across the cluster the file data is kept; it does not store the data of these files itself. There is only one NameNode process running on any Hadoop cluster. The NameNode runs in its own JVM process, and in a typical production cluster it runs on a separate machine. The NameNode is a single point of failure for the HDFS cluster: when the NameNode goes down, the file system goes offline. Client applications talk to the NameNode whenever they wish to locate a file, or when they want to add/copy/move/delete a file. The NameNode responds to successful requests by returning a list of relevant DataNode servers where the data lives.

What is a DataNode? How many instances of DataNode run on a Hadoop Cluster?

A DataNode stores data in the Hadoop Distributed File System (HDFS). There is only one DataNode process running on any Hadoop slave node. The DataNode runs in its own JVM process. On startup, a DataNode connects to the NameNode. DataNode instances can talk to each other, mostly when replicating data.

How does the client communicate with HDFS?

Client communication with HDFS happens through the Hadoop HDFS API. Client applications talk to the NameNode whenever they wish to locate a file, or when they want to add/copy/move/delete a file on HDFS. The NameNode responds to successful requests by returning a list of relevant DataNode servers where the data lives. Client applications can then talk directly to a DataNode, once the NameNode has provided the location of the data.

How are HDFS blocks replicated?

HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file: an application can specify the number of replicas of a file, and the replication factor can be specified at file creation time and changed later. Files in HDFS are write-once and have strictly one writer at any time. The NameNode makes all decisions regarding the replication of blocks. HDFS uses a rack-aware replica placement policy: in the default configuration there are three copies of each data block, with two copies stored on DataNodes in the same rack and the third copy on a DataNode in a different rack.

source: http://www.fromdev.com/2010/12/interview-questions-hadoop-mapreduce.html



MapReduce Fundamentals

MapReduce is a programming model for efficient distributed computing.
The core concept of MapReduce in Hadoop is that input may be split into logical chunks, and each chunk may be initially processed independently, by a map task. The results of these individual processing chunks can be physically partitioned into distinct sets, which are then sorted. Each sorted chunk is passed to a reduce task.
Every MapReduce program must specify a Mapper and typically a Reducer.

source: http://www.rishavrohitblog.blogspot.in/2013/03/mapreduce-fundamentals.html



INPUT AND OUTPUT FORMATS

A MapReduce job may specify how its input is to be read by specifying the InputFormat to be used
– InputSplit
– RecordReader
A MapReduce job may specify how its output is to be written by specifying the OutputFormat to be used.
These default to TextInputFormat and TextOutputFormat, which process line-based text data.
SequenceFile: SequenceFileInputFormat and SequenceFileOutputFormat.
These are file-based, but they are not required to be.

HOW MANY MAPS AND REDUCES

Maps
– Usually as many as the number of HDFS blocks being processed; this is the default.
– Else the number of maps can be specified as a hint.
– The number of maps can also be controlled by specifying the minimum split size.
– The actual sizes of the map inputs are computed by:
 max(min(block_size, data/#maps), min_split_size)
Reducers
– Unless the amount of data being processed is small, a good rule of thumb (see the worked example below) is:
 0.95*num_nodes*mapred.tasktracker.reduce.tasks.maximum
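As an illustrative calculation with assumed numbers (the data size, block size, and cluster size below are not recommendations):

 maps    ~ 10 GB of input / 128 MB block size = 80 map tasks
 reduces ~ 0.95 * 20 nodes * 2 (mapred.tasktracker.reduce.tasks.maximum) = 38 reduce tasks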

Saturday, May 17, 2014

Who can learn Hadoop and what are the minimum skills required to learn it?


Software professionals, analytics professionals, ETL developers, project managers, and testing professionals are the key beneficiaries of Hadoop.

Freshers who are looking to acquire a solid foundation in Hadoop architecture can also opt for training in Hadoop. It helps to first learn Java and basic RDBMS skills before taking up Hadoop training.

Here are the common Hadoop job types, with their responsibilities and the skills needed for a Hadoop career.


Hadoop Job Type | Job functions | Skills
Hadoop Developer | Develops MapReduce jobs, designs data warehouses | Linux, Java, scripting
Hadoop Admin | Manages the Hadoop cluster, designs data pipelines | Network management, Linux administration, experience in managing large clusters of machines
Data Scientist | Data mining and figuring out hidden knowledge in data | Math, data mining algorithms
Business Analyst | Analyzes data! | HiveQL, Pig, familiarity with other BI tools