
Monday, 27 October 2014

Apache Flume for Twitter Data Mining


Objective:


Use Apache Flume to collect Twitter data for analysis.

Prerequisites:

  • You must have a Twitter application account to collect the data.
  • Basic knowledge of the Flume life cycle.
Flume cannot collect data from Twitter using its built-in sources. So we need to create our own Flume source that collects the data from Twitter and binds it to Flume events.

This post covers three steps:
  • Creating the custom Flume source.
  • Deploying the source jar file in the Flume plugins.d folder.
  • Running Flume to store the data from Twitter in HDFS.

Twitter Source :

This Twitter source serves as the data collection part of the Flume pipeline.
It collects each tweet as a JSON object and binds it to a Flume event.
Flume then takes these events from the source and stores them in HDFS.
Here I used twitter4j-stream version 2.2.6.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <groupId>com.tamil</groupId>
            <artifactId>TwitterSource</artifactId>
            <version>0.1</version>
            <dependencies>
                        <dependency>
                                    <groupId>org.twitter4j</groupId>
                                    <artifactId>twitter4j-stream</artifactId>
                                    <version>2.2.6</version>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-core</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-sdk</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
                        <dependency>
                                    <groupId>junit</groupId>
                                    <artifactId>junit</artifactId>
                                    <version>4.8.2</version>
                                    <scope>test</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.slf4j</groupId>
                                    <artifactId>slf4j-api</artifactId>
                                    <version>1.6.1</version>
                        </dependency>
                        <!-- <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId>
                                    <version>2.5.0</version> <scope>provided</scope> </dependency> -->
            </dependencies>

            <build>
                        <plugins>
                                    <plugin>
                                                <artifactId>maven-assembly-plugin</artifactId>
                                                <configuration>
                                                            <archive>
                                                                        <manifest>
                                                                                    <mainClass>com.tamil.TwitterSource</mainClass>
                                                                        </manifest>
                                                            </archive>
                                                            <descriptorRefs>
                                                                        <descriptorRef>jar-with-dependencies</descriptorRef>
                                                            </descriptorRefs>
                                                </configuration>
                                                <executions>
                                                            <execution>
                                                                        <id>make-assembly</id>
                                                                        <phase>package</phase>
                                                                        <goals>
                                                                                    <goal>single</goal>
                                                                        </goals>
                                                            </execution>
                                                </executions>
                                    </plugin>

                        </plugins>
            </build>
</project>


TwitterSource.java
package com.tamil;

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger =LoggerFactory.getLogger(TwitterSource.class);
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private String[] keywords;
    private TwitterStream twitterStream;
    public void configure(Context context) {
        consumerKey = context.getString("consumerKey");
        consumerSecret = context.getString("consumerSecret");
        accessToken = context.getString("accessToken");
        accessTokenSecret = context.getString("accessTokenSecret");
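        // Default to an empty string so a missing "keywords" property does not cause a NullPointerException.
        String keywordString = context.getString("keywords", "");
        if (keywordString.trim().length() == 0) {
            keywords = new String[0];
        }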
        else {
            keywords = keywordString.split(",");
            for (int i = 0; i < keywords.length; i++) {
                keywords[i] = keywords[i].trim();
            }
        }

        ConfigurationBuilder conBuilder = new ConfigurationBuilder();
        conBuilder.setOAuthConsumerKey(consumerKey);
        conBuilder.setOAuthConsumerSecret(consumerSecret);
        conBuilder.setOAuthAccessToken(accessToken);
        conBuilder.setOAuthAccessTokenSecret(accessTokenSecret);
        conBuilder.setJSONStoreEnabled(true);
        // Proxy settings for the author's network; remove or adjust them for your environment.
        conBuilder.setHttpProxyHost("192.168.51.80");
        conBuilder.setHttpProxyPort(3128);
        conBuilder.setPrettyDebugEnabled(true);
        conBuilder.setIncludeEntitiesEnabled(true);
        twitterStream = new TwitterStreamFactory(conBuilder.build()).getInstance();
    }

    @Override
    public void start() {
        final ChannelProcessor channel = getChannelProcessor();
        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                logger.debug(status.getUser().getScreenName() + ": " + status.getText());
                // Use a fresh header map per event so events never share mutable state.
                Map<String, String> headers = new HashMap<String, String>();
                headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
                Event event = EventBuilder.withBody(
                    DataObjectFactory.getRawJSON(status).getBytes(), headers);
                channel.processEvent(event);
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onScrubGeo(long userId, long upToStatusId) {}
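            // Log stream errors instead of silently swallowing them.
            public void onException(Exception ex) { logger.error("Twitter stream error", ex); }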
        };
        logger.debug("Setting up Twitter sample stream using consumer key {} and" +
            " access token {}", new String[] { consumerKey, accessToken });
        twitterStream.addListener(listener);
        if (keywords.length == 0) {
            logger.debug("Starting up Twitter sampling...");
            twitterStream.sample();
        }
        else {
            logger.debug("Starting up Twitter filtering...");
            FilterQuery query = new FilterQuery().track(keywords);
            twitterStream.filter(query);
        }
        super.start();
    }

    @Override
    public void stop() {
        logger.debug("Shutting down Twitter sample stream...");
        twitterStream.shutdown();
        super.stop();
    }
}
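
The keyword handling inside configure() can be tried out in isolation. The following is a minimal sketch, not part of Flume or twitter4j: the class name KeywordParser and its parse method are hypothetical names chosen for illustration. It splits the comma-separated keywords value, trims each entry, and treats a missing or blank value as "no keywords", which is the case where the source falls back to the sample stream.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flume or twitter4j class) for parsing the "keywords" property. */
final class KeywordParser {
    private KeywordParser() {}

    /**
     * Splits a raw keywords value on commas, trims each entry and drops empty ones.
     * A null or blank value yields an empty array.
     */
    static String[] parse(String raw) {
        if (raw == null) {
            return new String[0];
        }
        List<String> result = new ArrayList<String>();
        for (String part : raw.split(",")) {
            String keyword = part.trim();
            if (!keyword.isEmpty()) {
                result.add(keyword);
            }
        }
        return result.toArray(new String[0]);
    }
}
```

For example, KeywordParser.parse("IOT, iot , M2M") gives the three entries IOT, iot and M2M, which could be passed to FilterQuery.track as in the source above.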


Use Maven to build the jar file together with its dependencies. The assembly plugin is bound to the package phase in the pom.xml above, so packaging the project is enough:
mvn clean package

Deploying the custom jar file in the Flume plugins folder

By default, Flume looks for third-party plugins in the plugins.d directory under $FLUME_HOME. Alternatively, any folder that follows the required structure can serve as the plugins folder if you pass its path as a command-line argument when starting Flume.

Basic structure of the Flume plugins folder:
/home/cloudera/Desktop/MyPlugin/plugins.d/
/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/
/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib/TamilSource.jar
/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/libext/<Dependent files>
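
The folder structure above can be created by hand, but a short sketch makes the layout explicit: each plugin gets its own directory under plugins.d, with lib for the plugin jar and libext for the jars it depends on. The class name PluginLayout and its methods are hypothetical and only use the JDK; this is an illustration of the layout, not a Flume tool.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that creates the plugins.d directory layout shown above. */
final class PluginLayout {
    private PluginLayout() {}

    /**
     * Creates pluginsDir/pluginName/lib and pluginsDir/pluginName/libext
     * (including missing parents) and returns the plugin directory.
     */
    static Path create(Path pluginsDir, String pluginName) throws IOException {
        Path pluginDir = pluginsDir.resolve(pluginName);
        Files.createDirectories(pluginDir.resolve("lib"));    // the plugin jar goes here
        Files.createDirectories(pluginDir.resolve("libext")); // dependency jars go here
        return pluginDir;
    }

    /** Returns true if the plugin directory has both expected subdirectories. */
    static boolean isComplete(Path pluginDir) {
        return Files.isDirectory(pluginDir.resolve("lib"))
            && Files.isDirectory(pluginDir.resolve("libext"));
    }
}
```

Calling PluginLayout.create(Paths.get("/home/cloudera/Desktop/MyPlugin/plugins.d"), "custom-source-1") would produce the directories listed above; the jar files still have to be copied in afterwards.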


 Twitter.conf
TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS
TwitterAgent.sources.Twitter.type=com.tamil.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=<consumerKey>
TwitterAgent.sources.Twitter.consumerSecret=<consumerSecret>
TwitterAgent.sources.Twitter.accessToken=<accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret=<accessTokenSecret>
TwitterAgent.sources.Twitter.keywords=Internet Of Things, InternetOfThings, internetofthings, IOT, iot , M2M , m2m
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost.localdomain:8020/user/cloudera/mytwittes/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000
TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100
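
The hdfs.path in Twitter.conf ends in %Y/%m/%d/%H/. The HDFS sink resolves these escape sequences from the event's timestamp header, which is the header the custom source sets from the tweet's creation time. The sketch below only illustrates that mapping for the four sequences used here; it is not Flume's implementation, and the class name BucketPathSketch and the explicit time zone parameter are assumptions made for the example.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical illustration of how %Y, %m, %d and %H in hdfs.path map to a timestamp. */
final class BucketPathSketch {
    private BucketPathSketch() {}

    /** Replaces %Y, %m, %d and %H with year, month, day and hour of the given timestamp. */
    static String expand(String path, long timestampMillis, TimeZone zone) {
        return path
            .replace("%Y", format("yyyy", timestampMillis, zone))
            .replace("%m", format("MM", timestampMillis, zone))
            .replace("%d", format("dd", timestampMillis, zone))
            .replace("%H", format("HH", timestampMillis, zone));
    }

    private static String format(String pattern, long timestampMillis, TimeZone zone) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(zone);
        return formatter.format(new Date(timestampMillis));
    }
}
```

With this mapping, tweets created in the same hour land in the same HDFS directory, so the data can later be processed hour by hour.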

It's time to run the Flume agent:

$ flume-ng agent -n TwitterAgent -f Twitter.conf --plugins-path /home/cloudera/Desktop/MyPlugin/plugins.d -Dflume.root.logger=DEBUG,console

The output will look like this:

14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:Tamila.conf
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
14/10/26 09:44:19 INFO node.AbstractConfigurationProvider: Creating channels
14/10/26 09:44:19 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Created channel MemChannel
14/10/26 09:44:20 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type com.tamil.TwitterSource
14/10/26 09:44:20 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
14/10/26 09:44:20 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter, HDFS]
14/10/26 09:44:20 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.tamil.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@79c8a90d counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
14/10/26 09:44:20 INFO node.Application: Starting Channel MemChannel
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
14/10/26 09:44:20 INFO node.Application: Starting Sink HDFS
14/10/26 09:44:20 INFO node.Application: Starting Source Twitter
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
14/10/26 09:44:20 INFO twitter4j.TwitterStreamImpl: Establishing connection.

Now you can check the data in HDFS using:

$ hadoop fs -ls hdfs://localhost.localdomain/user/cloudera/mytwittes

Thursday, 18 September 2014

My First Apache Flume Program

My Objective:

  • Flume should point to a source file (typically a log file) on the local system.
  • Flume should read the file and store its contents in the HDFS folder specified in the Flume configuration.
  • Whenever the file changes, Flume should read the changes and write them to HDFS.

Instead of streaming a real log file, I created my own text file similar to a log file. Whenever I change the content in an editor and save it, the running Flume agent should pick up the changes from the local file and write them to HDFS.
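
To make "pick up the changes" concrete, here is a minimal sketch of following a growing file by remembering how far it has already been read. It is only an illustration of the idea; it is not how Flume or the tail command is implemented, and the class name FileFollower is hypothetical. It assumes UTF-8 text and only starts over when the file becomes shorter than the remembered position.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of following a growing text file; not Flume's or tail's implementation. */
final class FileFollower {
    private final String path;
    private long offset; // byte position just past the last complete line already returned

    FileFollower(String path) {
        this.path = path;
    }

    /** Returns the complete lines appended to the file since the previous call. */
    List<String> poll() throws IOException {
        List<String> lines = new ArrayList<String>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            long length = file.length();
            if (length < offset) {
                offset = 0; // the file was truncated or replaced by a shorter one: start over
            }
            byte[] pending = new byte[(int) (length - offset)];
            file.seek(offset);
            file.readFully(pending);

            // Only hand out text up to the last newline; a partial last line waits for the next poll.
            int lastNewline = -1;
            for (int i = pending.length - 1; i >= 0; i--) {
                if (pending[i] == '\n') {
                    lastNewline = i;
                    break;
                }
            }
            if (lastNewline < 0) {
                return lines;
            }
            String complete = new String(pending, 0, lastNewline, StandardCharsets.UTF_8);
            for (String line : complete.split("\n", -1)) {
                lines.add(line);
            }
            offset += lastNewline + 1;
        }
        return lines;
    }
}
```

Calling poll() repeatedly on myLog.log would return each newly saved line once, which is roughly the stream of lines that gets turned into events in this example.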

1. Create a local file at the following path.

/home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log


2. Create an HDFS directory for Flume to write to.
Flume will read the local file myLog.log and write its contents into this HDFS directory. The HDFS sink takes care of generating the file names.

$ hadoop fs -mkdir  hdfs://localhost.localdomain/user/cloudera/tamilflume


3. Create Flume Configuration file "myflume.conf"

agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1

agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log

agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path= hdfs://localhost.localdomain/user/cloudera/tamilflume
agent.sinks.hdfs-sink.hdfs.filePrefix=apacheaccess
agent.sinks.hdfs-sink.hdfs.rollInterval=10
agent.sinks.hdfs-sink.hdfs.rollSize=0

agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000

agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1


Here 'agent' is the name of the Flume agent,
so every source, sink, and channel property in the configuration file is prefixed with this name.
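
Because every property hangs off the agent name, a typo in the name or in a channel reference leaves the agent with nothing wired together. The sketch below checks that wiring with plain java.util.Properties. It is a hypothetical helper (AgentConfigCheck is not a Flume class) and only looks at the sources, sinks, and channels keys used in this post.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/** Hypothetical checker for the agent-name prefix and channel wiring of a Flume-style config. */
final class AgentConfigCheck {
    private AgentConfigCheck() {}

    /** Parses properties-file text such as the contents of myflume.conf. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Returns true if the agent declares channels and every source and sink uses a declared one. */
    static boolean isWired(Properties props, String agent) {
        Set<String> channels = names(props.getProperty(agent + ".channels"));
        if (channels.isEmpty()) {
            return false; // wrong agent name, or no channels declared
        }
        for (String source : names(props.getProperty(agent + ".sources"))) {
            Set<String> bound = names(props.getProperty(agent + ".sources." + source + ".channels"));
            if (bound.isEmpty() || !channels.containsAll(bound)) {
                return false;
            }
        }
        for (String sink : names(props.getProperty(agent + ".sinks"))) {
            String bound = props.getProperty(agent + ".sinks." + sink + ".channel");
            if (bound == null || !channels.contains(bound.trim())) {
                return false;
            }
        }
        return true;
    }

    /** Splits a space-separated list of component names; null gives an empty set. */
    private static Set<String> names(String value) {
        Set<String> result = new HashSet<String>();
        if (value != null) {
            for (String name : value.trim().split("\\s+")) {
                if (!name.isEmpty()) {
                    result.add(name);
                }
            }
        }
        return result;
    }
}
```

For the configuration above, isWired(props, "agent") holds, while asking for any other agent name fails because no properties carry that prefix. This mirrors why the name passed with -n on the command line has to match the prefix used in the file.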

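source :
            type is exec [ runs an executable command ].
            command is the Linux tail command, which follows the log file as it grows.

sink :
            type is hdfs [ Hadoop file system ].
            path is the newly created hdfs://localhost.localdomain/user/cloudera/tamilflume directory.
            filePrefix is a string that every file Flume creates in the HDFS directory will start with.
            rollInterval is 10, so the sink closes the current file and starts a new one every 10 seconds. rollSize is 0, which disables rolling by file size.

channel :
            type is memory.
            capacity is 1000, meaning the channel can hold at most 1000 events at a time.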
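
The channel capacity is a limit on buffered events: the source puts events in, the sink takes them out, and the capacity bounds how many can wait in between. The sketch below shows that behaviour with a plain bounded queue. It is only an analogy under that assumption; Flume's real memory channel is transactional, and the class name BoundedChannelSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded buffer illustrating what a channel capacity limits. */
final class BoundedChannelSketch {
    private final BlockingQueue<String> events;

    BoundedChannelSketch(int capacity) {
        this.events = new ArrayBlockingQueue<String>(capacity);
    }

    /** Source side: returns false when the channel is full and the event is not accepted. */
    boolean put(String event) {
        return events.offer(event);
    }

    /** Sink side: returns the oldest buffered event, or null when the channel is empty. */
    String take() {
        return events.poll();
    }

    /** Number of events currently buffered. */
    int size() {
        return events.size();
    }
}
```

If the sink drains events more slowly than the source produces them, the buffer fills up and further puts are rejected until the sink catches up, which is why the capacity should be sized for the expected bursts.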
           
           
It's time to run the Flume agent.

Open a terminal and go to the directory where you stored myflume.conf.

$ flume-ng agent -n agent -c conf -f myflume.conf -Dflume.root.logger=INFO,console


Now you can see the Flume agent start running:
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:myflume.conf
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: agent
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Creating channels
14/09/17 10:25:34 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Created channel ch1
14/09/17 10:25:34 INFO source.DefaultSourceFactory: Creating instance of source exec-source, type exec
14/09/17 10:25:34 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink, type: hdfs
14/09/17 10:25:35 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/09/17 10:25:35 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [exec-source, hdfs-sink]
14/09/17 10:25:35 INFO node.Application: Starting new configuration:{ sourceRunners:{exec-source=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:exec-source,state:IDLE} }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4a776482 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
14/09/17 10:25:35 INFO node.Application: Starting Channel ch1
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started
14/09/17 10:25:35 INFO node.Application: Starting Sink hdfs-sink
14/09/17 10:25:35 INFO node.Application: Starting Source exec-source
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink started
14/09/17 10:25:35 INFO source.ExecSource: Exec source starting with command:tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: exec-source: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: exec-source started
14/09/17 10:25:39 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:39 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:25:51 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772


Whenever you edit the log file and save it, you can see the HDFS writer being triggered in the Flume agent console.

14/09/17 10:25:51 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:25:59 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:59 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:09 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
14/09/17 10:26:09 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:26:32 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:26:32 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:42 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
14/09/17 10:26:42 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:27:03 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:27:03 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:27:13 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232
14/09/17 10:27:13 INFO hdfs.HDFSEventSink: Writer callback called.


You can also list the HDFS folder to see the files.


$ hadoop fs -ls  hdfs://localhost.localdomain/user/cloudera/tamilflume
Found 4 items
-rw-r--r--   1 cloudera cloudera        142 2014-09-17 10:25 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
-rw-r--r--   1 cloudera cloudera        170 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
-rw-r--r--   1 cloudera cloudera        200 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
-rw-r--r--   1 cloudera cloudera        228 2014-09-17 10:27 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232


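You can print the file content using hadoop fs -cat. The configuration above leaves hdfs.fileType at its default, SequenceFile, so the output contains binary header bytes around the text:
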
$ hadoop fs -cat  hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable����Bu­'|ČŠH���*hikumar
                                                                                                  H���.
$ hadoop fs -cat  hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritablezC��=
SrW

iqBH��:hikumarH��;Hi Tamil
           


Well done, guys. You did it :-).