Objective:
              Using  Hadoop  chainmapper how to chain multiple Mapper
before it reaches reducer.
Here first Mapper class output will be the input for second
Mapper class input.
Pattern : Mapper1 -> Mapper2 -> Reducer
Here I am taken simple wordcount example.
1st Mapper  will read
the file from input txt file and split each text and store it in Context.
2nd Mapper will get 1stMapper output and convert all the key
text  into Lower case key. 
Lower case key will be stored into Context.
Reducer will get the value from 2ndMapper , Same key related
values will go to one reducer task.
In Reducer we are simply doing word count operation with the
given key.
SplitMapper.java [
1st Mapper class ]
| 
package com.tamil; 
import java.io.IOException; 
import java.util.StringTokenizer; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
public class SplitMapper extends Mapper<LongWritable, Text, Text,
  IntWritable> { 
              @Override 
              protected void
  map(LongWritable key, Text value, Context context) 
                                           throws
  IOException, InterruptedException { 
                             StringTokenizer
  tokenizer = new StringTokenizer(value.toString()); 
                             IntWritable
  dummyValue = new IntWritable(1); 
                             while
  (tokenizer.hasMoreElements()) { 
                                           String
  content = (String) tokenizer.nextElement(); 
                                           context.write(new
  Text(content), dummyValue); 
                             } 
              } 
} | 
LowerCaseMapper.java
[ 2nd Mapper class ]
| 
package com.tamil; 
import java.io.IOException; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
public class LowerCaseMapper extends Mapper<Text,
  IntWritable, Text, IntWritable> { 
              @Override 
              protected void
  map(Text key, IntWritable value, Context context) 
                                           throws
  IOException, InterruptedException { 
                             String
  val = key.toString().toLowerCase(); 
                             Text
  newKey = new Text(val); 
                             context.write(newKey,
  value); 
              } 
} | 
ChainMapReducer.java
[ Reducer class ]
| 
package com.tamil; 
import java.io.IOException; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
public class ChainMapReducer extends Reducer<Text, IntWritable,
  Text, IntWritable> { 
              @Override 
              protected void
  reduce(Text key, Iterable<IntWritable> values, 
              Context context)
  throws IOException, InterruptedException { 
                             int
  sum = 0; 
                             for
  (IntWritable value : values) {sum += value.get();} 
                             context.write(key,
  new IntWritable(sum)); 
              } 
} | 
ChainMapperDriver.java
[ Driver class]
| 
package com.tamil; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 
public class ChainMapperDriver { 
              public static void
  main(String[] args) throws Exception { 
                             Configuration
  config = new Configuration(); 
                             String[]
  otherArgs = new GenericOptionsParser(config, args).getRemainingArgs(); 
                             if
  (otherArgs.length != 2) { 
                                           System.err.print("Useage:
  wordcount <in> <out>"); 
                                           System.exit(2); 
                             } 
                             Job
  job = Job.getInstance(); 
                             Configuration
  splitMapConfig = new Configuration(false); 
                             ChainMapper.addMapper(job,
  SplitMapper.class, LongWritable.class, 
                                                          Text.class,
  Text.class, IntWritable.class, splitMapConfig); 
                             Configuration
  lowerCaseMapConfig = new Configuration(false); 
                             ChainMapper.addMapper(job,
  LowerCaseMapper.class, Text.class, 
                                                          IntWritable.class,
  Text.class, IntWritable.class, 
                                                          lowerCaseMapConfig); 
                             job.setJarByClass(ChainMapperDriver.class); 
                             job.setCombinerClass(ChainMapReducer.class); 
                             job.setReducerClass(ChainMapReducer.class); 
                             job.setOutputKeyClass(Text.class); 
                             job.setOutputValueClass(IntWritable.class); 
                             FileInputFormat.addInputPath(job,
  new Path(otherArgs[0])); 
                             FileOutputFormat.setOutputPath(job,
  new Path(otherArgs[1])); 
                             System.exit(job.waitForCompletion(true)
  ? 0 : 1); 
              } 
} | 
That's it.
Create a jar file & run it :-)
