Monday, 27 October 2014

Apache Flume for Twitter Data Mining


Objective:


Using Apache Flume collect the Twitter data for data analysis.

Prerequisite :

  • Must have Twitter Application account to collect the data.
  • Basic knowledge of Flume life cycle.
Flume cannot collect the data from twitter using its existing in build flume source files. So we need to create our own flume source to collect the data from twitter and bind with Flume Event.

Here we are going to see how to create out custom flume source creation.
Deploy the source jar file in Flume plugin.s. Flume Execution to store the data from twitter to HDFS file system

Twitter Source :

            This twitter source will serve the data collection source part in flume.
It will collect the data as a JSON Object format and binding it with Event.
Flume will collect this data from Source and store it into HDFS file system.
Here I used twitter stream version 2.2.6.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <groupId>com.tamil</groupId>
            <artifactId>TwitterSource</artifactId>
            <version>0.1</version>
            <dependencies>
                        <dependency>
                                    <groupId>org.twitter4j</groupId>
                                    <artifactId>twitter4j-stream</artifactId>
                                    <version>2.2.6</version>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-core</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.apache.flume</groupId>
                                    <artifactId>flume-ng-sdk</artifactId>
                                    <version>1.4.0</version>
                                    <scope>provided</scope>
                        </dependency>
                        <dependency>
                                    <groupId>junit</groupId>
                                    <artifactId>junit</artifactId>
                                    <version>4.8.2</version>
                                    <scope>test</scope>
                        </dependency>
                        <dependency>
                                    <groupId>org.slf4j</groupId>
                                    <artifactId>slf4j-api</artifactId>
                                    <version>1.6.1</version>
                        </dependency>
                        <!-- <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId>
                                    <version>2.5.0</version> <scope>provided</scope> </dependency> -->
            </dependencies>

            <build>
                        <plugins>
                                    <plugin>
                                                <artifactId>maven-assembly-plugin</artifactId>
                                                <configuration>
                                                            <archive>
                                                                        <manifest>
                                                                                    <mainClass>com.tamil.TwitterSource</mainClass>
                                                                        </manifest>
                                                            </archive>
                                                            <descriptorRefs>
                                                                        <descriptorRef>jar-with-dependencies</descriptorRef>
                                                            </descriptorRefs>
                                                </configuration>
                                                <executions>
                                                            <execution>
                                                                        <id>make-assembly</id>
                                                                        <phase>package</phase>
                                                                        <goals>
                                                                                    <goal>single</goal>
                                                                        </goals>
                                                            </execution>
                                                </executions>
                                    </plugin>

                        </plugins>
            </build>
</project>


TwitterSource.java
package com.tamil;

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger =LoggerFactory.getLogger(TwitterSource.class);
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private String[] keywords;
    private TwitterStream twitterStream;
    public void configure(Context context) {
        consumerKey = context.getString("consumerKey");
        consumerSecret = context.getString("consumerSecret");
        accessToken = context.getString("accessToken");
        accessTokenSecret = context.getString("accessTokenSecret");
        String keywordString = context.getString("keywords");
        if (keywordString.trim().length() == 0) {keywords = new String[0];}
        else {
            keywords = keywordString.split(",");
            for (int i = 0; i < keywords.length; i++) {
                keywords[i] = keywords[i].trim();
            }
        }

        ConfigurationBuilder conBuilder = new ConfigurationBuilder();
        conBuilder.setOAuthConsumerKey(consumerKey);
        conBuilder.setOAuthConsumerSecret(consumerSecret);
        conBuilder.setOAuthAccessToken(accessToken);
        conBuilder.setOAuthAccessTokenSecret(accessTokenSecret);
        conBuilder.setJSONStoreEnabled(true);
        conBuilder.setHttpProxyHost("192.168.51.80");
        conBuilder.setPrettyDebugEnabled(true);
        conBuilder.setHttpProxyPort(3128);
        conBuilder.setIncludeEntitiesEnabled(true);
        twitterStream = new TwitterStreamFactory(conBuilder.build()).getInstance();
    }

    @Override
    public void start() {
        final ChannelProcessor channel = getChannelProcessor();
        final Map<String, String> headers = new HashMap<String, String>();
        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                logger.debug(status.getUser().getScreenName() + ": " + status.getText());
                headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
                Event event = EventBuilder.withBody(
                    DataObjectFactory.getRawJSON(status).getBytes(), headers);
                channel.processEvent(event);
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onScrubGeo(long userId, long upToStatusId) {}
            public void onException(Exception ex) {}
        };
        logger.debug("Setting up Twitter sample stream using consumer key {} and" +
            " access token {}", new String[] { consumerKey, accessToken });
        twitterStream.addListener(listener);
        if (keywords.length == 0) {
            logger.debug("Starting up Twitter sampling...");
            twitterStream.sample();
        }
        else {
            logger.debug("Starting up Twitter filtering...");
            FilterQuery query = new FilterQuery().track(keywords);
            twitterStream.filter(query);
        }
        super.start();
    }

    @Override
    public void stop() {
        logger.debug("Shutting down Twitter sample stream...");
        twitterStream.shutdown();
        super.stop();
    }
}


