Data Sample:
The data above is in tab-separated (\t) tabular format.
| BoroughCode | StatusOrderNumber | SignSequence | Distance in Miles | ArrowPoints | SignDescription |
| B | P-004958 | 1 | 0 | Curb Line | ||||||
| B | P-004958 | 2 | 9 | Property Line | ||||||
| B | P-004958 | 3 | 30 | NIGHT REGULATION | ||||||
| B | P-004958 | 4 | 30 | 1 HOUR PARKING 9AM-7PM | ||||||
Each line contains values for the column headers shown above.
The full dataset contains more than 800,000 (8 lakh) records.
Mapreduce Case:
The client wants a separate output file for each StatusOrderNumber, named after that StatusOrderNumber.
Each file should contain the maximum Distance for that StatusOrderNumber along with its corresponding SignDescription.
My mapreduce Program goes like this:
MultipleOutputMapper.Java:-
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
public class MultipleOutputMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/*BoroughCode StatusOrderNumber SignSequence Distance ArrowPoints SignDescription
B P-004958 1 0 Curb Line */
if(!value.toString().contains("BoroughCode"))
{
String line = value.toString();
String[] tokens = line.split("\t");
context.write(new Text(tokens[1]), new Text(tokens[3] + "," + tokens[5]));
}
}
}
MultipleOutputReducer.Java:-
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.lang.Integer;
public class MultipleOutputReducer extends Reducer<Text, Text, Text, Text> {
MultipleOutputs<Text, Text> mos;
public void setup(Context context) {
mos = new MultipleOutputs<Text, Text>(context);
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/*Key -> P-004958
Value ->1,Curb Line*/
int maxValue = Integer.MIN_VALUE;
Text kValue = new Text();
Text kDescr = new Text();
for (Text value : values) {
String str = value.toString();
String[] items = str.split(",");
if(maxValue < Integer.parseInt(items[0]))
{
kDescr = new Text(items[1].toString());
}
maxValue = Math.max(maxValue, Integer.parseInt(items[0]));
}
kValue = new Text(String.valueOf(maxValue));
mos.write(kValue, kDescr, key.toString());
}
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
MultipleOutputDriver.Java:-
package org.bkfs.multipleoutput;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultipleOutputDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MultipleOutput <input path> <output path>");
System.exit(-1);
}
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
Job job = Job.getInstance();
job.setJarByClass(MultipleOutputDriver.class);
job.setJobName("MultipleOutput Job");
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(MultipleOutputMapper.class);
job.setReducerClass(MultipleOutputReducer.class);
FileInputFormat.setInputPaths(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
MultipleOutputs.addNamedOutput(job, "MaxDistanceStatusOrderNumber", TextOutputFormat.class, NullWritable.class, Text.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new MultipleOutputDriver(), args);
System.exit(res);
}
}
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultipleOutputDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MultipleOutput <input path> <output path>");
System.exit(-1);
}
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
Job job = Job.getInstance();
job.setJarByClass(MultipleOutputDriver.class);
job.setJobName("MultipleOutput Job");
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(MultipleOutputMapper.class);
job.setReducerClass(MultipleOutputReducer.class);
FileInputFormat.setInputPaths(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);
MultipleOutputs.addNamedOutput(job, "MaxDistanceStatusOrderNumber", TextOutputFormat.class, NullWritable.class, Text.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new MultipleOutputDriver(), args);
System.exit(res);
}
}
No comments:
Post a Comment