Objective:
Show how to chain multiple Mapper classes before the Reducer using Hadoop's ChainMapper. In the chain, the output of the first Mapper class becomes the input of the second Mapper class.
Pattern : Mapper1 -> Mapper2 -> Reducer
Here I have taken the simple word count example.
The 1st Mapper reads the input text file, splits each line into words, and writes each word into the Context.
The 2nd Mapper takes the 1st Mapper's output, converts every key to lower case, and writes the lower-cased key into the Context.
The Reducer gets the values from the 2nd Mapper; all values belonging to the same key go to the same reducer task.
In the Reducer we simply do the word count operation for the given key.
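
For example, assuming the input line "Hadoop hadoop MapReduce" (sample data of my own, not from the job below), the records flow through the chain like this:

SplitMapper output     : (Hadoop, 1) (hadoop, 1) (MapReduce, 1)
LowerCaseMapper output : (hadoop, 1) (hadoop, 1) (mapreduce, 1)
Reducer output         : (hadoop, 2) (mapreduce, 1)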
SplitMapper.java [ 1st Mapper class ]
package com.tamil;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SplitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // StringTokenizer splits the line on whitespace.
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        IntWritable dummyValue = new IntWritable(1);
        while (tokenizer.hasMoreElements()) {
            String content = (String) tokenizer.nextElement();
            // Emit (word, 1) for every token in the line.
            context.write(new Text(content), dummyValue);
        }
    }
}
LowerCaseMapper.java [ 2nd Mapper class ]
package com.tamil;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Its input key/value types (Text, IntWritable) must match SplitMapper's output types.
public class LowerCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void map(Text key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        // Lower-case the incoming word and pass the count through unchanged.
        String val = key.toString().toLowerCase();
        Text newKey = new Text(val);
        context.write(newKey, value);
    }
}
ChainMapReducer.java [ Reducer class ]
package com.tamil;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ChainMapReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum up all the 1s emitted for this word.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
ChainMapperDriver.java [ Driver class ]
package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ChainMapperDriver {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // Pass the parsed Configuration to the Job so generic options (-D etc.) take effect.
        Job job = Job.getInstance(config);

        // 1st Mapper in the chain: (LongWritable, Text) -> (Text, IntWritable).
        Configuration splitMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, SplitMapper.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, splitMapConfig);

        // 2nd Mapper in the chain: its input types must match the 1st Mapper's output types.
        Configuration lowerCaseMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, LowerCaseMapper.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, lowerCaseMapConfig);

        job.setJarByClass(ChainMapperDriver.class);
        // Reusing the reducer as a combiner is safe here because summing is associative and commutative.
        job.setCombinerClass(ChainMapReducer.class);
        job.setReducerClass(ChainMapReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
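
For reference, this is the ChainMapper.addMapper signature the driver calls (org.apache.hadoop.mapreduce.lib.chain.ChainMapper, new MapReduce API); the comments are my annotations:

ChainMapper.addMapper(Job job,             // the job being configured
        Class<? extends Mapper> klass,     // the Mapper class to add to the chain
        Class<?> inputKeyClass,            // mapper input key class
        Class<?> inputValueClass,          // mapper input value class
        Class<?> outputKeyClass,           // mapper output key class
        Class<?> outputValueClass,         // mapper output value class
        Configuration mapperConf);         // a Configuration private to this mapper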
That's it.
Create a jar file & run it :-)
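
For example (a sketch; the jar name, compiled-classes directory, and HDFS paths are my assumptions, so adjust them to your setup):

jar cf chainmapper.jar -C classes/ .
hadoop jar chainmapper.jar com.tamil.ChainMapperDriver /user/tamil/input /user/tamil/output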