Using maven build the jar file with corresponding dependency files .
mvn clean compile install assembly:assembly

Deploying Custom Jar file in Flume Plugins.

           By Default Flume will be configured with some plugins.d path to get plugins from third party. Otherwise we can configure any folder with specific structure contain third party api as flume plugins folder using flume command line argument.

Basic structure of flume plugins folder
$/home/cloudera/Desktop/MyPlugin/plugins.d/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib/TamilSource.jar
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/libext/<Dependent files>


 Twitter.conf
TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS
TwitterAgent.sources.Twitter.type=com.tamil.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=<consumerKey>
TwitterAgent.sources.Twitter.consumerSecret=< consumerSecret>
TwitterAgent.sources.Twitter.accessToken=< accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret=< accessTokenSecret>
TwitterAgent.sources.Twitter.keywords=Internet Of Things, InternetOfThings,
internetofthings, IOT, iot , M2M , m2m
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=
hdfs://localhost.localdomain:8020/user/cloudera/mytwittes/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000
TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100

Its time to run the Flume Script

$flume-ng agent -n TwitterAgent -f Twitter.conf --plugins-path /home/cloudera/Desktop/MyPlugin/plugins.d -Dflume.root.logger=DEBUG,console

Output will be :

14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:Tamila.conf
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
14/10/26 09:44:19 INFO node.AbstractConfigurationProvider: Creating channels
14/10/26 09:44:19 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Created channel MemChannel
14/10/26 09:44:20 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type com.tamil.TwitterSource
14/10/26 09:44:20 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
14/10/26 09:44:20 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter, HDFS]
14/10/26 09:44:20 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.tamil.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@79c8a90d counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
14/10/26 09:44:20 INFO node.Application: Starting Channel MemChannel
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
14/10/26 09:44:20 INFO node.Application: Starting Sink HDFS
14/10/26 09:44:20 INFO node.Application: Starting Source Twitter
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
14/10/26 09:44:20 INFO twitter4j.TwitterStreamImpl: Establishing connection.

Now you can check the data in HDFS using
 
$ hadoop fs -ls  hdfs://localhost.localdomain/user/cloudera/mytwittes         

Saturday, 25 October 2014

Social Media Analytics - Twitter API using Java



Objective :

            Using Open source Twitter API How to collect data from Social media - Twitter about IOT.

Twitter account :

Create a new twitter account and just follow all the IOT related groups.
So that you will get tweets from your circle and also official company groups related to IOT.

Twitter Application :
Twitter API can only connect to twitter application. Only twitter application will get the twitter message and serve to Twitter API. Our Java code get the data through twitter application .

https://apps.twitter.com/app/new is used to create a new twitter application.

Application User Credentials to connect with java application
Once we created the twitter application we can get the following user credentials.
Using the following URL  you get User credentials

