Objective:
Collect Twitter data with Apache Flume for data analysis.
Prerequisites:
- A Twitter application account (consumer key/secret and access token/secret) to collect the data.
- Basic knowledge of the Flume life cycle.
Flume cannot collect data from Twitter using its existing built-in sources, so we need to create our own Flume source that collects tweets and binds them to Flume events.
Here we are going to see how to create the custom Flume source, deploy the source jar file under Flume's plugins.d directory, and run Flume to store the data from Twitter in the HDFS file system.
Twitter Source:
This Twitter source serves as the data-collection part in Flume. It collects each tweet in JSON object format and binds it to a Flume event. Flume then takes the data from the source and stores it in the HDFS file system.
Here I used twitter4j-stream version 2.2.6.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tamil</groupId>
  <artifactId>TwitterSource</artifactId>
  <version>0.1</version>
  <dependencies>
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>2.2.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.5.0</version>
      <scope>provided</scope>
    </dependency>
    -->
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.tamil.TwitterSource</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
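A note on the build section: the jar-with-dependencies descriptor makes the assembly plugin bundle twitter4j into a single jar, which simplifies deployment under plugins.d. The <mainClass> manifest entry is not strictly needed for a Flume source, since Flume instantiates the class by its fully qualified name rather than running a main method.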
TwitterSource.java
package com.tamil;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(TwitterSource.class);

    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    private String[] keywords;
    private TwitterStream twitterStream;

    public void configure(Context context) {
        // OAuth credentials, read from the agent configuration (Twitter.conf below).
        consumerKey = context.getString("consumerKey");
        consumerSecret = context.getString("consumerSecret");
        accessToken = context.getString("accessToken");
        accessTokenSecret = context.getString("accessTokenSecret");

        // Optional comma-separated keyword list; empty or missing means sampling mode.
        String keywordString = context.getString("keywords");
        if (keywordString == null || keywordString.trim().length() == 0) {
            keywords = new String[0];
        } else {
            keywords = keywordString.split(",");
            for (int i = 0; i < keywords.length; i++) {
                keywords[i] = keywords[i].trim();
            }
        }

        ConfigurationBuilder conBuilder = new ConfigurationBuilder();
        conBuilder.setOAuthConsumerKey(consumerKey);
        conBuilder.setOAuthConsumerSecret(consumerSecret);
        conBuilder.setOAuthAccessToken(accessToken);
        conBuilder.setOAuthAccessTokenSecret(accessTokenSecret);
        // Required so DataObjectFactory.getRawJSON() can return the raw tweet JSON.
        conBuilder.setJSONStoreEnabled(true);
        conBuilder.setPrettyDebugEnabled(true);
        conBuilder.setIncludeEntitiesEnabled(true);
        // Proxy settings specific to my environment; adjust or remove for yours.
        conBuilder.setHttpProxyHost("192.168.51.80");
        conBuilder.setHttpProxyPort(3128);

        twitterStream = new TwitterStreamFactory(conBuilder.build()).getInstance();
    }

    @Override
    public void start() {
        final ChannelProcessor channel = getChannelProcessor();
        final Map<String, String> headers = new HashMap<String, String>();

        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                logger.debug(status.getUser().getScreenName() + ": " + status.getText());
                // Stamp the event with the tweet's creation time, then hand the
                // raw JSON to the channel as the event body.
                headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
                Event event = EventBuilder.withBody(
                        DataObjectFactory.getRawJSON(status).getBytes(), headers);
                channel.processEvent(event);
            }

            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
            public void onScrubGeo(long userId, long upToStatusId) {}
            public void onException(Exception ex) {}
        };

        logger.debug("Setting up Twitter sample stream using consumer key {} and "
                + "access token {}", new String[] { consumerKey, accessToken });
        twitterStream.addListener(listener);

        if (keywords.length == 0) {
            logger.debug("Starting up Twitter sampling...");
            twitterStream.sample();
        } else {
            logger.debug("Starting up Twitter filtering...");
            FilterQuery query = new FilterQuery().track(keywords);
            twitterStream.filter(query);
        }
        super.start();
    }

    @Override
    public void stop() {
        logger.debug("Shutting down Twitter sample stream...");
        twitterStream.shutdown();
        super.stop();
    }
}
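Before wiring the source into Flume, it can help to verify the credentials and inspect the raw JSON payloads on their own. Below is a minimal throwaway harness (my own sketch, not part of the deployed source; the StreamCheck class name and argument order are invented for illustration) using the same twitter4j 2.2.6 API:

package com.tamil;

import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

// Hypothetical stand-alone harness: prints the raw JSON the Flume source
// would put into each event body. Run with the four OAuth values as args.
public class StreamCheck {
    public static void main(String[] args) {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setOAuthConsumerKey(args[0]);
        cb.setOAuthConsumerSecret(args[1]);
        cb.setOAuthAccessToken(args[2]);
        cb.setOAuthAccessTokenSecret(args[3]);
        cb.setJSONStoreEnabled(true); // required for DataObjectFactory.getRawJSON()
        TwitterStream stream = new TwitterStreamFactory(cb.build()).getInstance();
        // StatusAdapter provides no-op implementations of the other callbacks.
        stream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                System.out.println(DataObjectFactory.getRawJSON(status));
            }
        });
        stream.sample(); // unfiltered sample stream
    }
}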
Build the jar file, together with its dependencies, using Maven:
mvn clean compile install assembly:assembly
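This produces target/TwitterSource-0.1-jar-with-dependencies.jar (the assembly plugin appends the descriptor name to the artifactId and version), alongside the plain target/TwitterSource-0.1.jar.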
Deploying the Custom Jar File in the Flume Plugins Directory.
By default, Flume picks up third-party plugins from a configured plugins.d path. Alternatively, we can point Flume at any folder that follows the specific structure below, using a Flume command-line argument.
Basic structure of the Flume plugins folder:
$/home/cloudera/Desktop/MyPlugin/plugins.d/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib/TamilSource.jar
$/home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/libext/<Dependent files>
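For example, the layout can be created like this (a sketch, assuming the assembly jar built above is copied in under the TamilSource.jar name shown in the listing; since that jar already bundles twitter4j, libext/ can stay empty):
$ mkdir -p /home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib
$ mkdir -p /home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/libext
$ cp target/TwitterSource-0.1-jar-with-dependencies.jar \
    /home/cloudera/Desktop/MyPlugin/plugins.d/custom-source-1/lib/TamilSource.jar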
Twitter.conf
TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS
TwitterAgent.sources.Twitter.type=com.tamil.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel
TwitterAgent.sources.Twitter.consumerKey=<consumerKey>
TwitterAgent.sources.Twitter.consumerSecret=<consumerSecret>
TwitterAgent.sources.Twitter.accessToken=<accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret=<accessTokenSecret>
TwitterAgent.sources.Twitter.keywords=Internet Of Things, InternetOfThings, internetofthings, IOT, iot, M2M, m2m
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost.localdomain:8020/user/cloudera/mytwittes/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=1000
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10000
TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100
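A note on the sink settings: hdfs.rollSize=0 disables size-based file rolling, so files close after every 10,000 events (hdfs.rollCount); Flume's default hdfs.rollInterval of 30 seconds still applies unless it is also set to 0. The %Y/%m/%d/%H escapes in hdfs.path are resolved from the timestamp header our source sets on each event.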
It's time to run the Flume agent:
$ flume-ng agent -n TwitterAgent -f Twitter.conf \
    --plugins-path /home/cloudera/Desktop/MyPlugin/plugins.d \
    -Dflume.root.logger=DEBUG,console
The output will be:
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/10/26 09:44:19 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:Tamila.conf
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Processing:HDFS
14/10/26 09:44:19 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
14/10/26 09:44:19 INFO node.AbstractConfigurationProvider: Creating channels
14/10/26 09:44:19 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Created channel MemChannel
14/10/26 09:44:20 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type com.tamil.TwitterSource
14/10/26 09:44:20 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
14/10/26 09:44:20 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/10/26 09:44:20 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [Twitter, HDFS]
14/10/26 09:44:20 INFO node.Application: Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.tamil.TwitterSource{name:Twitter,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@79c8a90d counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
14/10/26 09:44:20 INFO node.Application: Starting Channel MemChannel
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
14/10/26 09:44:20 INFO node.Application: Starting Sink HDFS
14/10/26 09:44:20 INFO node.Application: Starting Source Twitter
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
14/10/26 09:44:20 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
14/10/26 09:44:20 INFO twitter4j.TwitterStreamImpl: Establishing connection.
Now you can check the data in HDFS using:
$ hadoop fs -ls hdfs://localhost.localdomain/user/cloudera/mytwittes
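To peek at the collected tweets, you can cat one of the generated files; by default the HDFS sink names them with a FlumeData prefix, and the date path below follows the %Y/%m/%d/%H pattern for the run shown above (adjust to your own run time):
$ hadoop fs -cat hdfs://localhost.localdomain/user/cloudera/mytwittes/2014/10/26/09/FlumeData.*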