Friday 14 November 2014

Hadoop Map Reduce with MongoDB Database

Objective : 

  •  Read MongoDB data from a Hadoop MapReduce job for data-mining.
  •  Develop the MapReduce program on a Windows system with Maven and package it as an executable jar file.
     
                   In this example Hadoop reads every document from a MongoDB collection and counts the number of documents in it.
The same job structure also supports text processing over a filtered (queried) subset of MongoDB documents,
and storing the result in another MongoDB collection; a sketch of both options follows the driver class below.

Windows Environment


  •  Create a Maven project named MongoHadoop.
  •  Add the Maven dependencies to the pom.xml file.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tamil</groupId>
    <artifactId>MongoHadoop</artifactId>
    <version>0.1</version>
    <dependencies>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-hadoop-streaming</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7.0_05</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.tamil.MongoDBDriver</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
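
Note: the jdk.tools entry above is a workaround for the Hadoop client libraries' compile-time dependency on the JDK's tools.jar, which is not published to Maven Central; ${JAVA_HOME} must therefore point to a full JDK (not a JRE) for the build to resolve it.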

  •  Create a MongoDBMapper.java class under the com.tamil package.


package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class MongoDBMapper extends Mapper<Object, BSONObject, Text, LongWritable> {
    @Override
    public void map(Object key, BSONObject value, Context context)
            throws IOException, InterruptedException {
        // Individual fields are available from the BSONObject if needed,
        // e.g. String text = (String) value.get("Text");
        // for a plain document count we just emit ("Count", 1) per document.
        context.write(new Text("Count"), new LongWritable(1));
    }
}
  •   Create a MongoDBReducer.java class under the com.tamil package.

package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
public class MongoDBReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper to obtain the total document count.
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
  •   Create a MongoDBDriver.java class under the com.tamil package.


package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoDBDriver {
    public static void main(String[] args) {
        try {
            final Configuration config = new Configuration();
            // Input: every document of the MyTable collection in the MytDB database.
            MongoConfigUtil.setInputURI(config, "mongodb://localhost:27017/MytDB.MyTable");
            String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
            if (otherArgs.length != 1) {
                System.err.println("Usage: MongoDBDriver <out>");
                System.exit(2);
            }
            Job job = new Job(config, "MongoTitle");
            job.setJarByClass(MongoDBDriver.class);
            job.setMapperClass(MongoDBMapper.class);
            job.setCombinerClass(MongoDBReducer.class);
            job.setReducerClass(MongoDBReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            job.setInputFormatClass(MongoInputFormat.class);
            // The single remaining argument is the HDFS output path.
            System.out.println("Output path: " + otherArgs[0]);
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
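
The two extensions mentioned in the objective can be handled by a small variant of this driver: MongoConfigUtil.setQuery restricts the input to documents matching a MongoDB query, and MongoOutputFormat together with MongoConfigUtil.setOutputURI writes the reduced result back into another collection instead of HDFS. The sketch below is a hedged example, not part of the original program: it assumes the mongo-hadoop 1.3.0 helpers, and both the query and the MyResults collection name are placeholders.

package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoDBFilteredDriver {
    public static void main(String[] args) throws Exception {
        final Configuration config = new Configuration();
        MongoConfigUtil.setInputURI(config, "mongodb://localhost:27017/MytDB.MyTable");
        // Only documents matching this JSON query reach the mapper (placeholder query).
        MongoConfigUtil.setQuery(config, "{\"Text\": {\"$exists\": true}}");
        // Write the reduced result to another collection instead of HDFS
        // ("MyResults" is a placeholder collection name).
        MongoConfigUtil.setOutputURI(config, "mongodb://localhost:27017/MytDB.MyResults");

        Job job = new Job(config, "MongoTitleFiltered");
        job.setJarByClass(MongoDBFilteredDriver.class);
        job.setMapperClass(MongoDBMapper.class);
        job.setCombinerClass(MongoDBReducer.class);
        job.setReducerClass(MongoDBReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class); // replaces FileOutputFormat
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}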
  • To create the executable jar file with all dependencies, run the Maven assembly command:
>  mvn clean compile package assembly:assembly

 Linux Environment 

  • Copy the jar file to the Linux system and run the hadoop command. (The assembly build writes the jar to target/MongoHadoop-0.1-jar-with-dependencies.jar; in the command below it has been renamed MongoDBHadoop.jar.)


$ hadoop jar MongoDBHadoop.jar com.tamil.MongoDBDriver hdfs://localhost.localdomain:8020/user/cloudera/output
  • The Hadoop MapReduce job runs, and the result is stored in the hdfs://localhost.localdomain:8020/user/cloudera/output/part-r-00000 file.
  • Using the hadoop fs -cat command we can see the content of the part-r-00000 file:

$ hadoop fs -cat  hdfs://localhost.localdomain:8020/user/cloudera/output/part-r-00000
Count    111793
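
The same figure can be cross-checked directly in the mongo shell, using the database and collection configured in the driver:

$ mongo MytDB
> db.MyTable.count()
111793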

So the number of documents in the MongoDB collection is 111793.
It is now easy to develop a Hadoop MapReduce program in a Windows environment itself using Maven.
Great Job :-)

Thursday 13 November 2014

Google Spread Sheet - To Store Hadoop MapReduce Result

Objective :

                  A simple service to store a Hadoop MapReduce result in a Google spreadsheet.

 Purpose :

                 The Hadoop MapReduce result should be reflected dynamically in a web-application chart that refers to the Google spreadsheet.
At run time, my Hadoop MapReduce program's result is uploaded to the Google spreadsheet using this service.
My blog's current chart uses the spreadsheet data and displays the result without any changes to the blog itself.

Flow:


Hadoop MapReduce Result -> Google Spread Sheet -> Google Chart -> Blog

Google Service:


                       Add the Google GData core jar from the Maven repository; I used the com.google.gdata core 1.47.1 jar for this service class. A sample dependency entry is shown below.
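
For reference, the corresponding Maven dependency entry should look roughly like this (coordinates as published on Maven Central):

<dependency>
    <groupId>com.google.gdata</groupId>
    <artifactId>core</artifactId>
    <version>1.47.1</version>
</dependency>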

GSpreadSheetService.java

import java.io.IOException;
import java.net.*;
import java.util.*;

import com.google.gdata.client.spreadsheet.SpreadsheetService;
import com.google.gdata.data.Link;
import com.google.gdata.data.PlainTextConstruct;
import com.google.gdata.data.batch.*;
import com.google.gdata.data.spreadsheet.*;
import com.google.gdata.util.*;

public class GSpreadSheetService {
    private String user;
    private String password;
    private String application;
    private SpreadsheetService spreadsheetService;
    private SpreadsheetFeed spreadsheetFeed;
    private static final String SHEET_URL = "https://spreadsheets.google.com/feeds/spreadsheets/private/full";

    public GSpreadSheetService(String app, String us, String pwd,
            String proxyHost, String proxyPort) {
        this(app, us, pwd);
        System.setProperty("https.proxyHost", proxyHost);
        System.setProperty("https.proxyPort", proxyPort);

    }

    public GSpreadSheetService(String app, String us, String pwd) {
        this.application = app;
        this.user = us;
        this.password = pwd;
    }

    private void initiate() throws AuthenticationException,
            MalformedURLException, IOException, ServiceException {
        spreadsheetService = new SpreadsheetService(application);
        spreadsheetService.setProtocolVersion(SpreadsheetService.Versions.V3);
        spreadsheetService.setUserCredentials(user, password);
        URL feedURL = new URL(SHEET_URL);
        spreadsheetFeed = spreadsheetService.getFeed(feedURL,
                SpreadsheetFeed.class);
    }

    public List<String> getAllWorkSheetsNames() {
        List<String> names = new ArrayList<String>();
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            List<SpreadsheetEntry> spreadsheets = spreadsheetFeed.getEntries();
            for (SpreadsheetEntry spreadsheet : spreadsheets) {
                names.add(spreadsheet.getTitle().getPlainText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return names;
    }

    public boolean deleteWorkSheetInSpreadSheet(String spreadSheetName,
            String workSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            WorksheetEntry worksheet = getProperWorkSheet(spreadSheetName,
                    workSheetName);
            if (worksheet != null) {
                worksheet.delete();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean createWorkSheetWithDataInSpreadSheet(String spreadSheetName,
            String workSheetName, String[] headers, String[][] rows) {
        int rowCount = 5;
        int columnCount = 4;
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            System.out.println("Objec Initialized");
            SpreadsheetEntry spreadSheet = getProperSpreadsheet(spreadSheetName);
            rowCount = rows.length;
            columnCount = headers.length;
            WorksheetEntry worksheet = new WorksheetEntry();
            worksheet.setTitle(new PlainTextConstruct(workSheetName));
            worksheet.setColCount(columnCount);
            worksheet.setRowCount(rowCount);
            WorksheetEntry createdWorkSheet = getProperWorkSheet(
                    spreadSheetName, workSheetName);
            if (createdWorkSheet == null) {
                URL worksheetFeedUrl = spreadSheet.getWorksheetFeedUrl();
                createdWorkSheet = spreadsheetService.insert(worksheetFeedUrl,
                        worksheet);
                System.out.println("Work Sheet created");
            }
            if (createdWorkSheet != null) {
                WorksheetEntry searchedWorksheet = getProperWorkSheet(
                        spreadSheetName, workSheetName);

                URL cellFeedUrl = searchedWorksheet.getCellFeedUrl();
                CellFeed cellFeed = spreadsheetService.getFeed(cellFeedUrl,
                        CellFeed.class);

                List<CellAddress> cellAddrs = new ArrayList<CellAddress>();
                for (int col = 0; col < headers.length; ++col) {
                    cellAddrs.add(new CellAddress(1, (1 + col), headers[col]));
                }

                Map<String, CellEntry> cellEntries = getCellEntryMap(
                        spreadsheetService, cellFeedUrl, cellAddrs);
                System.out.println("Map constructed");

                CellFeed batchRequest = new CellFeed();
                for (CellAddress cellAddr : cellAddrs) {
                    CellEntry batchEntry = new CellEntry(
                            cellEntries.get(cellAddr.idString));
                    batchEntry.changeInputValueLocal(cellAddr.value);
                    batchEntry.setImmutable(true);
                    BatchUtils.setBatchId(batchEntry, cellAddr.idString);
                    BatchUtils.setBatchOperationType(batchEntry,
                            BatchOperationType.UPDATE);
                    batchRequest.getEntries().add(batchEntry);
                }

                // Submit the update
                Link batchLink = cellFeed.getLink(Link.Rel.FEED_BATCH,
                        Link.Type.ATOM);
                CellFeed batchResponse = spreadsheetService.batch(new URL(
                        batchLink.getHref()), batchRequest);
                System.out.println("batch Submitted");
                // Check the results
                boolean isSuccess = true;
                for (CellEntry entry : batchResponse.getEntries()) {
                    String batchId = BatchUtils.getBatchId(entry);
                    if (!BatchUtils.isSuccess(entry)) {
                        isSuccess = false;
                        BatchStatus status = BatchUtils.getBatchStatus(entry);
                        System.out.printf("%s failed (%s) %s", batchId,
                                status.getReason(), status.getContent());
                    }
                }

                System.out.println("Header Cell Insertion Completed");
                URL listFeedUrl = searchedWorksheet.getListFeedUrl();
                ListFeed listFeed = spreadsheetService.getFeed(listFeedUrl,
                        ListFeed.class);
                for (int i = 0; i < rows.length; i++) {
                    ListEntry row = new ListEntry();
                    for (int j = 0; j < rows[i].length; j++) {
                        row.getCustomElements().setValueLocal(headers[j],
                                rows[i][j]);
                    }
                    row = spreadsheetService.insert(listFeedUrl, row);
                    System.out.println("Row Inserted");
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public static Map<String, CellEntry> getCellEntryMap(
            SpreadsheetService ssSvc, URL cellFeedUrl,
            List<CellAddress> cellAddrs) throws IOException, ServiceException {
        CellFeed batchRequest = new CellFeed();
        for (CellAddress cellId : cellAddrs) {
            CellEntry batchEntry = new CellEntry(cellId.row, cellId.col,
                    cellId.idString);
            batchEntry.setId(String.format("%s/%s", cellFeedUrl.toString(),
                    cellId.idString));
            BatchUtils.setBatchId(batchEntry, cellId.idString);
            BatchUtils.setBatchOperationType(batchEntry,
                    BatchOperationType.QUERY);
            batchRequest.getEntries().add(batchEntry);
        }

        CellFeed cellFeed = ssSvc.getFeed(cellFeedUrl, CellFeed.class);
        CellFeed queryBatchResponse = ssSvc.batch(
                new URL(cellFeed.getLink(Link.Rel.FEED_BATCH, Link.Type.ATOM)
                        .getHref()), batchRequest);

        Map<String, CellEntry> cellEntryMap = new HashMap<String, CellEntry>(
                cellAddrs.size());
        for (CellEntry entry : queryBatchResponse.getEntries()) {
            cellEntryMap.put(BatchUtils.getBatchId(entry), entry);
            // System.out.printf( "batch %s {CellEntry: id=%s editLink=%s inputValue=%s\n",
            // BatchUtils.getBatchId(entry), entry.getId(), entry.getEditLink().getHref(), entry.getCell()
            // .getInputValue());
        }

        return cellEntryMap;
    }

    private WorksheetEntry getProperWorkSheet(String spreadSheetName,
            String workSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            SpreadsheetEntry spreadSheet = getProperSpreadsheet(spreadSheetName);
            WorksheetFeed worksheetFeed = spreadsheetService.getFeed(
                    spreadSheet.getWorksheetFeedUrl(), WorksheetFeed.class);
            List<WorksheetEntry> worksheets = worksheetFeed.getEntries();
            for (WorksheetEntry workSheet : worksheets) {
                if (workSheetName.trim().equalsIgnoreCase(
                        workSheet.getTitle().getPlainText())) {
                    return workSheet;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static class CellAddress {
        public final int row;
        public final int col;
        public final String idString;
        public final String value;

        public CellAddress(int row, int col, String v) {
            this.row = row;
            this.col = col;
            this.idString = String.format("R%sC%s", row, col);
            this.value = v;
        }
    }

    private SpreadsheetEntry getProperSpreadsheet(String spreadSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            List<SpreadsheetEntry> spreadsheets = spreadsheetFeed.getEntries();
            for (SpreadsheetEntry spreadSheet : spreadsheets) {
                if (spreadSheetName.trim().equalsIgnoreCase(
                        spreadSheet.getTitle().getPlainText())) {
                    return spreadSheet;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
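
A minimal usage sketch of the service (the credentials, spreadsheet/worksheet names, and sample data below are placeholders; the target spreadsheet must already exist in the Google account):

public class GSpreadSheetServiceDemo {
    public static void main(String[] args) {
        // Placeholder credentials -- replace with a real account.
        GSpreadSheetService service = new GSpreadSheetService(
                "MyHadoopChartApp", "user@gmail.com", "password");
        String[] headers = { "word", "count" };
        String[][] rows = { { "hadoop", "42" }, { "mongodb", "17" } };
        // Creates the worksheet (if missing) and uploads the header row plus the data rows.
        boolean ok = service.createWorkSheetWithDataInSpreadSheet(
                "HadoopResults", "WordCounts", headers, rows);
        System.out.println(ok ? "Upload completed" : "Upload failed");
    }
}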


Google Chart [Using GoogleSpreadSheet]