Friday 14 November 2014

Hadoop Map Reduce with MongoDB Database

Objective : 

  •  Read MongoDB data from a Hadoop MapReduce job for data mining.
  •  Develop the MapReduce program on a Windows system with Maven and build an executable jar file.

                   In this example Hadoop reads all the documents from a MongoDB collection and counts the number of documents in it.
The same approach also supports text processing on documents selected by a custom MongoDB query,
and storing the result into another MongoDB collection (sketches of both appear below, after the mapper and driver classes).

Windows Environment


  •  Create a Maven project Named MongoHadoop.
  •  Add maven dependencies in pom.xml file.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tamil</groupId>
    <artifactId>MongoHadoop</artifactId>
    <version>0.1</version>
    <dependencies>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-streaming</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7.0_05</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.tamil.MongoDBDriver</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

  •  Create a MongoDBMapper.java class under com.tamil package.


package com.tamil;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class MongoDBMapper extends Mapper<Object, BSONObject, Text, LongWritable> {
    @Override
    public void map(Object key, BSONObject value, Context context)
        throws IOException, InterruptedException {
        // Document fields are available via value.get(), e.g. String tweet = (String) value.get("Text");
        // For the document count we emit the same key "Count" with a value of 1 for every document.
        Text text = new Text("Count");
        context.write(text, new LongWritable(1));
    }
}
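
The objective also mentions text processing on the stored documents. Since each document's fields are available through value.get(), a hedged variant of the mapper could tokenize the "Text" field and emit one record per word. This is only a sketch: the class name MongoDBWordMapper and the field name "Text" are assumptions, and the existing MongoDBReducer would then sum the per-word counts.

package com.tamil;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

// Illustrative sketch only: assumes each document has a String field named "Text".
public class MongoDBWordMapper extends Mapper<Object, BSONObject, Text, LongWritable> {
    @Override
    public void map(Object key, BSONObject value, Context context)
        throws IOException, InterruptedException {
        Object text = value.get("Text");
        if (text == null) return;                       // skip documents without the field
        StringTokenizer tokenizer = new StringTokenizer(text.toString());
        while (tokenizer.hasMoreTokens()) {
            // emit (word, 1); MongoDBReducer sums the counts per word
            context.write(new Text(tokenizer.nextToken()), new LongWritable(1));
        }
    }
}

To use it, set job.setMapperClass(MongoDBWordMapper.class) in the driver instead of MongoDBMapper.
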
  •   Create a MongoDBReducer.java class under com.tamil package.

package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
public class MongoDBReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper to get the total document count.
        long sum = 0;
        for (LongWritable value : values) { sum += value.get(); }
        context.write(key, new LongWritable(sum));
    }
}
  •   Create a MongoDBDriver.java class under com.tamil package.  


package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoDBDriver {
    public static void main(String[] args) {
        try {
            final Configuration config = new Configuration();
            MongoConfigUtil.setInputURI(config, "mongodb://localhost:27017/MytDB.MyTable");
            String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
            if (otherArgs.length != 1) {
                System.err.print("Usage: MongoDBDriver <out>");
                System.exit(2);
            }
            Job job = new Job(config, "MongoTitle");
            job.setJarByClass(MongoDBDriver.class);
            job.setMapperClass(MongoDBMapper.class);
            job.setCombinerClass(MongoDBReducer.class);
            job.setReducerClass(MongoDBReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            job.setInputFormatClass(MongoInputFormat.class);
            // The single remaining argument is the HDFS output path.
            System.out.println("Output path: " + otherArgs[0]);
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
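
As mentioned in the objective, the job can also read only documents matching a custom query and store the result into another MongoDB collection instead of HDFS. The following is a minimal, untested sketch of such a driver variant; it assumes the MongoConfigUtil.setQuery / setOutputURI helpers and the MongoOutputFormat class of the mongo-hadoop library, and the query, database and collection names are only illustrative.

package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

// Illustrative variant of MongoDBDriver: query and collection names are placeholders.
public class MongoDBQueryDriver {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        MongoConfigUtil.setInputURI(config, "mongodb://localhost:27017/MytDB.MyTable");
        // Read only documents whose "Text" field matches the query.
        MongoConfigUtil.setQuery(config, "{\"Text\": {\"$regex\": \"hadoop\"}}");
        // Write the reduced result into another collection instead of HDFS.
        MongoConfigUtil.setOutputURI(config, "mongodb://localhost:27017/MytDB.MyResults");

        Job job = new Job(config, "MongoQueryCount");
        job.setJarByClass(MongoDBQueryDriver.class);
        job.setMapperClass(MongoDBMapper.class);
        job.setCombinerClass(MongoDBReducer.class);
        job.setReducerClass(MongoDBReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class); // replaces FileOutputFormat, so no HDFS output path is needed
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
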
  • To create the executable jar file with dependencies, run the Maven assembly command:
>  mvn clean compile package assembly:assembly

 Linux Environment 

  • Copy the jar file to the Linux system and run the hadoop jar command.


$ hadoop jar MongoDBHadoop.jar com.tamil.MongoDBDriver hdfs://localhost.localdomain:8020/user/cloudera/output
  • The Hadoop MapReduce job will run and the result will be stored in the hdfs://localhost.localdomain:8020/user/cloudera/output/part-r-00000 file.
  • Using the hadoop fs -cat command we can see the content of the part-r-00000 file:

$ hadoop fs -cat  hdfs://localhost.localdomain:8020/user/cloudera/output/part-r-00000
Count    111793

So the number of documents in the MongoDB collection is 111793.
Now it is easy to develop a Hadoop MapReduce program in the Windows environment itself using Maven.
Great job :-)

Thursday 13 November 2014

Google Spread Sheet - To Store Hadoop MapReduce Result

Objective :

                  A simple service to store a Hadoop MapReduce result into a Google spreadsheet.

 Purpose :

                 The Hadoop MapReduce result should reflect dynamically in a web application chart that refers to the Google spreadsheet.
At run time my Hadoop MapReduce program's result is uploaded into the Google spreadsheet using this service.
My blog's current chart uses the spreadsheet data and displays the result in my blog without any change to the blog itself.

Flow:


Hadoop MapReduce Result -> Google Spread Sheet -> Google Chart -> Blog

Google Service:


                       Add the Google GData core jar via the Maven repository. I used com.google.gdata.core.1.47.1.jar for this service class.

GSpreadSheetService.java

import java.io.IOException;
import java.net.*;
import java.util.*;

import com.google.gdata.client.spreadsheet.SpreadsheetService;
import com.google.gdata.data.Link;
import com.google.gdata.data.PlainTextConstruct;
import com.google.gdata.data.batch.*;
import com.google.gdata.data.spreadsheet.*;
import com.google.gdata.util.*;

public class GSpreadSheetService {
    private String user;
    private String password;
    private String application;
    private SpreadsheetService spreadsheetService;
    private SpreadsheetFeed spreadsheetFeed;
    private static final String SHEET_URL = "https://spreadsheets.google.com/feeds/spreadsheets/private/full";

    public GSpreadSheetService(String app, String us, String pwd,
            String proxyHost, String proxyPort) {
        this(app, us, pwd);
        System.setProperty("https.proxyHost", proxyHost);
        System.setProperty("https.proxyPort", proxyPort);

    }

    public GSpreadSheetService(String app, String us, String pwd) {
        this.application = app;
        this.user = us;
        this.password = pwd;
    }

    private void initiate() throws AuthenticationException,
            MalformedURLException, IOException, ServiceException {
        spreadsheetService = new SpreadsheetService(application);
        spreadsheetService.setProtocolVersion(SpreadsheetService.Versions.V3);
        spreadsheetService.setUserCredentials(user, password);
        URL feedURL = new URL(SHEET_URL);
        spreadsheetFeed = spreadsheetService.getFeed(feedURL,
                SpreadsheetFeed.class);
    }

    public List<String> getAllWorkSheetsNames() {
        List<String> names = new ArrayList<String>();
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            List<SpreadsheetEntry> spreadsheets = spreadsheetFeed.getEntries();
            for (SpreadsheetEntry spreadsheet : spreadsheets) {
                names.add(spreadsheet.getTitle().getPlainText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return names;
    }

    public boolean deleteWorkSheetInSpreadSheet(String spreadSheetName,
            String workSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            WorksheetEntry worksheet = getProperWorkSheet(spreadSheetName,
                    workSheetName);
            if (worksheet != null) {
                worksheet.delete();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean createWorkSheetWithDataInSpreadSheet(String spreadSheetName,
            String workSheetName, String[] headers, String[][] rows) {
        int rowCount = 5;
        int columnCount = 4;
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            System.out.println("Objec Initialized");
            SpreadsheetEntry spreadSheet = getProperSpreadsheet(spreadSheetName);
            rowCount = rows.length;
            columnCount = headers.length;
            WorksheetEntry worksheet = new WorksheetEntry();
            worksheet.setTitle(new PlainTextConstruct(workSheetName));
            worksheet.setColCount(columnCount);
            worksheet.setRowCount(rowCount);
            WorksheetEntry createdWorkSheet = getProperWorkSheet(
                    spreadSheetName, workSheetName);
            if (createdWorkSheet == null) {
                URL worksheetFeedUrl = spreadSheet.getWorksheetFeedUrl();
                createdWorkSheet = spreadsheetService.insert(worksheetFeedUrl,
                        worksheet);
                System.out.println("Work Sheet created");
            }
            if (createdWorkSheet != null) {
                WorksheetEntry searchedWorksheet = getProperWorkSheet(
                        spreadSheetName, workSheetName);

                URL cellFeedUrl = searchedWorksheet.getCellFeedUrl();
                CellFeed cellFeed = spreadsheetService.getFeed(cellFeedUrl,
                        CellFeed.class);

                List<CellAddress> cellAddrs = new ArrayList<CellAddress>();
                for (int col = 0; col < headers.length; ++col) {
                    cellAddrs.add(new CellAddress(1, (1 + col), headers[col]));
                }

                Map<String, CellEntry> cellEntries = getCellEntryMap(
                        spreadsheetService, cellFeedUrl, cellAddrs);
                System.out.println("Map constructed");

                CellFeed batchRequest = new CellFeed();
                for (CellAddress cellAddr : cellAddrs) {
                    CellEntry batchEntry = new CellEntry(
                            cellEntries.get(cellAddr.idString));
                    batchEntry.changeInputValueLocal(cellAddr.value);
                    batchEntry.setImmutable(true);
                    BatchUtils.setBatchId(batchEntry, cellAddr.idString);
                    BatchUtils.setBatchOperationType(batchEntry,
                            BatchOperationType.UPDATE);
                    batchRequest.getEntries().add(batchEntry);
                }

                // Submit the update
                Link batchLink = cellFeed.getLink(Link.Rel.FEED_BATCH,
                        Link.Type.ATOM);
                CellFeed batchResponse = spreadsheetService.batch(new URL(
                        batchLink.getHref()), batchRequest);
                System.out.println("batch Submitted");
                // Check the results
                boolean isSuccess = true;
                for (CellEntry entry : batchResponse.getEntries()) {
                    String batchId = BatchUtils.getBatchId(entry);
                    if (!BatchUtils.isSuccess(entry)) {
                        isSuccess = false;
                        BatchStatus status = BatchUtils.getBatchStatus(entry);
                        System.out.printf("%s failed (%s) %s", batchId,
                                status.getReason(), status.getContent());
                    }
                }

                System.out.println("Header Cell Insertion Completed");
                URL listFeedUrl = searchedWorksheet.getListFeedUrl();
                ListFeed listFeed = spreadsheetService.getFeed(listFeedUrl,
                        ListFeed.class);
                for (int i = 0; i < rows.length; i++) {
                    ListEntry row = new ListEntry();
                    for (int j = 0; j < rows[i].length; j++) {
                        row.getCustomElements().setValueLocal(headers[j],
                                rows[i][j]);
                    }
                    row = spreadsheetService.insert(listFeedUrl, row);
                    System.out.println("Row Inserted");
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public static Map<String, CellEntry> getCellEntryMap(
            SpreadsheetService ssSvc, URL cellFeedUrl,
            List<CellAddress> cellAddrs) throws IOException, ServiceException {
        CellFeed batchRequest = new CellFeed();
        for (CellAddress cellId : cellAddrs) {
            CellEntry batchEntry = new CellEntry(cellId.row, cellId.col,
                    cellId.idString);
            batchEntry.setId(String.format("%s/%s", cellFeedUrl.toString(),
                    cellId.idString));
            BatchUtils.setBatchId(batchEntry, cellId.idString);
            BatchUtils.setBatchOperationType(batchEntry,
                    BatchOperationType.QUERY);
            batchRequest.getEntries().add(batchEntry);
        }

        CellFeed cellFeed = ssSvc.getFeed(cellFeedUrl, CellFeed.class);
        CellFeed queryBatchResponse = ssSvc.batch(
                new URL(cellFeed.getLink(Link.Rel.FEED_BATCH, Link.Type.ATOM)
                        .getHref()), batchRequest);

        Map<String, CellEntry> cellEntryMap = new HashMap<String, CellEntry>(
                cellAddrs.size());
        for (CellEntry entry : queryBatchResponse.getEntries()) {
            cellEntryMap.put(BatchUtils.getBatchId(entry), entry);
            // System.out.printf( "batch %s {CellEntry: id=%s editLink=%s inputValue=%s\n",
            // BatchUtils.getBatchId(entry), entry.getId(), entry.getEditLink().getHref(), entry.getCell()
            // .getInputValue());
        }

        return cellEntryMap;
    }

    private WorksheetEntry getProperWorkSheet(String spreadSheetName,
            String workSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            SpreadsheetEntry spreadSheet = getProperSpreadsheet(spreadSheetName);
            WorksheetFeed worksheetFeed = spreadsheetService.getFeed(
                    spreadSheet.getWorksheetFeedUrl(), WorksheetFeed.class);
            List<WorksheetEntry> worksheets = worksheetFeed.getEntries();
            for (WorksheetEntry workSheet : worksheets) {
                if (workSheetName.trim().equalsIgnoreCase(
                        workSheet.getTitle().getPlainText())) {
                    return workSheet;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static class CellAddress {
        public final int row;
        public final int col;
        public final String idString;
        public final String value;

        public CellAddress(int row, int col, String v) {
            this.row = row;
            this.col = col;
            this.idString = String.format("R%sC%s", row, col);
            this.value = v;
        }
    }

    private SpreadsheetEntry getProperSpreadsheet(String spreadSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            List<SpreadsheetEntry> spreadsheets = spreadsheetFeed.getEntries();
            for (SpreadsheetEntry spreadSheet : spreadsheets) {
                if (spreadSheetName.trim().equalsIgnoreCase(
                        spreadSheet.getTitle().getPlainText())) {
                    return spreadSheet;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
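
Tying this back to the flow above, here is a minimal usage sketch of the service. The application name, credentials, spreadsheet/worksheet names and the sample rows below are placeholders; in practice the rows would be parsed from the MapReduce output (for example the part-r-00000 file).

// Hypothetical caller: application name, credentials, sheet names and data are placeholders.
public class UploadMapReduceResult {
    public static void main(String[] args) {
        GSpreadSheetService service = new GSpreadSheetService(
                "MyHadoopApp", "user@gmail.com", "password");

        // Headers and rows would normally be read from the MapReduce result file.
        String[] headers = { "word", "count" };
        String[][] rows = { { "hadoop", "120" }, { "mongodb", "85" } };

        boolean ok = service.createWorkSheetWithDataInSpreadSheet(
                "HadoopResults", "WordCount", headers, rows);
        System.out.println(ok ? "Result uploaded to the spreadsheet" : "Upload failed");
    }
}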


Google Chart [Using GoogleSpreadSheet]

Monday 27 October 2014

Hadoop Chain Mapper - Example



Objective:  

              Using Hadoop's ChainMapper, chain multiple Mapper classes before the data reaches the Reducer.
Here the first Mapper's output becomes the input of the second Mapper.

Pattern : Mapper1 -> Mapper2 -> Reducer

Here I have taken a simple wordcount example.
The 1st Mapper reads the input txt file, splits each line into words and writes them to the Context.
The 2nd Mapper takes the 1st Mapper's output and converts every key into a lower-case key.
The lower-case key is written to the Context.
The Reducer gets the values from the 2nd Mapper; values with the same key go to the same reduce task.
In the Reducer we simply do the word count operation for the given key.

SplitMapper.java [ 1st Mapper class ]
package com.tamil;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SplitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        IntWritable dummyValue = new IntWritable(1);
        while (tokenizer.hasMoreElements()) {
            String content = (String) tokenizer.nextElement();
            context.write(new Text(content), dummyValue);
        }
    }
}

LowerCaseMapper.java [ 2nd Mapper class ]
package com.tamil;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LowerCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void map(Text key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        String val = key.toString().toLowerCase();
        Text newKey = new Text(val);
        context.write(newKey, value);
    }
}

ChainMapReducer.java [ Reducer class ]
package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ChainMapReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) { sum += value.get(); }
        context.write(key, new IntWritable(sum));
    }
}

ChainMapperDriver.java [ Driver class]
package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ChainMapperDriver {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.print("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        // Pass the parsed configuration to the job so generic options take effect.
        Job job = Job.getInstance(config, "chain mapper wordcount");
        Configuration splitMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, SplitMapper.class, LongWritable.class,
                Text.class, Text.class, IntWritable.class, splitMapConfig);
        Configuration lowerCaseMapConfig = new Configuration(false);
        ChainMapper.addMapper(job, LowerCaseMapper.class, Text.class,
                IntWritable.class, Text.class, IntWritable.class,
                lowerCaseMapConfig);
        job.setJarByClass(ChainMapperDriver.class);
        job.setCombinerClass(ChainMapReducer.class);
        job.setReducerClass(ChainMapReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

That's it.
Create a jar file & run it :-)