Monday, 27 October 2014

Hadoop Chain Mapper - Example



Objective:

This post shows how to use Hadoop's ChainMapper to chain multiple Mapper classes before the data reaches the Reducer. The first Mapper's output becomes the second Mapper's input.

Pattern: Mapper1 -> Mapper2 -> Reducer

Here I take the simple wordcount example.
The 1st Mapper reads the input text file, splits each line into words, and writes each word to the Context.
The 2nd Mapper takes the 1st Mapper's output and converts each key to lower case.
The lower-cased keys are written to the Context.
The Reducer gets its values from the 2nd Mapper; all values with the same key go to the same reducer task.
In the Reducer we simply do the wordcount operation for the given key.
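
To make the data flow concrete, here is a small trace (the input line below is just a made-up example of mine, not a file shipped with this post):

Input line            : Hello World hello HADOOP
After SplitMapper     : (Hello,1) (World,1) (hello,1) (HADOOP,1)
After LowerCaseMapper : (hello,1) (world,1) (hello,1) (hadoop,1)
Reducer output        : (hadoop,1) (hello,2) (world,1)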

SplitMapper.java [ 1st Mapper class ]
package com.tamil;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SplitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the input line into words and emit a (word, 1) pair for each.
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            context.write(new Text(tokenizer.nextToken()), ONE);
        }
    }
}

LowerCaseMapper.java [ 2nd Mapper class ]
package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LowerCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void map(Text key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        // Lower-case the word produced by SplitMapper and pass the count through unchanged.
        Text newKey = new Text(key.toString().toLowerCase());
        context.write(newKey, value);
    }
}

ChainMapReducer.java [ Reducer class ]
package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ChainMapReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts that arrived for this (lower-cased) word.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

ChainMapperDriver.java [ Driver class]
package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ChainMapperDriver {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(config);

        // 1st mapper in the chain: (LongWritable, Text) -> (Text, IntWritable)
        Configuration splitMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, SplitMapper.class, LongWritable.class,
                Text.class, Text.class, IntWritable.class, splitMapConfig);

        // 2nd mapper in the chain: its input types must match the 1st mapper's output types.
        Configuration lowerCaseMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, LowerCaseMapper.class, Text.class,
                IntWritable.class, Text.class, IntWritable.class,
                lowerCaseMapConfig);

        job.setJarByClass(ChainMapperDriver.class);
        job.setCombinerClass(ChainMapReducer.class);
        job.setReducerClass(ChainMapReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
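
A side note: the same ChainMapReducer class is used as both the combiner and the reducer. That is safe for wordcount because summing partial counts is associative and commutative, so combining on the map side does not change the final result.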

That's it.
Create a jar file & run it :-)
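
For example, assuming the classes above are packaged into a jar (the jar name and HDFS paths below are placeholders of mine, not fixed by the code):

hadoop jar chainmapper-example.jar com.tamil.ChainMapperDriver /user/tamil/input /user/tamil/output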

Comments:

  1. Nice example of MapReduce chaining with the latest API. Do you know how to specify a custom partitioner or grouping while chaining? For instance, how would you implement a secondary sort while chaining?

  2. Thanks for the example. Does it matter which version of Hadoop I'm using? With version 1 I got a runtime error. Is there a specific reason Hadoop YARN is needed to use org.apache.hadoop.mapreduce.lib.chain.ChainMapper?

    Replies
    1. @Fatemah, YARN is part of 2nd-gen Hadoop, so it is different from 1st-gen Hadoop.
      The first-gen package (org.apache.hadoop.mapred.*) was refactored into the org.apache.hadoop.mapreduce.* package.
      This code is written for Hadoop 2.0+.

  3. Just adding: the possible pattern is (Mapper+ Reducer Mapper*), so you can also have more mappers after the reducer.
