Thursday 18 September 2014

My First Apache Flume Program

My Objectives:

  • Flume should point to a source file [typically a log file] on the local file system.
  • Flume should read the file and store its contents in the HDFS folder specified in the Flume configuration.
  • Whenever the file changes, Flume should pick up the new content and write it to HDFS.
 
            Instead of streaming a real log file, I created my own text file that resembles one. Whenever I change its content in an editor and save it, the running Flume agent should pick up the changes from the local file and write them to HDFS.

1. Create a local file at the following location.

/home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
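
You can seed the file with a few sample lines so Flume has something to pick up at startup (the content below is only an illustration; any text will do):

$ mkdir -p /home/cloudera/Desktop/JavaCode/FlumeExample/first
$ echo "127.0.0.1 - - [18/Sep/2014:10:20:00] GET /index.html 200" >> /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
$ echo "127.0.0.1 - - [18/Sep/2014:10:21:00] GET /about.html 200" >> /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log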


2. Create an HDFS directory to configure as the Flume destination.
Flume will read the local file myLog.log and write its contents into this HDFS directory under generated file names [the Flume HDFS sink takes care of naming the HDFS files].

$ hadoop fs -mkdir  hdfs://localhost.localdomain/user/cloudera/tamilflume
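
You can confirm the directory was created by listing its parent:

$ hadoop fs -ls hdfs://localhost.localdomain/user/cloudera/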


3. Create the Flume configuration file "myflume.conf":

agent.sources=exec-source
agent.sinks=hdfs-sink
agent.channels=ch1

agent.sources.exec-source.type=exec
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log

agent.sinks.hdfs-sink.type=hdfs
agent.sinks.hdfs-sink.hdfs.path= hdfs://localhost.localdomain/user/cloudera/tamilflume
agent.sinks.hdfs-sink.hdfs.filePrefix=apacheaccess
agent.sinks.hdfs-sink.hdfs.rollInterval=10
agent.sinks.hdfs-sink.hdfs.rollSize=0

agent.channels.ch1.type=memory
agent.channels.ch1.capacity=1000

agent.sources.exec-source.channels=ch1
agent.sinks.hdfs-sink.channel=ch1


Here 'agent' is the name of the Flume agent, so the configuration file uses that name as the prefix for the source, sink, and channel settings.

source:
            type = exec [runs an executable command as the source]
            command is the Linux tail -F command, which follows the log file and emits new lines as they are appended.

sink:
            type is hdfs [the Hadoop file system sink].
            path is the newly created hdfs://localhost.localdomain/user/cloudera/tamilflume directory.
            filePrefix is a string; every file the sink creates in the HDFS directory starts with this value.
            rollInterval = 10 closes the current HDFS file and starts a new one every 10 seconds; rollSize = 0 disables rolling by file size.

channel:
            type is memory [events are buffered in memory between the source and the sink].
            capacity = 1000 means the channel can hold at most 1000 events at a time.
           
           
It's time to run the Flume agent.

Open a terminal, change to the directory where you saved myflume.conf, and start the agent (-n is the agent name used as the prefix in the configuration, -c is Flume's configuration directory, and -f is the configuration file to load):

$ flume-ng agent -n agent -c conf -f myflume.conf -Dflume.root.logger=INFO,console


            Now you can see the Flume agent start up:
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/09/17 10:25:34 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:myflume.conf
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: agent
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Processing:hdfs-sink
14/09/17 10:25:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Creating channels
14/09/17 10:25:34 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory
14/09/17 10:25:34 INFO node.AbstractConfigurationProvider: Created channel ch1
14/09/17 10:25:34 INFO source.DefaultSourceFactory: Creating instance of source exec-source, type exec
14/09/17 10:25:34 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink, type: hdfs
14/09/17 10:25:35 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
14/09/17 10:25:35 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [exec-source, hdfs-sink]
14/09/17 10:25:35 INFO node.Application: Starting new configuration:{ sourceRunners:{exec-source=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:exec-source,state:IDLE} }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4a776482 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
14/09/17 10:25:35 INFO node.Application: Starting Channel ch1
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started
14/09/17 10:25:35 INFO node.Application: Starting Sink hdfs-sink
14/09/17 10:25:35 INFO node.Application: Starting Source exec-source
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink started
14/09/17 10:25:35 INFO source.ExecSource: Exec source starting with command:tail -F /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: exec-source: Successfully registered new MBean.
14/09/17 10:25:35 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: exec-source started
14/09/17 10:25:39 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:39 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp
14/09/17 10:25:51 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:25:51 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772


Whenever you edit the log file and save it, you can see the HDFS writer trigger in the Flume agent console.
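
For example, appending a line from another terminal (the text here is only an illustration; any new line will do) is enough to trigger it:

$ echo "Hi Tamil" >> /home/cloudera/Desktop/JavaCode/FlumeExample/first/myLog.log

The agent console then shows the bucket writer creating, closing, and renaming another file: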

14/09/17 10:25:51 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:25:59 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:25:59 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp
14/09/17 10:26:09 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:09 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
14/09/17 10:26:09 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:26:32 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:26:32 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp
14/09/17 10:26:42 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:26:42 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
14/09/17 10:26:42 INFO hdfs.HDFSEventSink: Writer callback called.
14/09/17 10:27:03 INFO hdfs.HDFSSequenceFile: writeFormat = Writable, UseRawLocalFileSystem = false
14/09/17 10:27:03 INFO hdfs.BucketWriter: Creating hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Closing hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp
14/09/17 10:27:13 INFO hdfs.BucketWriter: Close tries incremented
14/09/17 10:27:13 INFO hdfs.BucketWriter: Renaming hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232.tmp to hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232
14/09/17 10:27:13 INFO hdfs.HDFSEventSink: Writer callback called.


            You can also list the HDFS folder to see the files:


$ hadoop fs -ls  hdfs://localhost.localdomain/user/cloudera/tamilflume
Found 4 items
-rw-r--r--   1 cloudera cloudera        142 2014-09-17 10:25 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
-rw-r--r--   1 cloudera cloudera        170 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
-rw-r--r--   1 cloudera cloudera        200 2014-09-17 10:26 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974792793
-rw-r--r--   1 cloudera cloudera        228 2014-09-17 10:27 hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974823232


            You can print a file's content using:
 
$ hadoop fs -cat  hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974739772
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable����Bu­'|ČŠH���*hikumar
                                                                                                  H���.
$ hadoop fs -cat  hdfs://localhost.localdomain/user/cloudera/tamilflume/apacheaccess.1410974759526
SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritablezC��=
SrW

iqBH��:hikumarH��;Hi Tamil
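
The output looks partly binary because the HDFS sink writes Hadoop SequenceFiles by default, which is why each file starts with a SEQ header and key/value class names. If you would rather have plain text files, one option (not used in this example; these are standard Flume HDFS sink settings) is to add the following to the sink configuration:

agent.sinks.hdfs-sink.hdfs.fileType=DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat=Text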
           


                                                 Well done, guys. You did it :-).



Monday 1 September 2014

My First Hadoop Program with MRUnit


This is my first Hadoop program with MRUnit.
It may be useful to new Hadoop developers starting from level zero.
Here I describe my first Hadoop program with an MRUnit test.

Prerequisites:
1. Oracle VirtualBox.
2. Download and install the Cloudera QuickStart VM in VirtualBox [here I used a CDH 5.0.0 based VM].
3. Download the JAR files needed for the MRUnit test and place them in a lib folder:
commons-logging-1.1.1.jar, commons-logging-1.1.1-sources.jar, hamcrest-core-1.1.jar, junit-4.10.jar, log4j-1.2.15.jar, mockito-all-1.8.5.jar, mrunit-0.9.0-incubating-hadoop2.jar

Steps:
In Eclipse, create a new Java project named "FirstHadoopProgram" with the package com.tamil.
Add libraries via Project >> Properties >> Java Build Path >> Libraries >> Add External JARs...
Select all JAR files present in the /opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/hadoop/client folder.
Then add all the MRUnit JAR files from the lib folder.


Source Files:

Java Files
Copy all the files into the project and run the WordCountMapperTest.java file as a JUnit test.
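
The original source files are attached to the project rather than reproduced here. As a rough sketch only (assuming the classic word-count pattern in package com.tamil; the actual files may differ), the mapper and its MRUnit test could look like this:

// WordCountMapper.java (sketch)
package com.tamil;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Splits each input line into words and emits (word, 1) for every word.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}

// WordCountMapperTest.java (sketch)
package com.tamil;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

// Feeds one line to the mapper in memory and verifies the emitted (word, count)
// pairs, without needing a running cluster.
public class WordCountMapperTest {
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;

    @Before
    public void setUp() {
        mapDriver = MapDriver.newMapDriver(new WordCountMapper());
    }

    @Test
    public void mapperEmitsOneForEveryWord() throws IOException {
        mapDriver.withInput(new LongWritable(0), new Text("It is useful"))
                 .withOutput(new Text("It"), new IntWritable(1))
                 .withOutput(new Text("is"), new IntWritable(1))
                 .withOutput(new Text("useful"), new IntWritable(1))
                 .runTest();
    }
}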

Now we need to create a jar file to run this program on the Hadoop cluster.

1. Create a folder named test on the desktop.
2. Place all the Java files (except the JUnit test file) into the test folder.
3. Create a folder "wordcount" inside the test folder.
4. Compile the Java code:

javac -cp `hadoop classpath` -d wordcount/ WordCountMapper.java WordCountReducer.java WordCountDriver.java

All the files are compiled and the class files are placed into the wordcount folder.
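
Because the sources declare package com.tamil, the compiled classes should end up laid out by package inside wordcount (matching what the jar step below reports):

wordcount/com/tamil/WordCountDriver.class
wordcount/com/tamil/WordCountMapper.class
wordcount/com/tamil/WordCountReducer.class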

5. Create the jar file:

jar -cvf wordcount.jar -C wordcount/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/tamil/(in = 0) (out= 0)(stored 0%)
adding: com/tamil/WordCountReducer.class(in = 1612) (out= 677)(deflated 58%)
adding: com/tamil/WordCountMapper.class(in = 1886) (out= 861)(deflated 54%)
adding: com/tamil/WordCountDriver.class(in = 1744) (out= 942)(deflated 45%)


6. Create a text file (MyText.txt) and move it into HDFS to test our MapReduce program:

hadoop fs -mkdir  /user/cloudera/myInput
hadoop fs -put MyText.txt /user/cloudera/myInput/
hadoop fs -ls  /user/cloudera/myInput/
Found 1 items
-rw-r--r--   1 cloudera cloudera        141 2014-09-01 03:23 /user/cloudera/myInput/MyText.txt

7. Run the jar on the Hadoop cluster:

$ hadoop jar wordcount.jar com.tamil.WordCountDriver /user/cloudera/myInput/ /user/cloudera/myOutput
14/09/01 03:29:18 INFO client.RMProxy: Connecting to ResourceManager at localhost.localdomain/127.0.0.1:8032
14/09/01 03:29:18 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/09/01 03:29:18 INFO input.FileInputFormat: Total input paths to process : 1
14/09/01 03:29:18 INFO mapreduce.JobSubmitter: number of splits:1
14/09/01 03:29:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1409554960178_0001
14/09/01 03:29:19 INFO impl.YarnClientImpl: Submitted application application_1409554960178_0001
14/09/01 03:29:19 INFO mapreduce.Job: The url to track the job: http://localhost.localdomain:8088/proxy/application_1409554960178_0001/
14/09/01 03:29:19 INFO mapreduce.Job: Running job: job_1409554960178_0001
14/09/01 03:29:25 INFO mapreduce.Job: Job job_1409554960178_0001 running in uber mode : false
14/09/01 03:29:25 INFO mapreduce.Job:  map 0% reduce 0%
14/09/01 03:29:30 INFO mapreduce.Job:  map 100% reduce 0%
14/09/01 03:29:35 INFO mapreduce.Job:  map 100% reduce 100%
14/09/01 03:29:35 INFO mapreduce.Job: Job job_1409554960178_0001 completed successfully
14/09/01 03:29:36 INFO mapreduce.Job: Counters: 49
               File System Counters
                              FILE: Number of bytes read=67
                              FILE: Number of bytes written=183335
                              FILE: Number of read operations=0
                              FILE: Number of large read operations=0
                              FILE: Number of write operations=0
                              HDFS: Number of bytes read=272
                              HDFS: Number of bytes written=33
                              HDFS: Number of read operations=6
                              HDFS: Number of large read operations=0
                              HDFS: Number of write operations=2
               Job Counters
                              Launched map tasks=1
                              Launched reduce tasks=1
                              Data-local map tasks=1
                              Total time spent by all maps in occupied slots (ms)=699648
                              Total time spent by all reduces in occupied slots (ms)=757760
                              Total time spent by all map tasks (ms)=2733
                              Total time spent by all reduce tasks (ms)=2960
                              Total vcore-seconds taken by all map tasks=2733
                              Total vcore-seconds taken by all reduce tasks=2960
                              Total megabyte-seconds taken by all map tasks=699648
                              Total megabyte-seconds taken by all reduce tasks=757760
               Map-Reduce Framework
                              Map input records=3
                              Map output records=7
                              Map output bytes=52
                              Map output materialized bytes=63
                              Input split bytes=131
                              Combine input records=7
                              Combine output records=6
                              Reduce input groups=6
                              Reduce shuffle bytes=63
                              Reduce input records=6
                              Reduce output records=6
                              Spilled Records=12
                              Shuffled Maps =1
                              Failed Shuffles=0
                              Merged Map outputs=1
                              GC time elapsed (ms)=50
                              CPU time spent (ms)=1110
                              Physical memory (bytes) snapshot=459587584
                              Virtual memory (bytes) snapshot=1806524416
                              Total committed heap usage (bytes)=355467264
               Shuffle Errors
                              BAD_ID=0
                              CONNECTION=0
                              IO_ERROR=0
                              WRONG_LENGTH=0
                              WRONG_MAP=0
                              WRONG_REDUCE=0
               File Input Format Counters
                              Bytes Read=141
               File Output Format Counters
                              Bytes Written=33

8. Check the result in the output folder newly created in HDFS by the MapReduce program:

$ hadoop fs -ls /user/cloudera/myOutput
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2014-09-01 03:29 /user/cloudera/myOutput/_SUCCESS
-rw-r--r--   1 cloudera cloudera         33 2014-09-01 03:29 /user/cloudera/myOutput/part-r-00000

9. Print the result to the console:

$ hadoop fs -cat /user/cloudera/myOutput/part-r-00000
I         1
It        1
am        1
is        2
it        1
useful    1

Wow... Great work... It's working... :-)