URL                                             : https://apps.twitter.com/app/<applicationId>/keys
Consumer Key (API Key)         : <API KEY>
Consumer Secret (API Secret) : <API Password>
Access Token                              : <ACCESS TOKEN>
Access Token Secret                  : <ACCESS TOKEN SECRET>

All these values will be generated by Twitter it self. You just need to use that values.
Note : For security purpose you can generate API key whenever you like.


Java Application - Twitter API


            In Java code we are going to create a java application, which will use the generated twitter credentials . It will gather all the text from twitter related to given keyword like IOT. This java application collect all the text from twitter and store it in a text file every one hour once. So Every one hour there will be new text file which will be used to store current twitter text related to IOT.

Create a maven project and add maven dependencies

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tamil</groupId>
  <artifactId>DataCollection</artifactId>
  <version>0.0.1</version>
  <dependencies>
             <dependency>
                    <groupId>org.twitter4j</groupId>
                    <artifactId>twitter4j-stream</artifactId>
                    <version>4.0.2</version>
             </dependency>
             <dependency>
                    <groupId>org.twitter4j</groupId>
                    <artifactId>twitter4j-core</artifactId>
                    <version>4.0.2</version>
             </dependency>
             <dependency>
                    <groupId>org.twitter4j</groupId>
                    <artifactId>twitter4j-async</artifactId>
                    <version>4.0.2</version>
             </dependency>
             <dependency>
                    <groupId>org.twitter4j</groupId>
                    <artifactId>twitter4j-media-support</artifactId>
                    <version>4.0.2</version>
             </dependency>
       </dependencies>

       <build>
             <plugins>
                    <plugin>
                          <artifactId>maven-assembly-plugin</artifactId>
                          <configuration>
                                 <archive>
                                       <manifest>
                                              <mainClass>com.tamil.TwitteFileWriter</mainClass>
                                       </manifest>
                                 </archive>
                                 <descriptorRefs>
                                       <descriptorRef>jar-with-dependencies</descriptorRef>
                                 </descriptorRefs>
                          </configuration>
                          <executions>
                                 <execution>
                                       <id>make-assembly</id> <!-- this is used for inheritance merges -->
                                       <phase>package</phase> <!-- bind to the packaging phase -->
                                       <goals>
                                              <goal>single</goal>
                                       </goals>
                                 </execution>
                          </executions>
                    </plugin>

             </plugins>
       </build>
</project>

Maven assembly plugin to create a self executable jar file pointing to TwitteFileWriter.java


Create a java class TwitteFileWriter.java  in package com.tamil as mentioned in maven pom.xml file.

TwitterFileWritter.java

package com.tamil;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import twitter4j.DirectMessage;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.User;
import twitter4j.UserList;
import twitter4j.UserStreamListener;
import twitter4j.conf.ConfigurationBuilder;

public class TwitteFileWriter {
       private static StringBuffer buffer = new StringBuffer();
       private static final UserStreamListener listener = new UserStreamListener() {
             public void onException(Exception arg0) {            }
             public void onTrackLimitationNotice(int arg0) {            }
             public void onStatus(Status status) {         writeTwtteFromStatus(status);          }
             public void onStallWarning(StallWarning arg0) {            }
             public void onScrubGeo(long arg0, long arg1) {             }
             public void onDeletionNotice(StatusDeletionNotice arg0) {      }
             public void onUserProfileUpdate(User arg0) {         }
             public void onUserListUpdate(User arg0, UserList arg1) {      }
             public void onUserListUnsubscription(User arg0, User arg1, UserList arg2) {    }
             public void onUserListSubscription(User arg0, User arg1, UserList arg2) {    }
             public void onUserListMemberDeletion(User arg0, User arg1, UserList arg2) {    }
             public void onUserListMemberAddition(User arg0, User arg1, UserList arg2) {           }
             public void onUserListDeletion(User arg0, UserList arg1) {      }
             public void onUserListCreation(User arg0, UserList arg1) {      }
             public void onUnfollow(User arg0, User arg1) {             }
             public void onUnfavorite(User arg0, User arg1,Status arg2) {}
             public void onUnblock(User arg0, User arg1) {        }
             public void onFriendList(long[] arg0) {       }
             public void onFollow(User arg0, User arg1) {         }
             public void onFavorite(User arg0, User arg1, Status arg2) {}
             public void onDirectMessage(DirectMessage arg0) {          }
             public void onDeletionNotice(long arg0, long arg1) {        }
             public void onBlock(User arg0, User arg1) {          }
       };

