My Objective:
- Flume should point to a source file [ typically log file] in local system.
- Flume should read the file and store the file into HDFS specific folder specified in flume configuration.
- Whenever any changes in the file occur Flume should read and update the same into HDFS file system.
Instead of Streaming log file , I created my own text file similor to log file. So whenever I change the content using editor and save flume agent should run and invoke the changes from local file and it should update the same into HDFS file.
1. Create a local file into the following folder.
/home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
|
2. Create a HDFS Directory to store to configure with Flume.
So that flume will read the local file MyLog.log and write into HDFS directory with some file name [ HDFS will take care to create HDFS File name ]
$ hadoop fs -mkdir hdfs://localhost.localdomain/user/cloudera/tamilflume
|
3. Create Flume Configuration file "myflume.conf"
agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1
agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path= hdfs://localhost.localdomain/user/cloudera/tamilflume
agent.sinks.hdfs-sink.hdfs.filePrefix=apacheaccess
agent.sinks.hdfs-sink.hdfs.rollInterval=10
agent.sinks.hdfs-sink.hdfs.rollSize=0
agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000
agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1
|
Here 'agent' is the name for the flume agent,
so configuration file uses the name for source, sink & channel configuration.
source :
type = exec [ executable command ]
command is linux tail command , like tail the log file.
sink :
type is hdfs [ Hadoop file syste ]
path is newly created hdfs://localhost.localdomain/user/cloudera/tamilflume path.
fileprefix string , that all the hdfs files in hdfs directory will starts with this string value.
channel :
type is memory.
capacity is 1000 . means 1000 agents can run.
Its time to run the flume agent.
Open the terminal and get into the location where you stored your myflume.conf.
$ flume-ng agent -n agent -c conf -f myflume.conf --name agent -Dflume.root.logger=INFO,console
|
Now you can see Flume agent will start running
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:myflume.conf
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: agent
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Creating channels
14/09/17 10:25:34 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Created channel ch1
14/09/17 10:25:34 INFO source.DefaultSourceFactory: Creating instance of source exec-source, type exec
14/09/17 10:25:34 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink, type: hdfs
14/09/17 10:25:35 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/09/17 10:25:35 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [exec-source, hdfs-sink]
14/09/17 10:25:35 INFO node.Application: Starting new configuration:{ sourceRunners:{exec-source=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:exec-source,state:IDLE} }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4a776482 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
14/09/17 10:25:35 INFO node.Application: Starting Channel ch1
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started
14/09/17 10:25:35 INFO node.Application: Starting Sink hdfs-sink
14/09/17 10:25:35 INFO node.Application: Starting Source exec-source
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink started
14/09/17 10:25:35 INFO source.ExecSource: Exec source starting with command:tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: exec-source: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: exec-source started
14/09/17 10:25:39 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:39 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:25:51 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
|
Whenever you edit the log file and save you can see the Flume writer will trigger in Flume agent console.
14/09/17 10:25:51 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:25:59 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:59 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:09 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
14/09/17 10:26:09 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:26:32 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:26:32 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:42 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
14/09/17 10:26:42 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:27:03 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:27:03 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:27:13 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232
14/09/17 10:27:13 INFO hdfs.HDFSEventSink: Writer callback called.
|
Even you can see the HDFS folder to see the files.
$ hadoop fs -ls hdfs://localhost.localdomain/user/cloudera/tamilflume
Found 4 items
-rw-r--r-- 1 cloudera cloudera 142 2014-09-17 10:25 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
-rw-r--r-- 1 cloudera cloudera 170 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
-rw-r--r-- 1 cloudera cloudera 200 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
-rw-r--r-- 1 cloudera cloudera 228 2014-09-17 10:27 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232
|
You can print the file content using
$ hadoop fs -cat hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable����Bu�'�|ȊH���*hikumar
H���.
$ hadoop fs -cat hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritablezC��=
SrW�
�iq�B�H��:�hikumarH��;Hi Tamil |
Well-done guys. You done it :-).