Thursday, October 9, 2014

Sqoop Import control arguments:


Argument                          Description
--append                          Append data to an existing dataset in HDFS
--as-avrodatafile                 Imports data to Avro Data Files
--as-sequencefile                 Imports data to SequenceFiles
--as-textfile                     Imports data as plain text (default)
--boundary-query <statement>      Boundary query to use for creating splits
--columns <col,col,col…>          Columns to import from table
--direct                          Use direct import fast path
--direct-split-size <n>           Split the input stream every n bytes when importing in direct mode
--inline-lob-limit <n>            Set the maximum size for an inline LOB
-m,--num-mappers <n>              Use n map tasks to import in parallel
-e,--query <statement>            Import the results of statement
--split-by <column-name>          Column of the table used to split work units
--table <table-name>              Table to read
--target-dir <dir>                HDFS destination dir
--warehouse-dir <dir>             HDFS parent for table destination
--where <where clause>            WHERE clause to use during import
-z,--compress                     Enable compression
--compression-codec <c>           Use Hadoop codec (default gzip)
--null-string <null-string>       The string to be written for a null value for string columns
--null-non-string <null-string>   The string to be written for a null value for non-string columns

The --null-string and --null-non-string arguments are optional. If not specified, the string "null" will be used.
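As a hedged sketch (the connection string, table name and target path below are illustrative placeholders, not from this post), the null-handling arguments slot into an import like this; `\N` is a convenient choice because Hive interprets it as NULL by default:

```shell
sqoop import \
  --connect "jdbc:mysql://dbhost:3306/sales_db" \
  --table customers \
  --null-string '\\N' \
  --null-non-string '\\N' \
  --target-dir /user/sqoop/customers
```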

Tuesday, September 16, 2014

Apache Flume NG

Flume is a data-ingestion tool.

Source - web logs, HTTP, REST servers, Avro, Thrift, Syslog, Netcat

Channels - memory buffer, file, DB, or other storage

Sink - target / destination

To import multiple data sources into HDFS, Agents are used.

Each agent consists of a Source, a Channel and a Sink.

-------------------------------------------------------------------------
Running Flume Command from Flume Terminal [as below]:
-------------------------------------------------------------------------
Flume > bin/flume-ng agent --conf ./conf/ \
              -f conf/flume-conf.conf \
              -Dflume.root.logger=DEBUG,console \
              -n agent1

-----------------------------------------------------
File content of flume-conf.conf [as below]:-
-----------------------------------------------------

agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1

# Define source and type of event (in this case it is exec)
agent1.sources.s1.type = exec
agent1.sources.s1.command = tail -f /tmp/esplog.log

# Define channel and type (in this case channel is stored in memory, other types are: file, database etc)
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# Define sink and type (in this case it is hdfs, i.e an event type result is stored in hdfs)
# Default fileType outputs Writable (LongWritable) contents.
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = hdfs://<IP>:<Port>/user/flume/esplog.log
agent1.sinks.k1.hdfs.fileType=DataStream

# Bind source and sink to a channel
agent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1

Sqoop EOD imports, driven by a script scheduled in cron jobs:

#!/bin/ksh

today="$(date +'%d/%m/%Y')"

sqoop import \
  --connect "jdbc:mysql://localhost:1337/mysql_db" \
  --query "SELECT currentdate FROM mytable WHERE currentdate='$today' AND \$CONDITIONS" \
  --target-dir /user/sqoop_op/ \
  -m 1

(Double quotes around the query let $today expand, while \$CONDITIONS passes the literal $CONDITIONS token through - Sqoop requires it in the WHERE clause of a free-form --query import.)
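Assuming the script above is saved at a path such as /home/bigdata/sqoop_eod.sh (a hypothetical location), an end-of-day cron entry could look like:

```shell
# m h dom mon dow  command  -  run the EOD import every day at 23:55
55 23 * * * /home/bigdata/sqoop_eod.sh >> /var/log/sqoop_eod.log 2>&1
```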

FAQs on Hadoop Bigdata

Q1: Which service is responsible for writing the data into datanodes, is it the Client that contacts each Datanode for writing the data or the Namenode?
Ans: The client requests a DataNode report from the NameNode; the returned list of DataNodes forms a write pipeline. The client's DataStreamer copies data (held in an internal "data queue") to the nearest DataNode, which in turn forwards the data to the next DataNode in the pipeline.

Q2: If a Namenode crashes while data processing is in progress, after recovery will it be able to provide entire data or no data?
Ans: It depends on how much metadata had already been merged into the fsimage by the Secondary NameNode at the last checkpoint; changes made after that checkpoint may be lost.

Q3: How do you achieve NameNode high availability?
Ans: Hadoop 2 (the HDFS HA feature) provides a mechanism to configure a second NameNode.
The HDFS HA feature addresses the NameNode single point of failure by providing the option of running two NameNodes in the same cluster, in an Active/Passive configuration. These are referred to as the Active NameNode and the Standby NameNode. Unlike the Secondary NameNode, the Standby NameNode is a hot standby, allowing a fast failover to a new NameNode in case a machine crashes, or a graceful administrator-initiated failover for planned maintenance. You cannot have more than two NameNodes.

Q4: Where does sorting and shuffling occur in MapReduce? Mapper or Reducer?

Ans: Sorting happens on both sides: map outputs are sorted when spilled to disk, and the reducer merge-sorts the fetched map outputs. Shuffling is the copying of map outputs to the reducers, which takes place between the map and reduce phases.

Q5: What is secondary sort in MapReduce?
Ans: After the map tasks finish, the intermediate data is written to the local FS as key-value pairs with the keys in sorted order, but the values are not sorted. When the values associated with each key also need to be in sorted order, the technique used is called 'secondary sort'.

Q6: Will the size of input split depend on block size?
Ans: An InputSplit can be larger or smaller than the block size.

size of input split = max( minimumSplitSize, min( maximumSplitSize, DFS block size ) )

(where maximumSplitSize defaults to Long.MAX_VALUE)
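A quick worked example of the formula, using typical Hadoop 1.x defaults (minimum split size of 1 byte, maximum split size of Long.MAX_VALUE, 64 MB blocks):

```shell
# Hadoop 1.x defaults: min split size 1 byte, max split size Long.MAX_VALUE
min_split=1
max_split=9223372036854775807
block=$((64 * 1024 * 1024))     # 64 MB default block size

# splitSize = max(minSplitSize, min(maxSplitSize, blockSize))
inner=$(( max_split < block ? max_split : block ))
split=$(( min_split > inner ? min_split : inner ))
echo "split size: $split bytes"   # prints: split size: 67108864 bytes
```

With the defaults, the split size therefore comes out equal to the block size.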

Q7: The input file contains one single line of 100 MB, stored in HDFS with the default block size of 64 MB. How will you process it using MapReduce?
Ans: The file is stored as two blocks (64 MB + 36 MB), which yield two input splits. Because TextInputFormat treats an entire line as one record, the mapper for the first split keeps reading past its block boundary (fetching the remainder remotely if needed) until the line ends, so it processes the whole 100 MB record; the mapper for the second split skips the partial first line and emits nothing. Each mapper writes its key-value output to the local file system, and only after all intermediate results have been copied to the reducer's location can the reduce method be invoked; the reducer then writes the final output back to HDFS.

Q8: Can you redirect the output key values of mapper to a specific reducer?
Ans: YES, using a custom Partitioner.
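As an illustrative sketch (the jar path, input/output paths and key-field options are assumptions based on a typical Hadoop 1.x layout), Hadoop Streaming even lets you name a partitioner class on the command line, directing each key to a specific reducer:

```shell
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar \
  -D stream.num.map.output.key.fields=2 \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input /user/input -output /user/output \
  -mapper cat -reducer cat \
  -numReduceTasks 4
```

Here all records sharing the first key field land in the same one of the 4 reducers.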

Q9: What is structured data and unstructured data? A csv file , data copied from a Database are structured or unstructured data?
Ans: A CSV file is structured, and so is data copied from a database - it was given its structure when it was originally written into the database.

Q10: How do you process a real time data generated at a constant rate and volume using Hadoop?

Ans: Using Sqoop commands run as cron jobs (e.g. once per day) to import data from the source systems into the cluster. Keeping the number of imports per day small is recommended; analyzing the newly appended data each day then yields updated reports/results.

Q11: What will happen if you try to Load data into non-existing partition of a partitioned hive table?
Ans: Exception 
Q12:  What will happen if you try to Load data into non-existing bucket of a clustered hive table?
Ans: Exception 
Q13: What are different types of joins supported by hive?
Ans: Inner, Left Outer, Right Outer, Left Semi Join, Map Joins, etc.
Q14: The input log file contains a group of lines for each exception. How do you implement the MapReduce job so that each mapper processes a given exception together with the group of lines that form its stack trace?
Ans: Implement a custom FileInputFormat and override isSplitable() to return false, which passes the whole file as input to a single mapper task. Here the number of mappers is one.
 
Q15: The hive table contains:
UID   SessionID  MsgID Date
For a given UID there can be multiple SessionIDs, e.g.:
123    Session123      abc123     26-Aug-2014
123    Session123      abc123     26-Aug-2014
123    Session124      abc123     26-Aug-2014
123    Session125      abc123     26-Aug-2014

How do you clean this data to eradicate the duplicates, so that each tuple is unique?
Ans: Rewrite the table (or a clean copy of it) using SELECT DISTINCT, or GROUP BY over all columns; partitioning the hive table by the SessionID column alone does not remove duplicate rows.
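One hedged way to sketch the cleanup from the shell (the table name `sessions` is an assumption; the column names follow the layout above, with Date backquoted since it is close to a reserved word):

```shell
# Build a de-duplicated copy of the table: one row per unique tuple
hive -e "
CREATE TABLE sessions_clean AS
SELECT DISTINCT UID, SessionID, MsgID, \`Date\`
FROM sessions;
"
```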


Q16: A large log file contains different message types, each message should be parsed into different files, which of the following best suits this requirement?
1.) MapReduce job with number of Reduce tasks set to number of message types
2.) MapReduce job with enums defined for each message type.
3.) MapReduce job set with only Mapper tasks.
4.) MapReduce job set with Partitioner.
Ans. 4.


Q17: 1 GB input file is copied into HDFS , how many mappers can be invoked with default configurations on this file?
1.) 24 Mappers  2.) 10 Mappers    3.) 16  Mappers  4.) 1 Mapper
Ans. 16 Mappers: with the default block size of 64 MB, the 1 GB (1024 MB) file is split into 1024 / 64 = 16 blocks, and one mapper is invoked per split.
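The arithmetic behind the answer, as a one-liner:

```shell
file_mb=1024   # 1 GB input file
block_mb=64    # default HDFS block size (Hadoop 1.x)

# number of splits = ceiling(file size / block size); one mapper per split
mappers=$(( (file_mb + block_mb - 1) / block_mb ))
echo "$mappers mappers"   # prints: 16 mappers
```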

Q18: Which one is recommended for Hadoop  :
1.) Large files spread on many nodes.
2.) Many small files spread on multiple nodes.
3.) Archived small files (HAR) spread on multiple nodes.
4.) 1 & 3
5.) None.
Ans. 4


Q19: Default Schedulers in Hadoop
1.) Fair Scheduler     2.)  FIFO Scheduler   3.) Capacity Scheduler  4.) None.
Ans. 2


Q20: Can a single-node cluster have a replication factor greater than 1?
1.) Yes  2.) No
Ans. Yes - the setting is permitted, though only one physical replica can actually be stored, so blocks are reported as under-replicated.

Q21: Which Hadoop eco-system will you recommend for reading data from different sources into HDFS?
1.) Sqoop    2.)  Flume    3.) MapReduce  4.) None.
Ans. Flume; we can define various sources with the relevant types (exec, DB, etc.) and direct them to sinks such as HDFS.


Q22: Which class will load data between Hive and HBase?
Ans: HBaseStorageHandler

Q23:
1. Is it possible to have more than one Mapper class defined for the MapReduce job?
2. Can we supply more than one input file for the MapReduce job?
Ans: Yes, to both questions. Consider a scenario where a MapReduce job processes multiple large log files, each with its own format and text. We can define a different Mapper class for each input, each containing the logic to produce key-value pairs for its log format.
 Also the driver class should use MultipleInputs class: 
MultipleInputs.addInputPath(job, filepath, InputFormat.class, Mapper.class); 

Q24. Why does my reducer show x% of the job started while the mapper is still executing?
Ans: The reduce method proper can only start after all mappers finish; the percentage shown on screen is the progress of copying map output files to the reducer (the shuffle), not actual reduce processing.

Q25. What is SAFE MODE, and in what scenarios can the NameNode get into SAFE MODE?
Ans. SAFE MODE is the state in which the NameNode has not yet received the prescribed proportion of block reports (99.9% by default) from the DataNodes. Possible scenarios: a DataNode is corrupted, or a newly added DataNode has a configuration mismatch.

Q26: Is it mandatory to have SSH from the NameNode to the DataNodes, and is the reverse also required?
Ans. In my experience, SSH is only needed by Hadoop to run remote commands from the master node on the DataNodes (slaves); a reverse SSH from slaves to master may not be mandatory.

Q27: Does the Secondary NameNode act as a backup of the NameNode?
Ans: No. The Secondary NameNode (SNN) is only responsible for merging the fsimage and edits files of the NameNode; it does not act as a backup for the NameNode (NN). In fact, the NN is a single point of failure in Gen 1 Hadoop; Hadoop 2 (HDFS HA) provides a way to configure a Standby NameNode.



Sunday, July 27, 2014

Working with Flume and Analyzing Flume data with HIVE

FLUME INSTALLATION:
==================

Download the latest Flume tarball from: 


Update the HADOOP_CLASSPATH in $HADOOP_HOME/hadoop-env.sh with flume-ng-core-1.5.0.jar file:
root@centos6:~ #grep -i HADOOP_CLASSPATH /home/bigdata/hadoop-1.2.1/conf/hadoop-env.sh 
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=/home/bigdata/apache-hive-0.13.1-bin/lib/*:/home/bigdata/apache-flume-1.5.0-bin/lib/flume-ng-core-1.5.0.jar:/home/bigdata/hadoop-1.2.1/hadoop-core-1.2.1.jar:/home/bigdata/hadoop-1.2.1/HadoopProj                         
root@centos6:~ #

Update /etc/profile with:
export FLUME_HOME=/home/bigdata/apache-flume-1.5.0-bin
export PATH=$HIVE_HOME/bin:$HADOOP_HOME/bin:$HBASE_HOME/bin:$FLUME_HOME/bin:$JAVA_HOME/bin:$PATH

SETUP CONFIGURATION FILE:
========================
Create a configuration file under $FLUME_HOME/conf.
The following conf file listens for file events; other event (source) types, such as network sources, are listed at https://cwiki.apache.org/confluence/display/FLUME/Getting+Started :

root@centos6:~/apache-flume-1.5.0-bin #pwd
/home/bigdata/apache-flume-1.5.0-bin
root@databliz-centos6:~/apache-flume-1.5.0-bin #cat conf/flume-conf.conf 
agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1

# Define source and type of event (in this case it is exec)
agent1.sources.s1.type = exec
agent1.sources.s1.command = tail -f /tmp/esplog.log

# Define channel and type (in this case channel is stored in memory, other types are: file, database etc)
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# Define sink and type (in this case it is hdfs, i.e an event type result is stored in hdfs)
# Default fileType outputs Writable (LongWritable) contents.
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = hdfs://<IP>:<Port>/user/flume/esplog.log
agent1.sinks.k1.hdfs.fileType=DataStream

# Bind source and sink to a channel
agent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1

root@centos6:~/apache-flume-1.5.0-bin #

RUN FLUME:
==============
root@centos6:~/apache-flume-1.5.0-bin #bin/flume-ng agent --conf ./conf --conf-file conf/flume-conf.conf --name agent1 -Dflume.root.logger=INFO,console

We can also use the debug option: -Dflume.root.logger=DEBUG,console

Each time a new entry is appended to /tmp/esplog.log, Flume generates a file under hdfs://<IP>:<Port>/user/flume/esplog.log/
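To exercise the pipeline, append a line to the tailed file from any shell:

```shell
# Append a test line; the exec source picks it up and the HDFS sink
# rolls it into a new FlumeData.* file under the configured path.
echo "conn messages" >> /tmp/esplog.log
tail -n 1 /tmp/esplog.log   # prints: conn messages
```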

Example: 
root@centos6:~ #hadoop dfs -cat /user/flume/esplog.log/FlumeData.1406475505975
Warning: $HADOOP_HOME is deprecated.

spi messages
spi messages
conn messages
conn messages
SA messages
ISKAMP messages
conn5007 messages
conn5008 messages
root@databliz-centos6:~ #

ANALYZE FLUME DATA WITH HIVE:
=============================
hive> create table flumedata (col1 string, col2 string)
    > row format delimited fields terminated by ' ';
OK
Time taken: 0.935 seconds

hive> load data inpath '/user/flume/esplog.log/FlumeData.1406475505975' into table flumedata;

hive> select * from flumedata;
OK
flumedata.col1  flumedata.col2
spi     messages
spi     messages
conn    messages
conn    messages
SA      messages
ISKAMP  messages
conn5007        messages
conn5008        messages
Time taken: 0.521 seconds, Fetched: 8 row(s)
hive> 

Monday, July 7, 2014

MapReduce Implementation in Detail

Introduction:
This document describes how the map and reduce operations are implemented in Hadoop. If you are not familiar with Google's MapReduce model, please refer to the MapReduce paper: http://labs.google.com/papers/mapreduce.html
Map
Since Map operates in parallel over the set of input files, the first step (FileSplit) is to split the files into several pieces; a single file that is too large to be processed efficiently is divided into smaller splits. Note that this splitting step knows nothing about the internal logical structure of the input file: a text file, for example, is split at arbitrary byte boundaries. The splitting can be customized, or one of the simple split mechanisms Hadoop already defines can be used. Each file split then corresponds to a new map task.
When a single map task starts, it opens a new output writer for each configured reduce task. It then uses the RecordReader obtained from the specified InputFormat to read its file split. The InputFormat class parses the input and generates key-value pairs. The InputFormat is also responsible for handling records that cross split boundaries: for example, TextInputFormat reads past the end of its split to finish the last line and, if the split being read is not the first one, ignores the (partial) first line.
The InputFormat class need not produce meaningful keys. For example, the default TextInputFormat emits one line of input text as the value and the byte offset of the line as the key - most applications use the line but rarely the offset.
The keys and values read by the RecordReader are passed to the user-configured Mapper class, which may perform any operation on them and then call the OutputCollector.collect method to emit key-value pairs of its own definition. The output must consist of one key class and one value class, because the map output is written to disk as a SequenceFile, which stores the type information once per file and requires all records to have the same shape (if you want to output different data structures, you can define a common subclass). The map input and output key types need not be related.
When the Mapper output is collected, it is partitioned by a Partitioner class, which determines which output file each pair is written to. The default is the HashPartitioner class, which distinguishes keys by the hash function of the key class (so the key class should have a good hash function, to balance the load evenly across the reduce tasks). Details can be viewed in the MapTask class. N inputs may generate M map tasks to run, and each map task generates one output file per configured reduce task. Each output file targets one specific reduce task, and all pairs for a given key, from all the map tasks, end up at the same reducer; so in a particular reduce task, all value pairs for a given key are processed together.
Combine
The keys output by the map operation initially exist in memory. For performance and efficiency, it is sometimes beneficial to provide a combiner, a local reduce function. If a combiner is configured, map output pairs are not written immediately; instead they are collected in lists, one list per key. When a certain number of key-value pairs has accumulated, this part of the buffer is sent to the combiner, whose reduce method is invoked with each key and all of its buffered values, and whose output replaces the original map output.
For example, in Hadoop's word-count program the map operation outputs (word, 1) key-value pairs, and a combiner can be used to speed up the counting. The combine operation collects and processes the in-memory lists, one list per word. When a certain number of key-value pairs has been buffered, the combine operation is invoked with each unique word as the key and an iterator over its values, and it outputs (word, count-in-this-part-of-the-input) pairs. From the reduce operation's point of view, the combined output carries the same information as the original map output, but far less data has to be written to and read from disk.
Reduce
When a reduce task starts, its input is dispersed across the map output files on the various nodes. In distributed mode, these first need to be copied to the local file system (the copy step). Details can be viewed in the ReduceTaskRunner class.
Once all the data is available locally, it is appended into one file. That file is then merge-sorted so that key-value pairs with the same key are adjacent (the sort step). This makes the actual reduce operation simple: the file is read sequentially, and the values are passed from the input file to the reduce method via an iterator - until the next key is reached. Details can be viewed in the ReduceTask class.
Finally, the output consists of one output file per reduce task. Their format can be specified via JobConf.setOutputFormat; if that is used, the output key and value classes must also be specified.

Wednesday, July 2, 2014

How do Hadoop developers submit their MapReduce jobs to the cluster?


Using the Jumbune tool, a Hadoop developer can do the jobs below:

  1. MR Job Profiling 
  2. HDFS Data Validator 
  3. MR Job flow Debugger


Using the Jumbune tool, a Hadoop Admin can do the jobs below:

  1. Hadoop cluster Monitoring
  2. MR Job profiling

It is really easy to learn about these jobs if you are already familiar with MapReduce programming and job submission to a Hadoop cluster.

I have found the YouTube videos linked below very interesting to learn from and apply.

Developer Videos:

MapReduce Job Profiling - Developer
https://www.youtube.com/watch?v=vkiiEnJDMi0

Jumbune HDFS Validation - Developer
https://www.youtube.com/watch?v=1CXjfNBSL_s

DataValidator  - Developer
https://www.youtube.com/watch?v=kBIP4BVIjRQ

Jumbune MapReduce Flow Debugger - Developer
https://www.youtube.com/watch?v=3T4xWtIiS_Q

Admin Videos:

Jumbune Cluster Monitoring - Administrator
https://www.youtube.com/watch?v=Q8ToDsGRBIE

Jumbune MapReduce Profiler - Developer, Administrator
https://www.youtube.com/watch?v=BRt2sHu5804