Objective:
Use Apache Flume to collect Twitter data for analysis.
Prerequisites:
- A Twitter application account, which provides the credentials needed to collect the data.
- Basic knowledge of the Flume life cycle.
Flume cannot collect data from Twitter with its built-in sources, so we need to create our own Flume source that collects the data from Twitter and binds it to Flume events.
This post covers three steps:
- Creating the custom Flume source.
- Deploying the source jar file in the Flume plugins.d folder.
- Running Flume to store the Twitter data in HDFS.
Twitter Source:
The Twitter source is the data-collection part of the Flume agent.
It receives each tweet as a JSON object and wraps it in a Flume Event.
Flume then takes the events from the source and stores them in HDFS.
This example uses twitter4j-stream version 2.2.6.
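To make the "JSON bound to an Event" idea concrete, here is a minimal sketch in plain Java. `TweetEventSketch` is a hypothetical stand-in for a Flume event, not Flume's own class: it only models the two parts that matter here, the header map (with a `timestamp` entry) and the body bytes that carry the raw JSON. UTF-8 encoding is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Flume event: a header map plus a byte[] body.
final class TweetEventSketch {
    final Map<String, String> headers;
    final byte[] body;

    private TweetEventSketch(Map<String, String> headers, byte[] body) {
        this.headers = headers;
        this.body = body;
    }

    // Wraps the raw tweet JSON and its creation time (epoch milliseconds).
    static TweetEventSketch fromTweet(String rawJson, long createdAtMillis) {
        Map<String, String> headers = new HashMap<>();
        // The "timestamp" header is what time-based HDFS paths are resolved from.
        headers.put("timestamp", String.valueOf(createdAtMillis));
        return new TweetEventSketch(
                Collections.unmodifiableMap(headers),
                rawJson.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        TweetEventSketch event = fromTweet("{\"text\":\"hello iot\"}", 1414316659000L);
        System.out.println("headers = " + event.headers);
        System.out.println("body bytes = " + event.body.length);
    }
}
```

The real source below does the same thing with Flume's `EventBuilder`.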
pom.xml
| 
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tamil</groupId>
  <artifactId>TwitterSource</artifactId>
  <version>0.1</version>
  <dependencies>
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>2.2.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.5.0</version>
      <scope>provided</scope>
    </dependency>
    -->
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.tamil.TwitterSource</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project> | 
TwitterSource.java
| 
package com.tamil;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(TwitterSource.class);
    // Encode event bodies explicitly instead of relying on the platform default charset.
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private String[] keywords;
    private TwitterStream twitterStream;

    // Reads the OAuth credentials and the keyword list from the agent configuration.
    public void configure(Context context) {
        consumerKey = context.getString("consumerKey");
        consumerSecret = context.getString("consumerSecret");
        accessToken = context.getString("accessToken");
        accessTokenSecret = context.getString("accessTokenSecret");

        // Default to "" so that a missing "keywords" property does not cause a NullPointerException.
        String keywordString = context.getString("keywords", "");
        if (keywordString.trim().length() == 0) {
            keywords = new String[0];
        } else {
            keywords = keywordString.split(",");
            for (int i = 0; i < keywords.length; i++) {
                keywords[i] = keywords[i].trim();
            }
        }

        ConfigurationBuilder conBuilder = new ConfigurationBuilder();
        conBuilder.setOAuthConsumerKey(consumerKey);
        conBuilder.setOAuthConsumerSecret(consumerSecret);
        conBuilder.setOAuthAccessToken(accessToken);
        conBuilder.setOAuthAccessTokenSecret(accessTokenSecret);
        conBuilder.setJSONStoreEnabled(true); // required for DataObjectFactory.getRawJSON
        conBuilder.setPrettyDebugEnabled(true);
        conBuilder.setIncludeEntitiesEnabled(true);
        // HTTP proxy of my network; change or remove these two lines for your environment.
        conBuilder.setHttpProxyHost("192.168.51.80");
        conBuilder.setHttpProxyPort(3128);
        twitterStream = new TwitterStreamFactory(conBuilder.build()).getInstance();
    }

    @Override
    public void start() {
        final ChannelProcessor channel = getChannelProcessor();

        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                logger.debug(status.getUser().getScreenName() + ": " + status.getText());
                // Use a fresh header map per event so that events do not share mutable state.
                Map<String, String> headers = new HashMap<String, String>();
                headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
                Event event = EventBuilder.withBody(
                        DataObjectFactory.getRawJSON(status).getBytes(UTF_8), headers);
                channel.processEvent(event);
            }
            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onScrubGeo(long userId, long upToStatusId) {}
            public void onException(Exception ex) {
                // Log stream errors instead of silently ignoring them.
                logger.error("Twitter stream error", ex);
            }
        };

        logger.debug("Setting up Twitter sample stream using consumer key {} and access token {}",
                consumerKey, accessToken);
        twitterStream.addListener(listener);
        if (keywords.length == 0) {
            logger.debug("Starting up Twitter sampling...");
            twitterStream.sample();
        } else {
            logger.debug("Starting up Twitter filtering...");
            FilterQuery query = new FilterQuery().track(keywords);
            twitterStream.filter(query);
        }
        super.start();
    }

    @Override
    public void stop() {
        logger.debug("Shutting down Twitter sample stream...");
        twitterStream.shutdown();
        super.stop();
    }
} | 
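The keyword handling in `configure()` decides whether the source samples the public stream or filters it, so it is worth trying in isolation. The sketch below uses only the JDK; `KeywordParser` is a hypothetical helper name, not part of the source above. As an assumption beyond the original logic, it also treats a missing value as empty and drops blank entries such as a trailing comma.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the keyword parsing done in configure().
final class KeywordParser {
    // Splits a comma-separated value into trimmed, non-empty terms.
    static String[] parse(String raw) {
        List<String> terms = new ArrayList<String>();
        if (raw != null) {
            for (String part : raw.split(",")) {
                String term = part.trim();
                if (!term.isEmpty()) {
                    terms.add(term);
                }
            }
        }
        return terms.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] keywords = parse("Internet Of Things, IOT, iot , M2M");
        for (String keyword : keywords) {
            System.out.println("[" + keyword + "]");
        }
        // An empty result corresponds to the sample() branch, anything else to filter().
        System.out.println(parse("  ").length == 0 ? "sample stream" : "filter stream");
    }
}
```

Note that a phrase such as "Internet Of Things" stays one term, because only commas separate keywords.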
Use Maven to build the jar file together with its dependencies.
| 
mvn clean compile install assembly:assembly  | 
Deploying the custom jar file in Flume plugins
By default, Flume looks for third-party plugins in the plugins.d directory under its home directory. Alternatively, any folder that follows the required structure and contains the third-party jars can be used as the plugins folder by passing it to Flume as a command-line argument.
Basic structure of the Flume plugins folder:
| 
$/home/cloudera/Desktop/MyPlugin/plugins.d/ 
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/ 
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib/TamilSource.jar 
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/libext/<Dependent files> | 
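If you prefer to create this layout from code rather than by hand, the following is a small sketch using only `java.nio.file`. `PluginLayoutSketch` and `createLayout` are hypothetical names chosen for this example; the sketch creates just the `lib` and `libext` folders listed above and copies nothing into them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper that creates the plugin folder structure shown above.
final class PluginLayoutSketch {
    // Creates <pluginsDir>/<pluginName>/lib and <pluginsDir>/<pluginName>/libext.
    static Path createLayout(Path pluginsDir, String pluginName) {
        Path plugin = pluginsDir.resolve(pluginName);
        try {
            Files.createDirectories(plugin.resolve("lib"));    // the plugin jar goes here
            Files.createDirectories(plugin.resolve("libext")); // its dependency jars go here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return plugin;
    }

    public static void main(String[] args) throws IOException {
        // A temporary directory keeps the example from touching a real Flume setup.
        Path base = Files.createTempDirectory("MyPlugin");
        Path plugin = createLayout(base.resolve("plugins.d"), "custom-source-1");
        System.out.println("Created " + plugin);
    }
}
```

After that, the source jar is copied into `lib` and any dependent jars into `libext`.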
 Twitter.conf
| 
TwitterAgent.sources=Twitter 
TwitterAgent.channels=MemChannel 
TwitterAgent.sinks=HDFS 
TwitterAgent.sources.Twitter.type=com.tamil.TwitterSource 
TwitterAgent.sources.Twitter.channels=MemChannel 
TwitterAgent.sources.Twitter.consumerKey=<consumerKey> 
TwitterAgent.sources.Twitter.consumerSecret=<consumerSecret> 
TwitterAgent.sources.Twitter.accessToken=<accessToken> 
TwitterAgent.sources.Twitter.accessTokenSecret=<accessTokenSecret> 
TwitterAgent.sources.Twitter.keywords=Internet Of Things, InternetOfThings, internetofthings, IOT, iot, M2M, m2m 
TwitterAgent.sinks.HDFS.channel=MemChannel 
TwitterAgent.sinks.HDFS.type=hdfs 
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost.localdomain:8020/user/cloudera/mytwittes/%Y/%m/%d/%H/ 
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000 
TwitterAgent.sinks.HDFS.hdfs.rollSize=0 
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000 
TwitterAgent.channels.MemChannel.type=memory 
TwitterAgent.channels.MemChannel.capacity=10000 
TwitterAgent.channels.MemChannel.transactionCapacity=100 | 
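The `%Y/%m/%d/%H` part of `hdfs.path` is filled in per event from its `timestamp` header, which is why the source sets that header for every tweet. The sketch below illustrates the idea in plain Java. It is not Flume's implementation: `HdfsPathSketch` is a hypothetical name, only these four escapes are handled, and the time zone is passed in explicitly (UTC in the example, which is an assumption; the sink resolves times in its own configured zone).

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical illustration of how time escapes in a path map to an event timestamp.
final class HdfsPathSketch {
    // Expands %Y, %m, %d and %H from an epoch-millisecond timestamp.
    static String expand(String template, long timestampMillis, TimeZone zone) {
        Date when = new Date(timestampMillis);
        return template
                .replace("%Y", format("yyyy", when, zone))
                .replace("%m", format("MM", when, zone))
                .replace("%d", format("dd", when, zone))
                .replace("%H", format("HH", when, zone));
    }

    private static String format(String pattern, Date when, TimeZone zone) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(zone);
        return formatter.format(when);
    }

    public static void main(String[] args) {
        // 1414316659000 ms is 2014-10-26 09:44:19 UTC.
        String path = expand("/user/cloudera/mytwittes/%Y/%m/%d/%H/",
                1414316659000L, TimeZone.getTimeZone("UTC"));
        System.out.println(path);
    }
}
```

With this layout, tweets end up grouped into one HDFS folder per hour.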
It's time to run the Flume agent:
| 
$ flume-ng agent -n TwitterAgent -f Twitter.conf --plugins-path /home/cloudera/Desktop/MyPlugin/plugins.d -Dflume.root.logger=DEBUG,console | 
Output will be:
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:Tamila.conf
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
14/10/26 09:44:19 INFO node.AbstractConfigurationProvider: Creating channels
14/10/26 09:44:19 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Created channel MemChannel
14/10/26 09:44:20 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type com.tamil.TwitterSource
14/10/26 09:44:20 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
14/10/26 09:44:20 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter, HDFS]
14/10/26 09:44:20 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.tamil.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@79c8a90d counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
14/10/26 09:44:20 INFO node.Application: Starting Channel MemChannel
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
14/10/26 09:44:20 INFO node.Application: Starting Sink HDFS
14/10/26 09:44:20 INFO node.Application: Starting Source Twitter
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
14/10/26 09:44:20 INFO twitter4j.TwitterStreamImpl: Establishing connection.
Now you can check the data in HDFS using:
| 
$ hadoop fs -ls hdfs://localhost.localdomain/user/cloudera/mytwittes | 
