Showing posts with label Hadoop MapReduce. Show all posts

Friday 14 November 2014

Hadoop Map Reduce with MongoDB Database

Objective:

  • Read MongoDB data from a Hadoop MapReduce job for data mining.
  • Develop the MapReduce program on a Windows system, using Maven to build an executable jar file.

In this example, Hadoop reads every document in a MongoDB collection and counts them.
The same approach can be extended to process the text of documents selected by a custom MongoDB query,
and to store the results in another MongoDB collection.

Windows Environment


  •  Create a Maven project named MongoHadoop.
  •  Add the Maven dependencies to the pom.xml file.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tamil</groupId>
    <artifactId>MongoHadoop</artifactId>
    <version>0.1</version>
    <dependencies>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-streaming</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7.0_05</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.tamil.MongoDBDriver</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

  •  Create a MongoDBMapper.java class in the com.tamil package.


package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

// Emits ("Count", 1) for every MongoDB document so that the reducer can total them.
public class MongoDBMapper extends Mapper<Object, BSONObject, Text, LongWritable> {
    private final Text countKey = new Text("Count");
    private final LongWritable one = new LongWritable(1);

    @Override
    public void map(Object key, BSONObject value, Context context)
        throws IOException, InterruptedException {
        // The document body (for example value.get("Text")) is not needed for a plain count.
        context.write(countKey, one);
    }
}
  •   Create a MongoDBReducer.java class under com.tamil package.

package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
public class MongoDBReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) { sum += value.get();}
        context.write(key, new LongWritable(sum));
    }
}
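
The mapper and reducer above implement a count by emitting 1 under a single fixed key and summing the values for that key. A minimal plain-Java sketch of the same idea follows. It uses no Hadoop or MongoDB classes, and the class name CountSketch and the sample documents are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the counting job; not part of the original project.
final class CountSketch {

    // "Map" step: ignore the document body and emit 1 under the fixed key "Count".
    // "Reduce" step: add up every 1 emitted for that key.
    static Map<String, Long> countDocuments(List<String> documents) {
        Map<String, Long> totals = new HashMap<>();
        for (String ignored : documents) {
            totals.merge("Count", 1L, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical documents standing in for a MongoDB collection.
        List<String> documents = Arrays.asList("first tweet", "second tweet", "third tweet");
        System.out.println(countDocuments(documents)); // prints {Count=3}
    }
}
```

Because every record shares the same key, all values reach a single reduce call, which is why the job output is one line.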
  •   Create a MongoDBDriver.java class under com.tamil package.  


package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoDBDriver {
    public static void main(String[] args) {
        try {
            final Configuration config = new Configuration();
            // Input URI format: mongodb://host:port/database.collection
            MongoConfigUtil.setInputURI(config, "mongodb://localhost:27017/MytDB.MyTable");
            String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
            if (otherArgs.length != 1) {
                System.err.println("Usage: MongoDBDriver <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(config, "MongoTitle");
            job.setJarByClass(MongoDBDriver.class);
            job.setMapperClass(MongoDBMapper.class);
            job.setCombinerClass(MongoDBReducer.class);
            job.setReducerClass(MongoDBReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            job.setInputFormatClass(MongoInputFormat.class);
            // The only remaining argument is the HDFS output directory.
            System.out.println("Output path: " + otherArgs[0]);
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
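
The input URI given to MongoConfigUtil.setInputURI has the form mongodb://host:port/database.collection. As a rough illustration of how that string breaks down, the plain-Java sketch below splits it into database and collection names. The class MongoUriSketch is hypothetical and is not part of mongo-hadoop. It splits at the first dot, on the assumption that a MongoDB database name cannot contain a dot while a collection name can.

```java
import java.net.URI;

// Illustrative helper only; mongo-hadoop does its own URI parsing.
final class MongoUriSketch {

    // Returns {database, collection} for mongodb://host:port/database.collection
    static String[] namespaceOf(String uri) {
        String path = URI.create(uri).getPath(); // e.g. "/MytDB.MyTable"
        if (path == null || path.length() < 2) {
            throw new IllegalArgumentException("No database.collection in: " + uri);
        }
        String namespace = path.substring(1);    // drop the leading '/'
        int dot = namespace.indexOf('.');
        if (dot <= 0 || dot == namespace.length() - 1) {
            throw new IllegalArgumentException("Expected database.collection in: " + uri);
        }
        return new String[] { namespace.substring(0, dot), namespace.substring(dot + 1) };
    }

    public static void main(String[] args) {
        String[] parts = namespaceOf("mongodb://localhost:27017/MytDB.MyTable");
        System.out.println(parts[0] + " / " + parts[1]); // prints MytDB / MyTable
    }
}
```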
  • To create an executable jar file with dependencies, run the Maven build. The assembly plugin is bound to the package phase in the pom.xml above, so packaging is enough.
>  mvn clean package

 Linux Environment 

  • Copy the jar file to the Linux system and run it with the hadoop jar command. The jar's manifest already names com.tamil.MongoDBDriver as the main class, so only the output path is passed.


$ hadoop jar MongoDBHadoop.jar hdfs://localhost.localdomain:8020/user/cloudera/output
  • The Hadoop MapReduce job runs and stores its results in the hdfs://localhost.localdomain:8020/user/cloudera/output/part-r-00000 file.
  • Use the hadoop fs -cat command to view the contents of the part-r-00000 file.

$ hadoop fs -cat  hdfs://localhost.localdomain:8020/user/cloudera/output/part-r-00000
Count    111793

So the MongoDB collection contains 111793 documents.
With Maven, it is easy to develop a Hadoop MapReduce program on Windows itself.
Great Job :-)

Monday 27 October 2014

Hadoop Chain Mapper - Example



Objective:

Use Hadoop's ChainMapper to chain multiple Mapper classes before the data reaches the Reducer.
The output of the first Mapper becomes the input of the second Mapper.

Pattern : Mapper1 -> Mapper2 -> Reducer

Here I have taken a simple word count example.
The 1st Mapper reads each line of the input text file, splits it into words, and writes each word to the Context.
The 2nd Mapper receives the 1st Mapper's output and converts each key to lower case.
The lower-case key is written to the Context.
The Reducer receives the 2nd Mapper's output; all values for the same key go to the same reduce call.
In the Reducer we simply count the occurrences of each key.
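
Before looking at the Hadoop classes, the Mapper1 -> Mapper2 -> Reducer flow can be sketched in plain Java. This is only an illustration of the data flow, not the Hadoop implementation. The class name ChainSketch and the sample lines are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java illustration of the chain; no Hadoop classes are involved.
final class ChainSketch {

    // Stage 1 (the SplitMapper role): break a line into whitespace-separated tokens.
    static List<String> split(String line) {
        List<String> tokens = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            tokens.add(tokenizer.nextToken());
        }
        return tokens;
    }

    // Stage 2 (the LowerCaseMapper role): normalise each key to lower case.
    static String lowerCase(String token) {
        return token.toLowerCase();
    }

    // Final stage (the Reducer role): sum the 1s emitted for each key.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : split(line)) {
                counts.merge(lowerCase(token), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hadoop hadoop HADOOP", "Map Reduce map");
        System.out.println(wordCount(lines)); // prints {hadoop=3, map=2, reduce=1}
    }
}
```

Without the lower-casing stage, "Hadoop", "hadoop" and "HADOOP" would be counted as three different keys.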

SplitMapper.java [ 1st Mapper class ]
package com.tamil;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SplitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        IntWritable dummyValue = new IntWritable(1);
        // Emit (word, 1) for every whitespace-separated token in the line.
        while (tokenizer.hasMoreTokens()) {
            String content = tokenizer.nextToken();
            context.write(new Text(content), dummyValue);
        }
    }
}

LowerCaseMapper.java [ 2nd Mapper class ]
package com.tamil;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LowerCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void map(Text key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        // Re-emit the pair with the key converted to lower case.
        String val = key.toString().toLowerCase();
        Text newKey = new Text(val);
        context.write(newKey, value);
    }
}

ChainMapReducer.java [ Reducer class ]
package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ChainMapReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts received for this word.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

ChainMapperDriver.java [ Driver class]
package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ChainMapperDriver {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        // Build the job from the parsed configuration so that generic options take effect.
        Job job = Job.getInstance(config);
        // Mapper 1: (LongWritable, Text) -> (Text, IntWritable)
        Configuration splitMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, SplitMapper.class, LongWritable.class,
                Text.class, Text.class, IntWritable.class, splitMapConfig);
        // Mapper 2: (Text, IntWritable) -> (Text, IntWritable)
        Configuration lowerCaseMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, LowerCaseMapper.class, Text.class,
                IntWritable.class, Text.class, IntWritable.class,
                lowerCaseMapConfig);
        job.setJarByClass(ChainMapperDriver.class);
        job.setCombinerClass(ChainMapReducer.class);
        job.setReducerClass(ChainMapReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
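
The driver registers ChainMapReducer as both the combiner and the reducer. That is safe here because addition is associative and commutative: summing partial sums gives the same total as summing the raw values, however many times Hadoop chooses to run the combiner. A small plain-Java sketch of that property follows. The class name CombinerSketch and the sample values are illustrative only.

```java
import java.util.Arrays;
import java.util.List;

// Shows why a summing reducer can double as a combiner.
final class CombinerSketch {

    // Same logic as the reducer: add up all values for one key.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        // Counts for one key as emitted by two hypothetical map tasks.
        List<Integer> task1 = Arrays.asList(1, 1, 1);
        List<Integer> task2 = Arrays.asList(1, 1);

        // Without a combiner the reducer sees all five 1s.
        int withoutCombiner = sum(Arrays.asList(1, 1, 1, 1, 1));
        // With a combiner each task pre-sums its own output first.
        int withCombiner = sum(Arrays.asList(sum(task1), sum(task2)));

        System.out.println(withoutCombiner + " " + withCombiner); // prints 5 5
    }
}
```

A reducer that computed an average, for example, could not be reused as a combiner this way, because an average of averages is generally not the overall average.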

That's it.
Create a jar file & run it :-)

Monday 1 September 2014

My First Hadoop Program with MRUnit


This may be useful to new Hadoop developers starting from level zero.
Here I describe my first Hadoop program, together with an MRUnit test.

Prerequisites:
1. Oracle VirtualBox.
2. Download and install the Cloudera VM in Oracle VirtualBox [here I used the Hadoop 2.3.0 specific VM].
3. Download the jar files needed for the MRUnit test and place them in a lib folder:
commons-logging-1.1.1.jar, commons-logging-1.1.1-sources.jar, hamcrest-core-1.1.jar, junit-4.10.jar, log4j-1.2.15.jar, mockito-all-1.8.5.jar, mrunit-0.9.0-incubating-hadoop2.jar

Steps:
In Eclipse, create a new Java project named "FirstHadoopProgram" with the package name com.tamil.
Add libraries using Project >> Properties >> Java Build Path >> Libraries >> Add External JARs...
Select all jar files in the /opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop/client folder.
Add all the MRUnit jar files from the lib folder.


Source Files : 

Java Files
Copy all the files into the project and run the WordCountMapperTest.java file.

Now we need to create a jar file to run this program on the Hadoop cluster.

1. Create a folder named test on the desktop.
2. Place all the Java files (except the JUnit test file) in the test folder.
3. Create a folder named "wordcount" inside the test folder.
4. Compile the Java code:

javac -cp `hadoop classpath` -d wordcount/ WordCountMapper.java WordCountReducer.java WordCountDriver.java

All the files are compiled and the class files are placed in the wordcount folder.

5. Create the jar file:

jar -cvf wordcount.jar -C wordcount/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/tamil/(in = 0) (out= 0)(stored 0%)
adding: com/tamil/WordCountReducer.class(in = 1612) (out= 677)(deflated 58%)
adding: com/tamil/WordCountMapper.class(in = 1886) (out= 861)(deflated 54%)
adding: com/tamil/WordCountDriver.class(in = 1744) (out= 942)(deflated 45%)


6. Create a text file (MyText.txt) and copy it into HDFS to test the MapReduce program:

hadoop fs -mkdir  /user/cloudera/myInput
hadoop fs -put MyText.txt /user/cloudera/myInput/
hadoop fs -ls  /user/cloudera/myInput/
Found 1 items
-rw-r--r--   1 cloudera cloudera        141 2014-09-01 03:23 /user/cloudera/myInput/MyText.txt

7. Run the jar file with Hadoop:

$ hadoop jar wordcount.jar com.tamil/WordCountDriver /user/cloudera/myInput/ /user/cloudera/myOutput
14/09/01 03:29:18 INFO client.RMProxy: Connecting to ResourceManager at localhost.localdomain/127.0.0.1:8032
14/09/01 03:29:18 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/09/01 03:29:18 INFO input.FileInputFormat: Total input paths to process : 1
14/09/01 03:29:18 INFO mapreduce.JobSubmitter: number of splits:1
14/09/01 03:29:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1409554960178_0001
14/09/01 03:29:19 INFO impl.YarnClientImpl: Submitted application application_1409554960178_0001
14/09/01 03:29:19 INFO mapreduce.Job: The url to track the job: http://localhost.localdomain:8088/proxy/application_1409554960178_0001/
14/09/01 03:29:19 INFO mapreduce.Job: Running job: job_1409554960178_0001
14/09/01 03:29:25 INFO mapreduce.Job: Job job_1409554960178_0001 running in uber mode : false
14/09/01 03:29:25 INFO mapreduce.Job:  map 0% reduce 0%
14/09/01 03:29:30 INFO mapreduce.Job:  map 100% reduce 0%
14/09/01 03:29:35 INFO mapreduce.Job:  map 100% reduce 100%
14/09/01 03:29:35 INFO mapreduce.Job: Job job_1409554960178_0001 completed successfully
14/09/01 03:29:36 INFO mapreduce.Job: Counters: 49
               File System Counters
                              FILE: Number of bytes read=67
                              FILE: Number of bytes written=183335
                              FILE: Number of read operations=0
                              FILE: Number of large read operations=0
                              FILE: Number of write operations=0
                              HDFS: Number of bytes read=272
                              HDFS: Number of bytes written=33
                              HDFS: Number of read operations=6
                              HDFS: Number of large read operations=0
                              HDFS: Number of write operations=2
               Job Counters
                              Launched map tasks=1
                              Launched reduce tasks=1
                              Data-local map tasks=1
                              Total time spent by all maps in occupied slots (ms)=699648
                              Total time spent by all reduces in occupied slots (ms)=757760
                              Total time spent by all map tasks (ms)=2733
                              Total time spent by all reduce tasks (ms)=2960
                              Total vcore-seconds taken by all map tasks=2733
                              Total vcore-seconds taken by all reduce tasks=2960
                              Total megabyte-seconds taken by all map tasks=699648
                              Total megabyte-seconds taken by all reduce tasks=757760
               Map-Reduce Framework
                              Map input records=3
                              Map output records=7
                              Map output bytes=52
                              Map output materialized bytes=63
                              Input split bytes=131
                              Combine input records=7
                              Combine output records=6
                              Reduce input groups=6
                              Reduce shuffle bytes=63
                              Reduce input records=6
                              Reduce output records=6
                              Spilled Records=12
                              Shuffled Maps =1
                              Failed Shuffles=0
                              Merged Map outputs=1
                              GC time elapsed (ms)=50
                              CPU time spent (ms)=1110
                              Physical memory (bytes) snapshot=459587584
                              Virtual memory (bytes) snapshot=1806524416
                              Total committed heap usage (bytes)=355467264
               Shuffle Errors
                              BAD_ID=0
                              CONNECTION=0
                              IO_ERROR=0
                              WRONG_LENGTH=0
                              WRONG_MAP=0
                              WRONG_REDUCE=0
               File Input Format Counters
                              Bytes Read=141
               File Output Format Counters
                              Bytes Written=33

8. Check the result in the HDFS folder newly created by the MapReduce program:

$ hadoop fs -ls /user/cloudera/myOutput
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2014-09-01 03:29 /user/cloudera/myOutput/_SUCCESS
-rw-r--r--   1 cloudera cloudera         33 2014-09-01 03:29 /user/cloudera/myOutput/part-r-00000

9. Print the result to the console:

$ hadoop fs -cat /user/cloudera/myOutput/part-r-00000
I              1
It            1
am         1
is            2
it             1
useful    1
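
Each line of part-r-00000 holds a key and a value separated by a tab, which is the default separator of Hadoop's text output format. If the result has to be processed further outside Hadoop, it can be parsed with a few lines of plain Java. The sketch below is illustrative: the class name OutputParserSketch is made up, and the sample string simply repeats the output shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Parses "key<TAB>value" lines as found in a part-r-00000 file.
final class OutputParserSketch {

    static Map<String, Long> parse(String contents) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String line : contents.split("\n")) {
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue; // skip blank or malformed lines
            }
            String key = line.substring(0, tab);
            long value = Long.parseLong(line.substring(tab + 1).trim());
            result.put(key, value);
        }
        return result;
    }

    public static void main(String[] args) {
        String sample = "I\t1\nIt\t1\nam\t1\nis\t2\nit\t1\nuseful\t1\n";
        System.out.println(parse(sample)); // prints {I=1, It=1, am=1, is=2, it=1, useful=1}
    }
}
```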

Wow... Great work... It's working... :-)