       private static final void writeTwtteFromStatus(Status status) {
             String tweet = status.getUser().getScreenName() + "\t"
                          + status.getUser().getDescription() + "\t"
                          + status.getUser().getId() + "\t"
                          + status.getUser().getLang()+ "\t"
                          + status.getUser().getLocation() + "\t"
                          + status.getUser().getName() + "\t"
                          + status.getUser().getTimeZone() + "\t"
                          + status.getText()+ "\t"
                          + status.getSource() + "\t"
                          + status.getCreatedAt();
             buffer.append("\n" + tweet);
       }

       public static void main(String[] args) {
             Timer timer = new Timer();
             RemindTask task = new RemindTask();
             String[] keywords = { "Internet Of Things", "InternetOfThings","internetofthings", "IOT", "iot" };
             ConfigurationBuilder conBuilder = new ConfigurationBuilder();
             conBuilder.setOAuthConsumerKey(<API KEY>);
             conBuilder.setOAuthConsumerSecret(<API Password>);
             conBuilder.setOAuthAccessToken(<ACCESS TOKEN>);
             conBuilder.setOAuthAccessTokenSecret(<ACCESS TOKEN SECRET> );
             conBuilder.setJSONStoreEnabled(true);
             conBuilder.setPrettyDebugEnabled(true);
             // conBuilder.setHttpProxyHost("PROXY HOST");// To Set Proxy server url [optional ]
             // conBuilder.setHttpProxyPort(PROXY PORT); // To Set Proxy server port [optional ]
             conBuilder.setIncludeEntitiesEnabled(true);
             TwitterStream twitterStream = new TwitterStreamFactory(
                          conBuilder.build()).getInstance();
             twitterStream.addListener(listener);
             twitterStream.user();
             FilterQuery query = new FilterQuery().track(keywords);
             twitterStream.filter(query);
             timer.schedule(task, 5 * 1000, 5 * 1000);
       }

       static class RemindTask extends TimerTask {
             private SimpleDateFormat format = new SimpleDateFormat("yyyyMMMddHH");
             private boolean flagToStop = false;
             private long counter = 0;
             private void checkStatusToStop() {
                    try {
                          File newFile = new File("./Stop.txt");
                          if (newFile.exists()) {
                                 flagToStop = true;
                          }
                          System.out.print(":" + counter);
                    } catch (Exception e) {
                          e.printStackTrace();
                    }
             }

             public void run() {
                    if (!flagToStop) {
                          counter++;
                          writerFile();
                          buffer.setLength(0);
                          checkStatusToStop();
                    } else {
                          System.out.println("Going to End");
                          System.exit(0);
                    }
             }

             private void writerFile() {
                    try {
                          String fileName = format.format(new Date());
                          File newFile = new File("./" + fileName + ".txt");
                          if (!newFile.exists())
                                 newFile.createNewFile();
                          if (newFile.exists()) {
                                 FileWriter writter = new FileWriter(newFile, true);
                                 BufferedWriter bufferWritter = new BufferedWriter(writter);
                                 bufferWritter.write(buffer.toString());
                                 bufferWritter.close();
                          }
                    } catch (IOException e) {
                          e.printStackTrace();
                    }
             }
       }
}

Create Self executable jar file using maven build command

mvn clean compile install assembly:assembly

Now you can run the application using

java -jar DataCollection.jar

Application will generate txt file with the following format
<YYYY><MON><DD><HH>.txt file
Example :

2014Oct1913.txt here year 2014 , Oct month 19th at 1 pm generated file which contain twitter data collected between 1 pm to 2 am.