Thursday, 18 September 2014

My First Apache Flume Program

My Objective:

  • Flume should point to a source file (typically a log file) on the local system.
  • Flume should read the file and store its contents in the HDFS folder specified in the Flume configuration.
  • Whenever the file changes, Flume should pick up the change and write it to HDFS.
 
            Instead of streaming a real log file, I created my own text file similar to a log file. Whenever I change its content in an editor and save it, the running Flume agent should pick up the changes from the local file and write them to HDFS.

1. Create a local file at the following location.

/home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
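
A minimal sketch of step 1 from the command line, appending lines the way a logging process would instead of saving from an editor. LOG_DIR is an assumption made for this sketch: it defaults to a scratch directory, so set it to /home/cloudera/Desktop/JavaCode/FlumeExample/first to match this post. The two sample lines are the ones that show up later in the HDFS output.

```shell
# Assumption: LOG_DIR is a hypothetical variable for this sketch; it falls back
# to a scratch directory so the commands are safe to try anywhere.
LOG_DIR="${LOG_DIR:-$(mktemp -d)}"
LOG_FILE="$LOG_DIR/myLog.log"

mkdir -p "$LOG_DIR"
: > "$LOG_FILE"                 # create the file (or empty it if it exists)

# Append one event per line; appending keeps the same file in place,
# which is what tail -F follows most reliably.
echo "hikumar"  >> "$LOG_FILE"
echo "Hi Tamil" >> "$LOG_FILE"

wc -l < "$LOG_FILE"             # number of lines written so far
```

Many editors save by writing a new file and renaming it over the old one. tail -F follows the file by name, so it should keep working, but it may send the whole file again after such a save.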


2. Create an HDFS directory for Flume to write to.
Flume will read the local file myLog.log and write into this HDFS directory. The HDFS sink generates the file names itself.

$ hadoop fs -mkdir  hdfs://localhost.localdomain/user/cloudera/tamilflume


3. Create Flume Configuration file "myflume.conf"

agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1

agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log

agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path=hdfs://localhost.localdomain/user/cloudera/tamilflume
agent.sinks.hdfs-sink.hdfs.filePrefix=apacheaccess
agent.sinks.hdfs-sink.hdfs.rollInterval=10
agent.sinks.hdfs-sink.hdfs.rollSize=0

agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000

agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1


Here 'agent' is the name of the Flume agent,
so every source, sink and channel property in the configuration file is prefixed with that name.
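
Because the agent name is just the prefix of every key, it can be read back from the file. The sketch below is a small helper, not part of Flume: agent_names and DEMO_CONF are names made up for this example. It lists the prefixes found in a properties file so you can check that the name you later pass to flume-ng matches.

```shell
# Print the distinct agent names used in a Flume properties file,
# i.e. the text before the first dot of every non-comment key=value line.
agent_names() {
    grep -v '^[[:space:]]*#' "$1" | grep '=' | cut -d. -f1 | sort -u
}

# Demo on a scratch file holding a few lines in the style of myflume.conf.
DEMO_CONF="$(mktemp)"
cat > "$DEMO_CONF" <<'EOF'
agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1
agent.channels.ch1.type=memory
EOF

agent_names "$DEMO_CONF"
```

If the name given on the command line does not match this prefix, the agent typically starts without any source, sink or channel, so this is worth checking first when nothing seems to happen.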

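source :
            type is exec [ runs an external command ].
            command is the Linux tail command; tail -F keeps following the log file as it grows.

sink :
            type is hdfs [ Hadoop file system ].
            path is the newly created hdfs://localhost.localdomain/user/cloudera/tamilflume directory.
            filePrefix is a string; every file the sink creates in the HDFS directory starts with this value.
            rollInterval is 10, so the sink closes the current file and starts a new one every 10 seconds.
            rollSize is 0, which disables rolling based on file size.

channel :
            type is memory.
            capacity is 1000, meaning the channel can hold up to 1000 events at a time.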
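
The roll settings decide how many HDFS files you end up with. As a hedged sketch, the script below writes a variant of the configuration that rolls on time only (every 300 seconds) and stores plain text. The file name myflume-text.conf, the CONF_FILE variable and the value 300 are assumptions for illustration; hdfs.fileType, hdfs.writeFormat, hdfs.rollCount and transactionCapacity are standard HDFS sink and memory channel properties that the original configuration leaves at their defaults.

```shell
# Assumption: CONF_FILE is a hypothetical variable; by default the variant
# is written next to myflume.conf under a different name.
CONF_FILE="${CONF_FILE:-myflume-text.conf}"

cat > "$CONF_FILE" <<'EOF'
agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1

agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log

agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path=hdfs://localhost.localdomain/user/cloudera/tamilflume
agent.sinks.hdfs-sink.hdfs.filePrefix=apacheaccess
# Write plain text instead of the default SequenceFile container.
agent.sinks.hdfs-sink.hdfs.fileType=DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat=Text
# Roll on time only: a value of 0 disables size- and count-based rolling.
agent.sinks.hdfs-sink.hdfs.rollInterval=300
agent.sinks.hdfs-sink.hdfs.rollSize=0
agent.sinks.hdfs-sink.hdfs.rollCount=0

# The channel buffers up to 1000 events, moved at most 100 per transaction.
agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000
agent.channels.ch1.transactionCapacity=100

agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1
EOF

grep 'roll' "$CONF_FILE"        # show the roll settings that were written
```

With rollCount left unset, the sink also starts a new file after every 10 events, which is one reason small files pile up. Setting rollInterval to 0 as well should keep a single file open until the agent stops, at the cost of the data staying in a .tmp file until then.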
           
           
It's time to run the Flume agent.

Open a terminal and change to the directory where you stored myflume.conf.

$ flume-ng agent -n agent -c conf -f myflume.conf -Dflume.root.logger=INFO,console


            Now you can see the Flume agent start running:
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:myflume.conf
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: agent
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Creating channels
14/09/17 10:25:34 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Created channel ch1
14/09/17 10:25:34 INFO source.DefaultSourceFactory: Creating instance of source exec-source, type exec
14/09/17 10:25:34 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink, type: hdfs
14/09/17 10:25:35 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/09/17 10:25:35 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [exec-source, hdfs-sink]
14/09/17 10:25:35 INFO node.Application: Starting new configuration:{ sourceRunners:{exec-source=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:exec-source,state:IDLE} }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4a776482 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
14/09/17 10:25:35 INFO node.Application: Starting Channel ch1
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started
14/09/17 10:25:35 INFO node.Application: Starting Sink hdfs-sink
14/09/17 10:25:35 INFO node.Application: Starting Source exec-source
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink started
14/09/17 10:25:35 INFO source.ExecSource: Exec source starting with command:tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: exec-source: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: exec-source started
14/09/17 10:25:39 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:39 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:25:51 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772


Whenever you edit the log file and save it, you can see the HDFS writer being triggered in the Flume agent console.

14/09/17 10:25:51 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:25:59 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:59 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:09 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
14/09/17 10:26:09 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:26:32 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:26:32 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:42 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
14/09/17 10:26:42 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:27:03 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:27:03 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:27:13 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232
14/09/17 10:27:13 INFO hdfs.HDFSEventSink: Writer callback called.


            You can also list the HDFS folder to see the files.


$ hadoop fs -ls  hdfs://localhost.localdomain/user/cloudera/tamilflume
Found 4 items
-rw-r--r--   1 cloudera cloudera        142 2014-09-17 10:25 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
-rw-r--r--   1 cloudera cloudera        170 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
-rw-r--r--   1 cloudera cloudera        200 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
-rw-r--r--   1 cloudera cloudera        228 2014-09-17 10:27 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232


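            You can print a file's content with hadoop fs -cat. The files are Hadoop SequenceFiles (note HDFSSequenceFile in the agent log), so binary header bytes appear around the text.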
 
$ hadoop fs -cat  hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable����Bu­'|ČŠH���*hikumar
                                                                                                  H���.
$ hadoop fs -cat  hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritablezC��=
SrW

iqBH��:hikumarH��;Hi Tamil
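
The unreadable characters are the SequenceFile header and record framing, not corruption. As a rough sketch, the helper below checks whether a file starts with the magic bytes "SEQ". is_sequence_file and SAMPLE are names invented for this example, and the scratch file only imitates the first bytes of the header seen in the output above.

```shell
# Succeeds when the file starts with the SequenceFile magic bytes "SEQ".
is_sequence_file() {
    [ "$(head -c 3 "$1")" = "SEQ" ]
}

# Scratch file imitating the header: magic "SEQ", version byte 6,
# then the length-prefixed key class name ('!' is byte 33, the name's length).
SAMPLE="$(mktemp)"
printf 'SEQ\006!org.apache.hadoop.io.LongWritable' > "$SAMPLE"

if is_sequence_file "$SAMPLE"; then
    echo "SequenceFile: hadoop fs -cat will show binary bytes"
else
    echo "plain file: hadoop fs -cat output is readable as-is"
fi
```

To try it on a real file, copy it out of HDFS first with hadoop fs -get. hadoop fs -text can decode SequenceFiles, though with the Writable format used here the values may print as hex bytes, so setting hdfs.fileType to DataStream in the sink is usually the simpler way to get readable text.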
           


                                                 Well done, guys. You did it :-).



1 comment:

  1. Hello Sir,
    How can I store the data in a single file using Flume instead of multiple files?
