Monday, 27 October 2014

Apache Flume for Twitter Data Mining


Objective:


Using Apache Flume collect the Twitter data for data analysis.

Prerequisite :

  • Must have Twitter Application account to collect the data.
  • Basic knowledge of Flume life cycle.
Flume cannot collect the data from twitter using its existing in build flume source files. So we need to create our own flume source to collect the data from twitter and bind with Flume Event.

Here we are going to see how to create out custom flume source creation.
Deploy the source jar file in Flume plugin.s. Flume Execution to store the data from twitter to HDFS file system

Twitter Source :

            This twitter source will serve the data collection source part in flume.
It will collect the data as a JSON Object format and binding it with Event.
Flume will collect this data from Source and store it into HDFS file system.
Here I used twitter stream version 2.2.6.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <groupId>com.tamil</groupId>
            <artifactId>TwitterSource</artifactId>
            <version>0.1</version>
            <dependencies>
                        <dependency>
                                    <groupId>org.twitter4j</groupId>
                                    <artifactId>twitter4j-stream</artifactId>
                                    <version>2.2.6</version>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-core</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-sdk</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
                        <dependency>
                                    <groupId>junit</groupId>
                                    <artifactId>junit</artifactId>
                                    <version>4.8.2</version>
                                    <scope>test</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.slf4j</groupId>
                                    <artifactId>slf4j-api</artifactId>
                                    <version>1.6.1</version>
                        </dependency>
                        <!-- <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId>
                                    <version>2.5.0</version> <scope>provided</scope> </dependency> -->
            </dependencies>

            <build>
                        <plugins>
                                    <plugin>
                                                <artifactId>maven-assembly-plugin</artifactId>
                                                <configuration>
                                                            <archive>
                                                                        <manifest>
                                                                                    <mainClass>com.tamil.TwitterSource</mainClass>
                                                                        </manifest>
                                                            </archive>
                                                            <descriptorRefs>
                                                                        <descriptorRef>jar-with-dependencies</descriptorRef>
                                                            </descriptorRefs>
                                                </configuration>
                                                <executions>
                                                            <execution>
                                                                        <id>make-assembly</id>
                                                                        <phase>package</phase>
                                                                        <goals>
                                                                                    <goal>single</goal>
                                                                        </goals>
                                                            </execution>
                                                </executions>
                                    </plugin>

                        </plugins>
            </build>
</project>


TwitterSource.java
package com.tamil;

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger =LoggerFactory.getLogger(TwitterSource.class);
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private String[] keywords;
    private TwitterStream twitterStream;
    public void configure(Context context) {
        consumerKey = context.getString("consumerKey");
        consumerSecret = context.getString("consumerSecret");
        accessToken = context.getString("accessToken");
        accessTokenSecret = context.getString("accessTokenSecret");
        String keywordString = context.getString("keywords");
        if (keywordString.trim().length() == 0) {keywords = new String[0];}
        else {
            keywords = keywordString.split(",");
            for (int i = 0; i < keywords.length; i++) {
                keywords[i] = keywords[i].trim();
            }
        }

        ConfigurationBuilder conBuilder = new ConfigurationBuilder();
        conBuilder.setOAuthConsumerKey(consumerKey);
        conBuilder.setOAuthConsumerSecret(consumerSecret);
        conBuilder.setOAuthAccessToken(accessToken);
        conBuilder.setOAuthAccessTokenSecret(accessTokenSecret);
        conBuilder.setJSONStoreEnabled(true);
        conBuilder.setHttpProxyHost("192.168.51.80");
        conBuilder.setPrettyDebugEnabled(true);
        conBuilder.setHttpProxyPort(3128);
        conBuilder.setIncludeEntitiesEnabled(true);
        twitterStream = new TwitterStreamFactory(conBuilder.build()).getInstance();
    }

    @Override
    public void start() {
        final ChannelProcessor channel = getChannelProcessor();
        final Map<String, String> headers = new HashMap<String, String>();
        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                logger.debug(status.getUser().getScreenName() + ": " + status.getText());
                headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
                Event event = EventBuilder.withBody(
                    DataObjectFactory.getRawJSON(status).getBytes(), headers);
                channel.processEvent(event);
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onScrubGeo(long userId, long upToStatusId) {}
            public void onException(Exception ex) {}
        };
        logger.debug("Setting up Twitter sample stream using consumer key {} and" +
            " access token {}", new String[] { consumerKey, accessToken });
        twitterStream.addListener(listener);
        if (keywords.length == 0) {
            logger.debug("Starting up Twitter sampling...");
            twitterStream.sample();
        }
        else {
            logger.debug("Starting up Twitter filtering...");
            FilterQuery query = new FilterQuery().track(keywords);
            twitterStream.filter(query);
        }
        super.start();
    }

    @Override
    public void stop() {
        logger.debug("Shutting down Twitter sample stream...");
        twitterStream.shutdown();
        super.stop();
    }
}


Using maven build the jar file with corresponding dependency files .
mvn clean compile install assembly:assembly

Deploying Custom Jar file in Flume Plugins.

           By Default Flume will be configured with some plugins.d path to get plugins from third party. Otherwise we can configure any folder with specific structure contain third party api as flume plugins folder using flume command line argument.

Basic structure of flume plugins folder
$/home/cloudera/Desktop/MyPlugin/plugins.d/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib/TamilSource.jar
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/libext/<Dependent files>


 Twitter.conf
TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS
TwitterAgent.sources.Twitter.type=com.tamil.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=<consumerKey>
TwitterAgent.sources.Twitter.consumerSecret=< consumerSecret>
TwitterAgent.sources.Twitter.accessToken=< accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret=< accessTokenSecret>
TwitterAgent.sources.Twitter.keywords=Internet Of Things, InternetOfThings,
internetofthings, IOT, iot , M2M , m2m
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=
hdfs://localhost.localdomain:8020/user/cloudera/mytwittes/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000
TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100

Its time to run the Flume Script

$flume-ng agent -n TwitterAgent -f Twitter.conf --plugins-path /home/cloudera/Desktop/MyPlugin/plugins.d -Dflume.root.logger=DEBUG,console

Output will be :

14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:Tamila.conf
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
14/10/26 09:44:19 INFO node.AbstractConfigurationProvider: Creating channels
14/10/26 09:44:19 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Created channel MemChannel
14/10/26 09:44:20 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type com.tamil.TwitterSource
14/10/26 09:44:20 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
14/10/26 09:44:20 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter, HDFS]
14/10/26 09:44:20 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.tamil.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@79c8a90d counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
14/10/26 09:44:20 INFO node.Application: Starting Channel MemChannel
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
14/10/26 09:44:20 INFO node.Application: Starting Sink HDFS
14/10/26 09:44:20 INFO node.Application: Starting Source Twitter
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
14/10/26 09:44:20 INFO twitter4j.TwitterStreamImpl: Establishing connection.

Now you can check the data in HDFS using
 
$ hadoop fs -ls  hdfs://localhost.localdomain/user/cloudera/mytwittes         

1 comment:

  1. Harrah's Cherokee Casino Resort - MapyRO
    Casino Property Map and 나비효과 Harrah's Cherokee Casino Resort - Find your way around 하남 출장샵 the casino, find where 익산 출장마사지 everything is 울산광역 출장샵 located 오산 출장마사지 with these helpful reviews.

    ReplyDelete