Wednesday, January 28, 2015

Multiple OutputFile Mapreduce Program

Data Sample:
BoroughCode+ StatusOrderNumber +SignSequence+ Distance in Miles+ArrowPoints +SignDescription
B P-004958 1 0     Curb Line
B P-004958 2 9     Property Line
B P-004958 3 30     NIGHT REGULATION
B P-004958 4 30     1 HOUR PARKING 9AM-7PM                                                                                                 





All of the above data is tab-separated (\t).
Each line contains the columns listed in the header above.
The full data set contains more than 8 lakh (800,000) records.

Mapreduce Case:
The client wants one output file per StatusOrderNumber, with the StatusOrderNumber as the file name.
Each file should contain the maximum Distance for that StatusOrderNumber and the corresponding SignDescription.

My MapReduce program looks like this:

MultipleOutputMapper.Java:-

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;


public class MultipleOutputMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   
/*BoroughCode StatusOrderNumber SignSequence Distance ArrowPoints SignDescription
 B P-004958 1 0   Curb Line */
if(!value.toString().contains("BoroughCode"))
{
String line = value.toString();
       String[] tokens = line.split("\t");
       context.write(new Text(tokens[1]), new Text(tokens[3] + "," + tokens[5]));
   }
}
}

MultipleOutputReducer.Java:-

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.lang.Integer;

public class MultipleOutputReducer extends Reducer<Text, Text, Text, Text> {
MultipleOutputs<Text, Text> mos;

    public void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
   
    /*Key -> P-004958
    Value ->1,Curb Line*/
   
    int maxValue = Integer.MIN_VALUE;
    Text kValue = new Text();
    Text kDescr = new Text();
   
    for (Text value : values) {
       String str = value.toString();
            String[] items = str.split(",");
            if(maxValue < Integer.parseInt(items[0]))
            {
              kDescr = new Text(items[1].toString());
            }
            maxValue = Math.max(maxValue, Integer.parseInt(items[0]));
    }
   
    kValue = new Text(String.valueOf(maxValue));
    mos.write(kValue, kDescr, key.toString());
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

MultipleOutputDriver.Java:-

package org.bkfs.multipleoutput;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleOutputDriver extends Configured implements Tool {

public int run(String[] args) throws Exception {

  if (args.length != 2) {
            System.err.println("Usage: MultipleOutput <input path> <output path>");
            System.exit(-1);
  }

  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);

  Job job = Job.getInstance();
  job.setJarByClass(MultipleOutputDriver.class);
  job.setJobName("MultipleOutput Job");

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  job.setMapperClass(MultipleOutputMapper.class);
  job.setReducerClass(MultipleOutputReducer.class);

  FileInputFormat.setInputPaths(job, inputDir);
  FileOutputFormat.setOutputPath(job, outputDir);

  MultipleOutputs.addNamedOutput(job, "MaxDistanceStatusOrderNumber", TextOutputFormat.class, NullWritable.class, Text.class);

  job.waitForCompletion(true);

return 0;
}

public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new MultipleOutputDriver(), args);
System.exit(res);
}
}

No comments:

Post a Comment