Wednesday, January 28, 2015

Multiple Output Files MapReduce Program

Data Sample:
BoroughCode | StatusOrderNumber | SignSequence | Distance (in miles) | ArrowPoints | SignDescription
B P-004958 1 0     Curb Line
B P-004958 2 9     Property Line
B P-004958 3 30     NIGHT REGULATION
B P-004958 4 30     1 HOUR PARKING 9AM-7PM





The data above is tab-separated (\t).
Each line contains the columns listed in the header.
The full dataset holds more than 8 lakh (800,000) records.

MapReduce case:
The client wants one output file per StatusOrderNumber, named after that StatusOrderNumber.
Each file should contain the maximum Distance and its corresponding SignDescription.
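Before wiring this into Hadoop, the per-key selection can be sketched in plain Java. This is a standalone illustration with hypothetical class and method names, not part of the job itself:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the per-key logic: for each StatusOrderNumber,
// keep the largest Distance and its SignDescription.
public class MaxDistanceSketch {

    // Returns StatusOrderNumber -> "maxDistance,description".
    public static Map<String, String> maxByOrder(String[] lines) {
        Map<String, Integer> maxDist = new HashMap<String, Integer>();
        Map<String, String> result = new HashMap<String, String>();
        for (String line : lines) {
            String[] t = line.split("\t");     // tab-separated columns
            String order = t[1];               // StatusOrderNumber
            int dist = Integer.parseInt(t[3]); // Distance
            String descr = t[5];               // SignDescription
            Integer current = maxDist.get(order);
            if (current == null || dist > current) { // strictly greater: first max wins ties
                maxDist.put(order, dist);
                result.put(order, dist + "," + descr);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] rows = {
            "B\tP-004958\t1\t0\t\tCurb Line",
            "B\tP-004958\t2\t9\t\tProperty Line",
            "B\tP-004958\t3\t30\t\tNIGHT REGULATION",
            "B\tP-004958\t4\t30\t\t1 HOUR PARKING 9AM-7PM"
        };
        System.out.println(maxByOrder(rows)); // {P-004958=30,NIGHT REGULATION}
    }
}
```

Note the tie-breaking behavior: with two rows at Distance 30, the first one seen wins, because only a strictly greater distance replaces the stored description.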

My MapReduce program goes like this:

MultipleOutputMapper.java:-

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultipleOutputMapper extends Mapper<LongWritable, Text, Text, Text> {

    /* BoroughCode  StatusOrderNumber  SignSequence  Distance  ArrowPoints  SignDescription
       B  P-004958  1  0    Curb Line */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header record.
        if (!value.toString().contains("BoroughCode")) {
            String[] tokens = value.toString().split("\t");
            // key = StatusOrderNumber, value = "Distance,SignDescription"
            context.write(new Text(tokens[1]), new Text(tokens[3] + "," + tokens[5]));
        }
    }
}

MultipleOutputReducer.java:-

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultipleOutputReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        /* key   -> P-004958
           value -> "0,Curb Line" */

        int maxValue = Integer.MIN_VALUE;
        String maxDescr = "";

        for (Text value : values) {
            // Limit the split to two fields so a description containing a comma stays intact.
            String[] items = value.toString().split(",", 2);
            int distance = Integer.parseInt(items[0]);
            if (distance > maxValue) {
                maxValue = distance;
                maxDescr = items[1];
            }
        }

        // The base output path (third argument) names the output file after the StatusOrderNumber.
        mos.write(new Text(String.valueOf(maxValue)), new Text(maxDescr), key.toString());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

MultipleOutputDriver.java:-

package org.bkfs.multipleoutput;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleOutputDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Usage: MultipleOutput <input path> <output path>");
            System.exit(-1);
        }

        Path inputDir = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Use the configuration ToolRunner parsed for us.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(MultipleOutputDriver.class);
        job.setJobName("MultipleOutput Job");

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setMapperClass(MultipleOutputMapper.class);
        job.setReducerClass(MultipleOutputReducer.class);

        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        // Key/value types must match what the reducer passes to MultipleOutputs (Text, Text).
        MultipleOutputs.addNamedOutput(job, "MaxDistanceStatusOrderNumber",
                TextOutputFormat.class, Text.class, Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MultipleOutputDriver(), args);
        System.exit(res);
    }
}

Wednesday, January 14, 2015

Running a shell script from crontab in Linux


>crontab null        (install the empty file "null", clearing any existing crontab)
>crontab -l

>vi masterscript
* * * * * pathto.sh pathto.cfg > /dev/null 2>&1

You need to press the [Esc] key (to leave insert mode) followed by the colon (:) before typing these commands:
:q     (exit)
:q!    (force exit, discarding changes)
:wq!   (save and exit)

>crontab masterscript      (install masterscript as your crontab)

>crontab -l
   masterscript entry is now listed
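For reference, a fuller masterscript could hold several schedules; pathto.sh and pathto.cfg are placeholders for your own script and config paths:

```
# minute hour day-of-month month day-of-week  command
# Every minute, discarding stdout and stderr (2>&1 must come after > /dev/null):
* * * * * pathto.sh pathto.cfg > /dev/null 2>&1
# The same job once a day at 2:30 AM, appending output to a log instead:
30 2 * * * pathto.sh pathto.cfg >> /tmp/pathto.log 2>&1
```

The order of the redirections matters: `2>&1` duplicates stderr onto wherever stdout currently points, so it must follow the stdout redirection.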


Tuesday, January 13, 2015

Apache Maven POM.xml for Hadoop MapReduce programs

Download the Apache Maven distribution and extract it under Program Files.
Set the M2_HOME and M2 variables at the system level.

Run the command below from a command prompt to download a dependency into the .m2/repository folder:


mvn org.apache.maven.plugins:maven-dependency-plugin:2.4:get \
    -DremoteRepositories=http://download.java.net/maven/2 \
    -Dartifact=robo-guice:robo-guice:0.4-SNAPSHOT \
    -Ddest=c:\temp\robo-guice.jar

Below is the POM.xml file for my MaximumTempSample application. In Eclipse, click Project properties -> Maven -> Update Project... to pick up dependency updates.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
       <groupId>org.mycompany.MaxRecord</groupId>
       <artifactId>MaximumTempSample</artifactId>
       <version>0.0.1-SNAPSHOT</version>
       <packaging>jar</packaging>

       <name>MaximumTempSample</name>
       <url>http://maven.apache.org</url>

       <properties>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <hadoop.version>2.5.0-cdh5.2.0</hadoop.version>
              <log4j.version>1.2.17</log4j.version>
              <maven_jar_plugin.version>2.5</maven_jar_plugin.version>
       </properties>
      
       <dependencies>
              <dependency>
                     <groupId>org.apache.logging.log4j</groupId>
                     <artifactId>log4j-api</artifactId>
                     <version>2.1</version>
              </dependency>
              <dependency>
                     <groupId>org.apache.logging.log4j</groupId>
                     <artifactId>log4j-core</artifactId>
                     <version>2.1</version>
              </dependency>
              <dependency>
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client</artifactId>
                     <version>${hadoop.version}</version>
              </dependency>
       </dependencies>
       
      <build>
              <plugins>
                     <plugin>
                           <groupId>org.apache.maven.plugins</groupId>
                           <artifactId>maven-jar-plugin</artifactId>
                           <version>${maven_jar_plugin.version}</version>
                     </plugin>
              </plugins>
       </build>
      
      <repositories>
              <repository>
                     <id>cloudera-repo</id>
                     <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
              </repository>
       </repositories>

 </project>
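With this POM in place, the job jar can be built and submitted from the project directory. A sketch, assuming Maven and Hadoop are on the PATH; the jar name follows <artifactId>-<version>, and the driver class is whichever one you packaged:

```
mvn clean package
hadoop jar target/<artifactId>-<version>.jar DriverClass <input path> <output path>
```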

Monday, January 12, 2015

Maximum Temperature MapReduce Program

DriverClass.java:-
----------------------
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverClass {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();   // new Job() is deprecated
        job.setJarByClass(DriverClass.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(DMapper.class);
        // Max is associative, so the reducer can double as a combiner.
        job.setCombinerClass(DReducer.class);
        job.setReducerClass(DReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


DMapper.java:-
--------------------
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Ignore short or truncated records.
        if (value.toString().length() > 130) {
            String line = value.toString();

            String year = line.substring(15, 19);
            int airTemperature;

            if (line.charAt(87) == '+') {
                // parseInt doesn't like leading plus signs
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }
}

DReducer.java:-
--------------------
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}


Sample Data:
0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
0029029070999991901010113004+64333+023450FM-12+000599999V0202901N008219999999N0000001N9-00721+99999102001ADDGF104991999999999999999999
0029029070999991901010120004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00941+99999102001ADDGF108991999999999999999999
0029029070999991901010206004+64333+023450FM-12+000599999V0201801N008219999999N0000001N9-00611+99999101831ADDGF108991999999999999999999
0029029070999991901010213004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00561+99999101761ADDGF108991999999999999999999
0029029070999991901010220004+64333+023450FM-12+000599999V0201801N009819999999N0000001N9-00281+99999101751ADDGF108991999999999999999999
0029029070999991901010306004+64333+023450FM-12+000599999V0202001N009819999999N0000001N9-00671+99999101701ADDGF106991999999999999999999
0029029070999991901010313004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00331+99999101741ADDGF108991999999999999999999
0029029070999991901010320004+64333+023450FM-12+000599999V0202301N011819999999N0000001N9-00281+99999101741ADDGF108991999999999999999999
0029029070999991901010406004+64333+023450FM-12+000599999V0209991C000019999999N0000001N9-00331+99999102311ADDGF108991999999999999999999
0029029070999991901010413004+64333+023450FM-12+000599999V0202301N008219999999N0000001N9-00441+99999102261ADDGF108991999999999999999999
0029029070999991901010420004+64333+023450FM-12+000599999V0202001N011819999999N0000001N9-00391+99999102231ADDGF108991999999999999999999
0029029070999991901010506004+64333+023450FM-12+000599999V0202701N004119999999N0000001N9+00001+99999101821ADDGF104991999999999999999999
0029029070999991901010513004+64333+023450FM-12+000599999V0202701N002119999999N0000001N9+00061+99999102591ADDGF104991999999999999999999
0029029070999991901010520004+64333+023450FM-12+000599999V0202301N004119999999N0000001N9+00001+99999102671ADDGF104991999999999999999999
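The fixed-width parsing in DMapper can be exercised standalone against the first sample record above. A sketch with a hypothetical class name, using the same substring offsets as the mapper:

```java
// Standalone sketch exercising the mapper's fixed-width offsets.
public class NcdcParseSketch {

    // The year occupies characters 15-18 of the record.
    public static String parseYear(String line) {
        return line.substring(15, 19);
    }

    // The signed temperature (tenths of a degree Celsius) starts at character 87.
    public static int parseTemperature(String line) {
        if (line.charAt(87) == '+') {
            return Integer.parseInt(line.substring(88, 92)); // older JDKs reject a leading '+'
        }
        return Integer.parseInt(line.substring(87, 92));     // a leading '-' parses fine
    }

    public static void main(String[] args) {
        // First sample record, split across two literals for readability.
        String rec = "0029029070999991901010106004+64333+023450FM-12+000599999"
                   + "V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999";
        System.out.println(parseYear(rec) + " " + parseTemperature(rec)); // 1901 -78
    }
}
```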




Word Count MapReduce Program

WordCountMapper.java:-
------------------------------
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit (word, 1) for every whitespace-separated token in the line.
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, one);
        }
    }
}

WordCountReducer.java:-
----------------------------------
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // The total must start at 0 (not Integer.MIN_VALUE) or every count is wildly off.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

WordCountDriver.java:-
------------------------------
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCount <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();   // new Job() is deprecated
        job.setJarByClass(WordCountDriver.class);
        job.setJobName("Word Count");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WordCountMapper.class);
        // Summing is associative, so the reducer also serves as the combiner.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
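What the mapper and reducer compute together can be sketched without Hadoop in plain Java (a hypothetical class name, using the same whitespace tokenization as the mapper):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

// Standalone sketch of word count: tokenize on whitespace, sum a count of 1 per token.
public class WordCountSketch {

    public static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        StringTokenizer tokens = new StringTokenizer(text);
        while (tokens.hasMoreTokens()) {
            String word = tokens.nextToken();
            Integer current = counts.get(word);
            counts.put(word, current == null ? 1 : current + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("to be or not to be").get("to")); // prints 2
    }
}
```

In the real job the shuffle phase does the grouping: every (word, 1) pair with the same key lands at one reduce call, which performs the same summation as the loop above.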