Thursday, 13 November 2014

Google Spreadsheet - To Store Hadoop MapReduce Results

Objective :

                  A simple service to store Hadoop MapReduce results into a Google Spreadsheet.

 Purpose :

                 Hadoop MapReduce results should be reflected dynamically in a web application chart that reads the Google Spreadsheet.
At run time, the results of my Hadoop MapReduce program are uploaded to the Google Spreadsheet using this service.
My blog's existing chart then uses the spreadsheet data and displays the results without any change to the blog itself.

Flow:


Hadoop MapReduce Result -> Google Spread Sheet -> Google Chart -> Blog

Google Service:


                       Add the Google GData core jar from the Maven repository. I used com.google.gdata.core.1.47.1.jar for this service class.

GSpreadSheetService.java

import java.io.IOException;
import java.net.*;
import java.util.*;

import com.google.gdata.client.spreadsheet.SpreadsheetService;
import com.google.gdata.data.Link;
import com.google.gdata.data.PlainTextConstruct;
import com.google.gdata.data.batch.*;
import com.google.gdata.data.spreadsheet.*;
import com.google.gdata.util.*;

public class GSpreadSheetService {
    private String user;
    private String password;
    private String application;
    private SpreadsheetService spreadsheetService;
    private SpreadsheetFeed spreadsheetFeed;
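    // Feed URL that lists all the spreadsheets visible to the authenticated user.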
    private static final String SHEET_URL = "https://spreadsheets.google.com/feeds/spreadsheets/private/full";

    public GSpreadSheetService(String app, String us, String pwd,
            String proxyHost, String proxyPort) {
        this(app, us, pwd);
        System.setProperty("https.proxyHost", proxyHost);
        System.setProperty("https.proxyPort", proxyPort);

    }

    public GSpreadSheetService(String app, String us, String pwd) {
        this.application = app;
        this.user = us;
        this.password = pwd;
    }

    // Authenticates with the stored credentials and fetches the spreadsheet feed;
    // called lazily before the first spreadsheet operation.
    private void initiate() throws AuthenticationException,
            MalformedURLException, IOException, ServiceException {
        spreadsheetService = new SpreadsheetService(application);
        spreadsheetService.setProtocolVersion(SpreadsheetService.Versions.V3);
        spreadsheetService.setUserCredentials(user, password);
        URL feedURL = new URL(SHEET_URL);
        spreadsheetFeed = spreadsheetService.getFeed(feedURL,
                SpreadsheetFeed.class);
    }

    public List<String> getAllWorkSheetsNames() {
        List<String> names = new ArrayList<String>();
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            List<SpreadsheetEntry> spreadsheets = spreadsheetFeed.getEntries();
            for (SpreadsheetEntry spreadsheet : spreadsheets) {
                names.add(spreadsheet.getTitle().getPlainText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return names;
    }

    public boolean deleteWorkSheetInSpreadSheet(String spreadSheetName,
            String workSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            WorksheetEntry worksheet = getProperWorkSheet(spreadSheetName,
                    workSheetName);
            if (worksheet != null) {
                worksheet.delete();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // Creates the worksheet (if it does not already exist), writes the header row
    // through a batched cell update, then appends the data rows via the list feed.
    public boolean createWorkSheetWithDataInSpreadSheet(String spreadSheetName,
            String workSheetName, String[] headers, String[][] rows) {
        int rowCount = 5;
        int columnCount = 4;
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            System.out.println("Object initialized");
            SpreadsheetEntry spreadSheet = getProperSpreadsheet(spreadSheetName);
            rowCount = rows.length;
            columnCount = headers.length;
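            // Prepare a worksheet sized to the data; it is inserted only if one with this name does not already exist.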
            WorksheetEntry worksheet = new WorksheetEntry();
            worksheet.setTitle(new PlainTextConstruct(workSheetName));
            worksheet.setColCount(columnCount);
            worksheet.setRowCount(rowCount);
            WorksheetEntry createdWorkSheet = getProperWorkSheet(
                    spreadSheetName, workSheetName);
            if (createdWorkSheet == null) {
                URL worksheetFeedUrl = spreadSheet.getWorksheetFeedUrl();
                createdWorkSheet = spreadsheetService.insert(worksheetFeedUrl,
                        worksheet);
                System.out.println("Work Sheet created");
            }
            if (createdWorkSheet != null) {
                WorksheetEntry searchedWorksheet = getProperWorkSheet(
                        spreadSheetName, workSheetName);

                URL cellFeedUrl = searchedWorksheet.getCellFeedUrl();
                CellFeed cellFeed = spreadsheetService.getFeed(cellFeedUrl,
                        CellFeed.class);

                List<CellAddress> cellAddrs = new ArrayList<CellAddress>();
                for (int col = 0; col < headers.length; ++col) {
                    cellAddrs.add(new CellAddress(1, (1 + col), headers[col]));
                }

                Map<String, CellEntry> cellEntries = getCellEntryMap(
                        spreadsheetService, cellFeedUrl, cellAddrs);
                System.out.println("Map constructed");

                CellFeed batchRequest = new CellFeed();
                for (CellAddress cellAddr : cellAddrs) {
                    CellEntry batchEntry = new CellEntry(
                            cellEntries.get(cellAddr.idString));
                    batchEntry.changeInputValueLocal(cellAddr.value);
                    batchEntry.setImmutable(true);
                    BatchUtils.setBatchId(batchEntry, cellAddr.idString);
                    BatchUtils.setBatchOperationType(batchEntry,
                            BatchOperationType.UPDATE);
                    batchRequest.getEntries().add(batchEntry);
                }

                // Submit the update
                Link batchLink = cellFeed.getLink(Link.Rel.FEED_BATCH,
                        Link.Type.ATOM);
                CellFeed batchResponse = spreadsheetService.batch(new URL(
                        batchLink.getHref()), batchRequest);
                System.out.println("batch Submitted");
                // Check the results
                boolean isSuccess = true;
                for (CellEntry entry : batchResponse.getEntries()) {
                    String batchId = BatchUtils.getBatchId(entry);
                    if (!BatchUtils.isSuccess(entry)) {
                        isSuccess = false;
                        BatchStatus status = BatchUtils.getBatchStatus(entry);
                        System.out.printf("%s failed (%s) %s", batchId,
                                status.getReason(), status.getContent());
                    }
                }

                System.out.println("Header Cell Insertion Completed");
                URL listFeedUrl = searchedWorksheet.getListFeedUrl();
                ListFeed listFeed = spreadsheetService.getFeed(listFeedUrl,
                        ListFeed.class);
                for (int i = 0; i < rows.length; i++) {
                    ListEntry row = new ListEntry();
                    for (int j = 0; j < rows[i].length; j++) {
                        row.getCustomElements().setValueLocal(headers[j],
                                rows[i][j]);
                    }
                    row = spreadsheetService.insert(listFeedUrl, row);
                    System.out.println("Row Inserted");
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // Queries the target cells in one batch and maps each returned entry by its
    // batch id (e.g. "R1C2") so it can be updated in a follow-up batch request.
    public static Map<String, CellEntry> getCellEntryMap(
            SpreadsheetService ssSvc, URL cellFeedUrl,
            List<CellAddress> cellAddrs) throws IOException, ServiceException {
        CellFeed batchRequest = new CellFeed();
        for (CellAddress cellId : cellAddrs) {
            CellEntry batchEntry = new CellEntry(cellId.row, cellId.col,
                    cellId.idString);
            batchEntry.setId(String.format("%s/%s", cellFeedUrl.toString(),
                    cellId.idString));
            BatchUtils.setBatchId(batchEntry, cellId.idString);
            BatchUtils.setBatchOperationType(batchEntry,
                    BatchOperationType.QUERY);
            batchRequest.getEntries().add(batchEntry);
        }

        CellFeed cellFeed = ssSvc.getFeed(cellFeedUrl, CellFeed.class);
        CellFeed queryBatchResponse = ssSvc.batch(
                new URL(cellFeed.getLink(Link.Rel.FEED_BATCH, Link.Type.ATOM)
                        .getHref()), batchRequest);

        Map<String, CellEntry> cellEntryMap = new HashMap<String, CellEntry>(
                cellAddrs.size());
        for (CellEntry entry : queryBatchResponse.getEntries()) {
            cellEntryMap.put(BatchUtils.getBatchId(entry), entry);
            // System.out.printf( "batch %s {CellEntry: id=%s editLink=%s inputValue=%s\n",
            // BatchUtils.getBatchId(entry), entry.getId(), entry.getEditLink().getHref(), entry.getCell()
            // .getInputValue());
        }

        return cellEntryMap;
    }

    private WorksheetEntry getProperWorkSheet(String spreadSheetName,
            String workSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            SpreadsheetEntry spreadSheet = getProperSpreadsheet(spreadSheetName);
            WorksheetFeed worksheetFeed = spreadsheetService.getFeed(
                    spreadSheet.getWorksheetFeedUrl(), WorksheetFeed.class);
            List<WorksheetEntry> worksheets = worksheetFeed.getEntries();
            for (WorksheetEntry workSheet : worksheets) {
                if (workSheetName.trim().equalsIgnoreCase(
                        workSheet.getTitle().getPlainText())) {
                    return workSheet;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static class CellAddress {
        public final int row;
        public final int col;
        public final String idString;
        public final String value;

        public CellAddress(int row, int col, String v) {
            this.row = row;
            this.col = col;
            this.idString = String.format("R%sC%s", row, col);
            this.value = v;
        }
    }

    private SpreadsheetEntry getProperSpreadsheet(String spreadSheetName) {
        try {
            if (spreadsheetService == null || spreadsheetFeed == null)
                initiate();
            List<SpreadsheetEntry> spreadsheets = spreadsheetFeed.getEntries();
            for (SpreadsheetEntry spreadSheet : spreadsheets) {
                if (spreadSheetName.trim().equalsIgnoreCase(
                        spreadSheet.getTitle().getPlainText())) {
                    return spreadSheet;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
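
Sample Usage:

                       A minimal sketch of how this service could be called once a MapReduce job has finished. The application name, account credentials, spreadsheet name ("HadoopResults"), worksheet name ("WordCount") and the sample rows are made-up values for illustration; the spreadsheet itself is assumed to already exist in the Google account.

GSpreadSheetServiceExample.java

public class GSpreadSheetServiceExample {
    public static void main(String[] args) {
        // Hypothetical application name and Google account credentials.
        GSpreadSheetService service = new GSpreadSheetService(
                "MyHadoopResultUploader", "myaccount@gmail.com", "myPassword");

        // In the real flow these rows would be built from the MapReduce output,
        // for example by reading the job's part files from HDFS.
        String[] headers = { "word", "count" };
        String[][] rows = { { "hadoop", "42" }, { "flume", "17" } };

        // Writes the header row and the data rows into the worksheet,
        // creating the worksheet first if it does not exist yet.
        boolean uploaded = service.createWorkSheetWithDataInSpreadSheet(
                "HadoopResults", "WordCount", headers, rows);
        System.out.println(uploaded ? "Upload succeeded" : "Upload failed");
    }
}

Once the worksheet is updated, the Google Chart in the blog picks up the new values without any change to the blog itself.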


Google Chart [Using Google Spreadsheet]

[The chart embedded here in the blog reads its data directly from the Google Spreadsheet above, so new MapReduce results appear without editing the blog.]

Monday, 27 October 2014

Hadoop Chain Mapper - Example



Objective:  

              Using Hadoop ChainMapper, you can chain multiple Mappers before the data reaches the Reducer.
Here the output of the first Mapper class becomes the input of the second Mapper class.

Pattern : Mapper1 -> Mapper2 -> Reducer

Here I have taken the simple word count example.
The 1st Mapper reads the input text file, splits the text into words, and writes each word to the Context.
The 2nd Mapper takes the 1st Mapper's output and converts every key to lower case.
The lower-cased key is then written to the Context.
The Reducer receives the values from the 2nd Mapper; values with the same key go to the same reducer task.
In the Reducer we simply perform the word count for the given key.

SplitMapper.java [ 1st Mapper class ]
package com.tamil;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SplitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
              @Override
              protected void map(LongWritable key, Text value, Context context)
                                           throws IOException, InterruptedException {
                             StringTokenizer tokenizer = new StringTokenizer(value.toString());
                             IntWritable dummyValue = new IntWritable(1);
                             while (tokenizer.hasMoreElements()) {
                                           String content = (String) tokenizer.nextElement();
                                           context.write(new Text(content), dummyValue);
                             }
              }
}

LowerCaseMapper.java [ 2nd Mapper class ]
package com.tamil;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LowerCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {

              @Override
              protected void map(Text key, IntWritable value, Context context)
                                           throws IOException, InterruptedException {
                             String val = key.toString().toLowerCase();
                             Text newKey = new Text(val);
                             context.write(newKey, value);
              }
}

ChainMapReducer.java [ Reducer class ]
package com.tamil;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ChainMapReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
              @Override
              protected void reduce(Text key, Iterable<IntWritable> values,
              Context context) throws IOException, InterruptedException {
                             int sum = 0;
                             for (IntWritable value : values) {sum += value.get();}
                             context.write(key, new IntWritable(sum));
              }
}

ChainMapperDriver.java [ Driver class]
package com.tamil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ChainMapperDriver {

              public static void main(String[] args) throws Exception {
                             Configuration config = new Configuration();
                             String[] otherArgs = new GenericOptionsParser(config, args).getRemainingArgs();
                             if (otherArgs.length != 2) {
                                            System.err.print("Usage: wordcount <in> <out>");
                                           System.exit(2);
                             }

                             Job job = Job.getInstance(config);
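                             // Each chained mapper gets its own Configuration; the output key/value
                             // types of SplitMapper must match the input types of LowerCaseMapper.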
                             Configuration splitMapConfig = new Configuration(false);
                             ChainMapper.addMapper(job, SplitMapper.class, LongWritable.class,
                                                          Text.class, Text.class, IntWritable.class, splitMapConfig);
                             Configuration lowerCaseMapConfig = new Configuration(false);
                             ChainMapper.addMapper(job, LowerCaseMapper.class, Text.class,
                                                          IntWritable.class, Text.class, IntWritable.class,
                                                          lowerCaseMapConfig);
                             job.setJarByClass(ChainMapperDriver.class);
                             job.setCombinerClass(ChainMapReducer.class);
                             job.setReducerClass(ChainMapReducer.class);
                             job.setOutputKeyClass(Text.class);
                             job.setOutputValueClass(IntWritable.class);
                             FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
                             FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
                             System.exit(job.waitForCompletion(true) ? 0 : 1);
              }
}

That's it.
Create a jar file & run it :-)

Apache Flume for Twitter Data Mining


Objective:


Use Apache Flume to collect Twitter data for analysis.

Prerequisite :

  • You must have a Twitter application account (API keys) to collect the data.
  • Basic knowledge of the Flume life cycle.
Flume cannot collect data from Twitter with its built-in sources, so we need to create our own Flume source that collects the data from Twitter and binds it to a Flume Event.

Here we are going to see how to create our own custom Flume source, deploy the source jar file under Flume's plugins.d directory, and run Flume to store the data from Twitter into the HDFS file system.

Twitter Source :

            This Twitter source serves as the data-collection (source) part in Flume.
It collects the data in JSON format and binds it to a Flume Event.
Flume takes this data from the source and stores it in the HDFS file system.
Here I used twitter4j-stream version 2.2.6.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <groupId>com.tamil</groupId>
            <artifactId>TwitterSource</artifactId>
            <version>0.1</version>
            <dependencies>
                        <dependency>
                                    <groupId>org.twitter4j</groupId>
                                    <artifactId>twitter4j-stream</artifactId>
                                    <version>2.2.6</version>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-core</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-sdk</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
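                        <!-- The Flume dependencies above are marked 'provided' because the Flume runtime already ships them. -->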
                        <dependency>
                                    <groupId>junit</groupId>
                                    <artifactId>junit</artifactId>
                                    <version>4.8.2</version>
                                    <scope>test</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.slf4j</groupId>
                                    <artifactId>slf4j-api</artifactId>
                                    <version>1.6.1</version>
                        </dependency>
                        <!-- <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId>
                                    <version>2.5.0</version> <scope>provided</scope> </dependency> -->
            </dependencies>

            <build>
                        <plugins>
                                    <plugin>
                                                <artifactId>maven-assembly-plugin</artifactId>
                                                <configuration>
                                                            <archive>
                                                                        <manifest>
                                                                                    <mainClass>com.tamil.TwitterSource</mainClass>
                                                                        </manifest>
                                                            </archive>
                                                            <descriptorRefs>
                                                                        <descriptorRef>jar-with-dependencies</descriptorRef>
                                                            </descriptorRefs>
                                                </configuration>
                                                <executions>
                                                            <execution>
                                                                        <id>make-assembly</id>
                                                                        <phase>package</phase>
                                                                        <goals>
                                                                                    <goal>single</goal>
                                                                        </goals>
                                                            </execution>
                                                </executions>
                                    </plugin>

                        </plugins>
            </build>
</project>


TwitterSource.java
package com.tamil;

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger =LoggerFactory.getLogger(TwitterSource.class);
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private String[] keywords;
    private TwitterStream twitterStream;
    public void configure(Context context) {
        consumerKey = context.getString("consumerKey");
        consumerSecret = context.getString("consumerSecret");
        accessToken = context.getString("accessToken");
        accessTokenSecret = context.getString("accessTokenSecret");
        String keywordString = context.getString("keywords");
        if (keywordString == null || keywordString.trim().length() == 0) {keywords = new String[0];}
        else {
            keywords = keywordString.split(",");
            for (int i = 0; i < keywords.length; i++) {
                keywords[i] = keywords[i].trim();
            }
        }

        ConfigurationBuilder conBuilder = new ConfigurationBuilder();
        conBuilder.setOAuthConsumerKey(consumerKey);
        conBuilder.setOAuthConsumerSecret(consumerSecret);
        conBuilder.setOAuthAccessToken(accessToken);
        conBuilder.setOAuthAccessTokenSecret(accessTokenSecret);
        conBuilder.setJSONStoreEnabled(true);
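        // NOTE: the proxy host/port below are specific to my network; remove or adjust them for your environment.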
        conBuilder.setHttpProxyHost("192.168.51.80");
        conBuilder.setPrettyDebugEnabled(true);
        conBuilder.setHttpProxyPort(3128);
        conBuilder.setIncludeEntitiesEnabled(true);
        twitterStream = new TwitterStreamFactory(conBuilder.build()).getInstance();
    }

    @Override
    public void start() {
        final ChannelProcessor channel = getChannelProcessor();
        final Map<String, String> headers = new HashMap<String, String>();
        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                logger.debug(status.getUser().getScreenName() + ": " + status.getText());
                headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
                Event event = EventBuilder.withBody(
                    DataObjectFactory.getRawJSON(status).getBytes(), headers);
                channel.processEvent(event);
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onScrubGeo(long userId, long upToStatusId) {}
            public void onException(Exception ex) {}
        };
        logger.debug("Setting up Twitter sample stream using consumer key {} and" +
            " access token {}", new String[] { consumerKey, accessToken });
        twitterStream.addListener(listener);
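        // With no keywords configured, fall back to the random sample stream; otherwise track only the given keywords.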
        if (keywords.length == 0) {
            logger.debug("Starting up Twitter sampling...");
            twitterStream.sample();
        }
        else {
            logger.debug("Starting up Twitter filtering...");
            FilterQuery query = new FilterQuery().track(keywords);
            twitterStream.filter(query);
        }
        super.start();
    }

    @Override
    public void stop() {
        logger.debug("Shutting down Twitter sample stream...");
        twitterStream.shutdown();
        super.stop();
    }
}


Using Maven, build the jar file with its corresponding dependencies:
mvn clean compile install assembly:assembly

Deploying the Custom Jar File as a Flume Plugin

           By default, Flume is configured to look for third-party plugins under a plugins.d path. Alternatively, we can point Flume at any folder that follows the required structure and contains the third-party jars, using a command-line argument.

Basic structure of flume plugins folder
$/home/cloudera/Desktop/MyPlugin/plugins.d/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib/TamilSource.jar
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/libext/<Dependent files>


 Twitter.conf
TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS
TwitterAgent.sources.Twitter.type=com.tamil.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=<consumerKey>
TwitterAgent.sources.Twitter.consumerSecret=<consumerSecret>
TwitterAgent.sources.Twitter.accessToken=<accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret=<accessTokenSecret>
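# The keyword list is comma separated; the custom source splits it on ',' and trims each entry.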
TwitterAgent.sources.Twitter.keywords=Internet Of Things, InternetOfThings, internetofthings, IOT, iot, M2M, m2m
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
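# %Y/%m/%d/%H in the path creates one HDFS directory per hour, based on the timestamp header set by the source.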
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost.localdomain:8020/user/cloudera/mytwittes/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000
TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100

It's time to run the Flume agent:

$flume-ng agent -n TwitterAgent -f Twitter.conf --plugins-path /home/cloudera/Desktop/MyPlugin/plugins.d -Dflume.root.logger=DEBUG,console

The output will be:

14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:Tamila.conf
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
14/10/26 09:44:19 INFO node.AbstractConfigurationProvider: Creating channels
14/10/26 09:44:19 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Created channel MemChannel
14/10/26 09:44:20 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type com.tamil.TwitterSource
14/10/26 09:44:20 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
14/10/26 09:44:20 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter, HDFS]
14/10/26 09:44:20 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.tamil.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@79c8a90d counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
14/10/26 09:44:20 INFO node.Application: Starting Channel MemChannel
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
14/10/26 09:44:20 INFO node.Application: Starting Sink HDFS
14/10/26 09:44:20 INFO node.Application: Starting Source Twitter
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
14/10/26 09:44:20 INFO twitter4j.TwitterStreamImpl: Establishing connection.

Now you can check the data in HDFS using
 
$ hadoop fs -ls  hdfs://localhost.localdomain/user/cloudera/mytwittes