This blog is mainly focused on discussions of Hadoop technology, using various tools from the Hadoop ecosystem. Hadoop experts and beginners post and share their views and experiences, and newcomers to Hadoop can post their questions here to get them clarified by experts. I motivated myself to create this blog to help new beginners avoid expensive Hadoop courses on the market. I also do my best to collect and share genuine posts from Hadoop discussions around the internet.
Sunday, June 15, 2014
Map side join using Distributed cache in Hadoop
You want to populate an associative array in order to perform a map-side join. You’ve decided to put this information in a text file, place that file into the DistributedCache, and read it in your Mapper before any records are processed. Identify which method in the Mapper you should use to implement code for reading the file and populating the associative array.
The answer is the configure() method of the Mapper.
Explanation:
See step 3 below. Here is an illustrative example of how to use the DistributedCache:
// Setting up the cache for the application
1. Copy the requisite files to the FileSystem:
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
2. Setup the application's JobConf:
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
3. Use the cached files in the Mapper or Reducer:
public static class MapClass extends MapReduceBase implements Mapper {
private Path[] localArchives;
private Path[] localFiles;
public void configure(JobConf job) {
// Get the cached archives/files
localArchives = DistributedCache.getLocalCacheArchives(job);
localFiles = DistributedCache.getLocalCacheFiles(job);
}
public void map(K key, V value, OutputCollector output, Reporter reporter)
throws IOException {
// Use data from the cached archives/files here
// ...
// ... output.collect(k, v);
}
}
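The same pattern can be sketched in plain Java, outside Hadoop: configure() loads the small lookup file into an in-memory map once, and map() probes that map for every record. The class name, the comma-separated record layout, and the department data below are all made up for illustration; a real Mapper would read the DistributedCache file from local disk instead of taking a list of lines.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapSideJoinSketch {
    // Stand-in for the associative array populated from the cached file
    private final Map<String, String> lookup = new HashMap<>();

    // In a real Mapper, configure() would read the DistributedCache file line by line
    void configure(List<String> lookupLines) {
        for (String line : lookupLines) {
            String[] parts = line.split(",", 2);   // e.g. "1,Engineering"
            lookup.put(parts[0], parts[1]);
        }
    }

    // Join each input record against the in-memory table
    String map(String record) {
        String[] parts = record.split(",", 2);     // e.g. "1,Alice"
        String dept = lookup.getOrDefault(parts[0], "UNKNOWN");
        return parts[1] + "\t" + dept;
    }

    public static void main(String[] args) {
        MapSideJoinSketch m = new MapSideJoinSketch();
        m.configure(List.of("1,Engineering", "2,Sales"));
        System.out.println(m.map("1,Alice"));  // Alice joined with Engineering
        System.out.println(m.map("3,Bob"));    // no match: Bob joined with UNKNOWN
    }
}
```

Because the lookup table lives entirely in the mapper's memory, no shuffle or reduce phase is needed for the join itself, which is the whole point of a map-side join.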
Friday, June 6, 2014
Useful SQOOP tutorials
Apache Sqoop Tutorial -1
Apache Sqoop Tutorial -2
In Sqoop, by default 4 mappers run, for 1000 records at maximum. To change this setting you need to add
> -m 1
to the sqoop command to set it to 1 mapper.
Here are a few Sqoop commands:
1. Importing RDBMS Table to Target Directory:
--------------------------------
>sqoop import --connect jdbc:mysql://localhost:3306/sqoopdb --username sqoop -P \
> --table employee \
> --target-dir <non-existing folder path> \
> --where "col between 1 and 100" \
> -m 1
2.Importing RDBMS Table to Warehouse Directory:
-----------------------------------
>sqoop import --connect jdbc:mysql://localhost:3306/sqoopdb --username sqoop -P \
> --table employee \
> --warehouse-dir <existing warehouse folder path> \
> --where "col between 1 and 100" \
> -m 1
3.Incremental Import:
--------------------
>sqoop import --connect jdbc:mysql://localhost:3306/sqoopdb --username sqoop -P \
> --table employee \
> --warehouse-dir <existing warehouse folder path> \
> --check-column emp_id \
> --incremental append \
> --last-value 9 \
> -m 1
4.Options file:
--------------------
sqoop> nano Import.txt
import
--connect
jdbc:mysql://localhost:3306/sqoopdb
--username
sqoop
-P
[Note: Use the above Import.txt in sqoop commands as below; the --options-file flag tells Sqoop to read these common parameters from the file.]
sqoop>sqoop --options-file Import.txt \
> --table employee \
> --warehouse-dir <existing warehouse folder path> \
> --where "col between 1 and 100" \
> -m 1
Thursday, June 5, 2014
Must-have skills to make it big in Big Data
Problem solving and communications skills with open mindedness are critical skills for the Big Data industry.
The biggest constraint for the Big Data industry today is that there are not enough people who understand Big Data, work on it, manage it and get meaningful results out of it.
The industry requirement is for a wide variety of roles. Some of these are purely data-oriented, some others purely statistics oriented and then there are roles which integrate all these and connect it to business.
To reduce the skill gap, several organisations are collaborating with academic partners to develop curriculum that reflects the mix of technical and problem-solving skills.
So what skills should someone aiming to enter this industry prepare themselves for?
Typically, Big Data teams in an organisation can basically have six kinds of roles, viz:
- The architects
- The domain experts
- The subject matter experts
- The analysts, who will bring in the data analysis and statistical analysis experience
- The technical administrators, who know how to manage large sets of data
- The applications specialist
“The thinking process and expectations from each of these roles is different,” says Srikanth Muralidhara, Co-founder Flutura Solutions. “Big Data opportunities are basically transforming three things: First, creating a new data driven culture, where decisions are based on data as well as intuition. Second, transforming businesses because now we know that we can understand customers better and thereby can design products and services better, and the third is we optimise technology investments.”
Of these, the Data Analyst and the Data Scientist are currently much in demand in India.
Thinking abilities for a Big Data Analyst include:
- Ability to look at data and think on the lines of why is this happening?
- What will happen if this trend continues?
- What will happen next (which is about predictive modeling)?
- What is the best that can happen (the optimization aspect)?
“So, when we say we are bringing in a Big Data Analyst, it should be someone who is comfortable with these kinds of questions,’’ says Srinivasan Govindaraj, senior director and practice head, Enterprise Information Management (EIM).
The abilities necessary for a Data Scientist:
- How to predict behavior and derive customer sentiments?
- How to process unstructured data?
- How to correlate different sets of data?
- How to identify new patterns in untapped data?
According to industry players a good Data Scientist must take the time to learn:
- What problems need to be solved?
- How the solution will deliver value?
- How it will be used and by whom?
“In short, it is problem solving and communication skills with open mindedness,’’ says Muralidhara. “The last skill is particularly significant because a Data Scientist often applies his or her knowledge to multiple industries such as banking, health care and retail.’’
How to write output to multiple named files in Hadoop using MultipleTextOutputFormat
Sometimes we want our MapReduce job to output data in named files.
For example, suppose you have an input file that contains the following data:
Name:Dash
Age:27
Name:Nish
Age:29
This can be done in Hadoop by using the MultipleTextOutputFormat class. The following is a simple example implementation which reads the file above and creates two output files, Name and Age.
The code where the action happens is the MultiFileOutput class.
package org.myorg;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;
public class mult {
    static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Split each line of the form "Name:Dash" into a key and a value
            String[] dall = value.toString().split(":");
            output.collect(new Text(dall[0]), new Text(dall[1]));
        }
    }
    static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            while (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }
    static class MultiFileOutput extends MultipleTextOutputFormat<Text, Text> {
        // The String returned here becomes the name of the output file
        // for this key/value pair: one file per distinct key
        protected String generateFileNameForKeyValue(Text key, Text value, String name) {
            return key.toString();
        }
    }
    public static void main(String[] args) throws Exception {
        String InputFiles = args[0];
        String OutputDir = args[1];
        Configuration mycon = new Configuration();
        JobConf conf = new JobConf(mycon, mult.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(MultiFileOutput.class);
        FileInputFormat.setInputPaths(conf, InputFiles);
        FileOutputFormat.setOutputPath(conf, new Path(OutputDir));
        JobClient.runJob(conf);
    }
}
The output would be files Name and Age.
File Name contains data
Name Nish
Name Dash
File Age contains data
Age 27
Age 29
Class MultiFileOutput extends MultipleTextOutputFormat. This means that when the reducer is ready to emit a key/value pair, it passes the pair to the generateFileNameForKeyValue method before writing it to a file. The logic for naming the output file is embedded in this method (in this case, the logic creates one file per key). The String returned by generateFileNameForKeyValue determines the name of the file where this key/value pair is written.
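The routing behavior of generateFileNameForKeyValue can be sketched in plain Java, outside Hadoop: each key/value pair goes into a bucket named after its key, just as each pair above lands in a file named after its key. The class and method names below are made up for this standalone illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerKeyFiles {
    // Mimics generateFileNameForKeyValue: route each key/value pair
    // to a "file" (here just a map entry) named after the key.
    static Map<String, List<String>> partition(List<String[]> pairs) {
        Map<String, List<String>> files = new TreeMap<>();
        for (String[] kv : pairs) {
            files.computeIfAbsent(kv[0], k -> new ArrayList<>())
                 .add(kv[0] + " " + kv[1]);
        }
        return files;
    }

    public static void main(String[] args) {
        List<String[]> pairs = List.of(
            new String[]{"Name", "Dash"}, new String[]{"Age", "27"},
            new String[]{"Name", "Nish"}, new String[]{"Age", "29"});
        // Each map entry corresponds to one output file (Name, Age)
        partition(pairs).forEach((file, lines) ->
            System.out.println(file + " -> " + lines));
    }
}
```

Running this groups the four records into two buckets, Name and Age, mirroring the two output files produced by the MapReduce job above.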
Speculative Task execution in Hadoop
One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program.
Tasks may be slow for various reasons, including hardware degradation, or software mis-configuration, but the causes may be hard to detect since the tasks still complete successfully, albeit after a longer time than expected. Hadoop doesn’t try to diagnose and fix slow-running tasks; instead, it tries to detect when a task is running slower than expected and launches another, equivalent, task as a backup. This is termed speculative execution of tasks.
For example if one node has a slow disk controller, then it may be reading its input at only 10% the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the other nodes.
By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.
Speculative execution is enabled by default. You can disable speculative execution for the mappers and reducers by setting the
mapred.map.tasks.speculative.execution
and
mapred.reduce.tasks.speculative.execution
JobConf options to false, respectively using old API,
while with newer API you may consider changing
mapreduce.map.speculative and mapreduce.reduce.speculative.
JVM Reuse in MapReduce job
If you have very small tasks that definitely run one after another, it is useful to set the mapred.job.reuse.jvm.num.tasks property to -1 (meaning that a spawned JVM will be reused an unlimited number of times). Then you spawn only (number of task slots available to your job) JVMs instead of (number of tasks) JVMs.
For many short tasks this is a huge performance improvement. In long-running jobs, however, the JVM setup time is a very small percentage of the total runtime, so reuse doesn't give you a noticeable boost there.
Also, in long-running tasks it is good to recreate the task process, because issues like heap fragmentation can degrade your performance.
The default value of 1 exists for good reason: it's much safer. With JVM reuse you're more likely to have problems where state persisting in a JVM instance affects subsequent tasks in that instance.
MR2 doesn't support jvm reuse at all.
I would not change this setting from 1 unless you have a very strong reason, know exactly what you are doing, and have perfectly behaved MR jobs on your cluster (not likely!).
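Putting the old-API property names from the two sections above into a mapred-site.xml fragment might look like the sketch below; the values shown (false, 10) are illustrative, not recommendations:

```xml
<configuration>
  <!-- Disable speculative execution for map and reduce tasks (old API names) -->
  <property>
    <name>mapred.map.tasks.speculative.execution</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.reduce.tasks.speculative.execution</name>
    <value>false</value>
  </property>
  <!-- Reuse each spawned JVM for up to 10 tasks; -1 means unlimited reuse -->
  <property>
    <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>10</value>
  </property>
</configuration>
```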
Wednesday, June 4, 2014
Running Hadoop in Pseudo Distributed Mode
This section contains instructions for installing Hadoop on Ubuntu. It is a short quickstart tutorial: below are all the commands, with descriptions, required to install Hadoop in pseudo-distributed mode (a single-node cluster).
| COMMAND | DESCRIPTION |
|---|---|
| sudo apt-get install sun-java6-jdk | Install java |
| If you don't have hadoop bundle download here download hadoop | |
| sudo tar xzf file_name.tar.gz | Extract hadoop bundle |
| Go to your hadoop installation directory(HADOOP_HOME) | |
| vi conf/hadoop-env.sh | Edit configuration file hadoop-env.sh and set JAVA_HOME: export JAVA_HOME=path to be the root of your Java installation(eg: /usr/lib/jvm/java-6-sun) |
| vi conf/core-site.xml then type: <configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> </configuration> | Edit configuration file core-site.xml |
| vi conf/hdfs-site.xml then type: <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration> | Edit configuration file hdfs-site.xml |
| vi conf/mapred-site.xml then type: <configuration> <property> <name>mapred.job.tracker</name> <value>localhost:9001</value> </property> </configuration> | Edit configuration file mapred-site.xml |
| sudo apt-get install openssh-server openssh-client | install ssh |
| ssh-keygen -t rsa -P "" cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys ssh localhost | Setting passwordless ssh |
| bin/hadoop namenode -format | Format the new distributed filesystem. During this operation the name node starts, gets formatted, and then stops |
| bin/start-all.sh | Start the hadoop daemons |
| jps | It should give output like this: 14799 NameNode 14977 SecondaryNameNode 15183 DataNode 15596 JobTracker 15897 TaskTracker |
| Congratulations Hadoop Setup is Completed | |
| http://localhost:50070/ | web based interface for name node |
| http://localhost:50030/ | web based interface for job tracker |
| Now lets run some examples | |
| bin/hadoop jar hadoop-*-examples.jar pi 10 100 | run pi example |
| bin/hadoop dfs -mkdir input bin/hadoop dfs -put conf input bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+' bin/hadoop dfs -cat output/* | run grep example |
| bin/hadoop dfs -mkdir inputwords bin/hadoop dfs -put conf inputwords bin/hadoop jar hadoop-*-examples.jar wordcount inputwords outputwords bin/hadoop dfs -cat outputwords/* | run wordcount example |
| bin/stop-all.sh | Stop the hadoop daemons |
Sunday, June 1, 2014
SQOOP - Imports RDBMS data to HDFS and Export HDFS data to RDBMS
TO IMPORT & EXPORT DATA FROM RDBMS (MYSQL,ORACLE, etc) INTO HDFS / HIVE / HBASE
Pre-requisite
- Apache Hadoop
- Apache Sqoop (compatible with Hadoop version)
- Apache Hive (optional)
- Apache HBase (optional)
- Apache HCatalog (optional)
- JDBC/ODBC connector
For all RDBMS, Connection URL changes and remaining all command line arguments remains same. You need to download specific JDBC/ODBC connector JAR and copy it to $SQOOP_HOME/lib
MySQL
Download mysql-connector-java.jar and place in $SQOOP_HOME/lib folder
cp mysql-connector-java-5.1.18-bin.jar /usr/local/hadoop/sqoop-1.4.3-cdh4.4.0/lib/
Expecting you have data in MySQL tables.
Retrieving list of Databases available in MySQL from SQOOP
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root -P
MySQL to HDFS Import
With a primary key:
sqoop import -connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName --target-dir /path/to/directoryName
Without a primary key:
sqoop import -connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName --target-dir /path/to/directoryName -m 1
MySQL to Hive Import:
With a primary key:
sqoop-import --connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName --hive-table tableName --create-hive-table --hive-import --hive-home path/to/hive_home
Without a primary key:
sqoop-import --connect jdbc:mysql://localhost:3306/db1 -username root -password password --table tableName --hive-table TableName --create-hive-table --hive-import --hive-home path/to/hive_home -m 1
MySQL to HBase Import
Import all columns:
sqoop import --connect jdbc:mysql://localhost:3306/db1 --username root --password root --table tableName --hbase-table hbase_tableName --column-family hbase_table_col1 --hbase-create-table
HBase import few columns:
sqoop import --connect jdbc:mysql://localhost:3306/db1 --username root --password root --table tableName --hbase-table hbase_tableName --columns column1,column2 --column-family hbase_table_col1 --hbase-create-table
To HBase with Primary key:
sqoop import --connect jdbc:mysql://localhost:3306/db1 --username root --password root --table tableName --hbase-table hbase_tableName --column-family hbase_table_col1 --hbase-row-key column1 --hbase-create-table
To HBase with no primary key:
sqoop import --connect jdbc:mysql://localhost:3306/db1 --username root --password root --table tableName --hbase-table hbase_tableName --columns column1,column2 --column-family hbase_table_col --hbase-row-key column1 --hbase-create-table
Export from HDFS to MySQL:
The command is the same for Hive/HBase/HDFS, because Hive tables are nothing but directories in HDFS. So you are just exporting a directory to MySQL:
sqoop export --connect jdbc:mysql://localhost:3306/test_db --table tableName --export-dir /user/hive/warehouse/tableName --username root --password password -m 1 --input-fields-terminated-by '\001'
SQL Server:
Connection URL:
sqoop import --connect 'jdbc:sqlserver://<IP(or)hostname>;username=dbuser;password=dbpasswd;database=<DB>' --table <table> --target-dir /path/to/hdfs/dir --split-by <KEY> -m 1
Download Connector from Microsoft website:
Place it in $SQOOP_HOME/lib
Oracle:
Connection URL:
sqoop import --connect "jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=myhost)(port=1521))(connect_data=(service_name=myservice)))" \
--username USER --table SCHEMA.TABLE_NAME --hive-import --hive-table SCHEMA.TABLE_NAME \
--num-mappers 1 --verbose -P
IBM DB2
Download the DB2Driver and place it in $SQOOP_HOME/lib
sqoop import --driver com.ibm.db2.jcc.DB2Driver --connect jdbc:db2://db2.my.com:50000/testdb --username db2user --password db2pwd --table db2tbl --split-by tbl_primarykey --target-dir sqoopimports
sqoop export --driver com.ibm.db2.jcc.DB2Driver --connect jdbc:db2://db2.my.com:50000/myDB --username db2user --password db2pwd --table db2tbl --export-dir /sqoop/dataFile.csv
Different Connection Strings for Different RDBMS
| Database | Version | --direct support? | Connect string matches |
|---|---|---|---|
| HSQLDB | 1.8.0+ | No | jdbc:hsqldb:*// |
| MySQL | 5.0+ | Yes | jdbc:mysql:// |
| Oracle | 10.2.0+ | No | jdbc:oracle:*// |
| PostgreSQL | 8.3+ | Yes (import only) | jdbc:postgresql:// |