1.MapReduce概述
Hadoop Map/Reduce是一個使用簡易的軟件框架,基于它寫出來的應用程序能夠運行在由上千個商用機器組成的大型集群上,并以一種可靠容錯的方式并行處理上T級別的數據集。
一個Map/Reduce 作業(job) 通常會把輸入的數據集切分為若干獨立的數據塊,由 map任務(task)以完全并行的方式處理它們??蚣軙ap的輸出先進行排序, 然后把結果輸入給reduce任務。通常作業的輸入和輸出都會被存儲在文件系統中。 整個框架負責任務的調度和監控,以及重新執行已經失敗的任務。
通常,Map/Reduce框架和分布式文件系統是運行在一組相同的節點上的,也就是說,計算節點和存儲節點通常在一起。這種配置允許框架在那些已經存好數據的節點上高效地調度任務,這可以使整個集群的網絡帶寬被非常高效地利用。
Map/Reduce框架由一個單獨的master JobTracker 和每個集群節點一個slave TaskTracker共同組成。master負責調度構成一個作業的所有任務,這些任務分布在不同的slave上,master監控它們的執行,重新執行已經失敗的任務。而slave僅負責執行由master指派的任務。
應用程序至少應該指明輸入/輸出的位置(路徑),并通過實現合適的接口或抽象類提供map和reduce函數。再加上其他作業的參數,就構成了作業配置(job configuration)。然后,Hadoop的 job client提交作業(jar包/可執行程序等)和配置信息給JobTracker,后者負責分發這些軟件和配置信息給slave、調度任務并監控它們的執行,同時提供狀態和診斷信息給job-client。
雖然Hadoop框架是用Java實現的,但Map/Reduce應用程序則不一定要用 Java來寫 。
2.樣例分析:單詞計數
1、WordCount源碼分析
單詞計數是最簡單也是最能體現MapReduce思想的程序之一,該程序完整的代碼可以在Hadoop安裝包的src/examples目錄下找到
單詞計數主要完成的功能是:統計一系列文本文件中每個單詞出現的次數,如圖所示:
(1)Map過程
Map過程需要繼承org.apache.hadoop.mapreduce包中的Mapper類,并重寫map方法
通過在map方法中添加兩句把key值和value值輸出到控制臺的代碼,可以發現map方法中的value值存儲的是文本文件中的一行(以回車符作為行結束標記),而key值為該行的首字符相對于文本文件的首地址的偏移量。然后StringTokenizer類將每一行拆分成一個個的單詞,并將作為map方法的結果輸出,其余的工作都交由MapReduce框架處理。其中IntWritable和Text類是Hadoop對int和string類的封裝,這些類能夠被串行化,以方便在分布式環境中進行數據交換。
TokenizerMapper的實現代碼如下:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { System.out.println("key = " + key.toString());//添加查看key值 System.out.println("value = " + value.toString());//添加查看value值 StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } |
(2)Reduce過程
Reduce過程需要繼承org.apache.hadoop.mapreduce包中的Reducer類,并重寫reduce方法
reduce方法的輸入參數key為單個單詞,而values是由各Mapper上對應單詞的計數值所組成的列表,所以只要遍歷values并求和,即可得到某個單詞的出現總次數
IntSumReduce類的實現代碼如下:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } |
原文轉自:http://www.uml.org.cn/sjjm/201501201.asp