各task自行同步線程啟動數量,當所有線程都啟動之后,輸出收集器開始運作:
long now = System.currentTimeMillis(); if (now > this.start + this.interval) { this.start += this.interval; context.collect(new LongWritable(this.start), new LongWritable( this.writeBlocks)); } 。。。。。。 |
可以看到key是時間戳,value是我們想收集的數值,收集器收集到的數據將進一步提供給Reducer來分析,這里有一個壓力測試的關鍵點,即最大并發開始時間點和結束時間點的判斷。觀察Reducer類的reduce方法:
public void reduce(Text key, Iterator<LongWritable> value, OutputCollector<Text, Text> context, Reporter reporter) |
由于所有map都以相同的時間戳作為key,因此同一時刻迭代器value的size代表了有多少個map已經達到了最大并發度,我們判斷這個size,當其與我們預期的map總數一致時,則可以將該時間戳作為最大并發壓力的開始時間點,當size開始小于預期map總數時,則代表最大并發壓力的結束時間點,測試結果分析時可以掐取這一段數據作為測試結果,免去開始準備階段和快結束階段壓力變小對測試結果的干擾。
更進一步我們可以在hdfs上設計一個標志位,當一個maptask執行完畢之后,通過該標志位通知到其他所有map task,以便快速結束當前的測試。
原文轉自:http://www.taobaotest.com/blogs/2515