14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_READ=17886
14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_READ=52932
14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_WRITTEN=54239
14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=71431
14/12/17 05:53:27 INFO mapred.JobClient: Map-Reduce Framework
14/12/17 05:53:27 INFO mapred.JobClient: Reduce input groups=4
14/12/17 05:53:27 INFO mapred.JobClient: Combine output records=6
14/12/17 05:53:27 INFO mapred.JobClient: Map input records=4
14/12/17 05:53:27 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/17 05:53:27 INFO mapred.JobClient: Reduce output records=4
14/12/17 05:53:27 INFO mapred.JobClient: Spilled Records=12
14/12/17 05:53:27 INFO mapred.JobClient: Map output bytes=78
14/12/17 05:53:27 INFO mapred.JobClient: Combine input records=8
14/12/17 05:53:27 INFO mapred.JobClient: Map output records=8
14/12/17 05:53:27 INFO mapred.JobClient: Reduce input records=6
2、WordCount處理過程
上面給出了WordCount的設計思路和源碼,但是沒有深入細節,下面對WordCount進行更加詳細的分析:
(1)將文件拆分成splits,由于測試用的文件較小,所以每一個文件為一個split,并將文件按行分割成對,如圖,這一步由Mapreduce框架自動完成,其中偏移量包括了回車所占的字符
(2)將分割好的對交給用戶定義的map方法進行處理,生成新的對
(3)得到map方法輸出的對后,Mapper會將它們按照key值進行排序,并執行Combine過程,將key值相同的value值累加,得到Mapper的最終輸出結果,如圖:
(4)Reduce先對從Mapper接收的數據進行排序,再交由用戶自定義的reduce方法進行處理,得到新的對,并作為WordCount的輸出結果,如圖:
3.MapReduce,你夠了解嗎?
MapReduce框架在幕后默默地完成了很多的事情,如果不重寫map和reduce方法,會出現什么情況呢?
下面來實現一個簡化的MapReduce,新建一個LazyMapReduce,該類只對任務進行必要的初始化及輸入/輸出路徑的設置,其余的參數均保持默認
代碼如下:
public class LazyMapReduce { public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length != 2) { System.err.println("Usage:wordcount"); System.exit(2); } Job job = new Job(conf, "LazyMapReduce"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)? 0:1); } } |
運行結果為:
14/12/17 23:04:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.JobClient: Running job: job_local_0001
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:15 INFO mapred.JobClient: map 0% reduce 0%
14/12/17 23:04:18 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 23:04:18 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 23:04:18 INFO mapred.MapTask: Starting flush of map output
14/12/17 23:04:19 INFO mapred.MapTask: Finished spill 0
14/12/17 23:04:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/17 23:04:19 INFO mapred.LocalJobRunner:
14/12/17 23:04:19 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0′ done.
14/12/17 23:04:20 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:20 INFO mapred.MapTask: data buffer = 79691776/99614720
原文轉自:http://www.uml.org.cn/sjjm/201501201.asp