import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {
    // sample line of weather data
    // 0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999

    private Text outText = new Text();
    private TemperatureAveragingPair pair = new TemperatureAveragingPair();
    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // characters 15-20 hold the year and month, e.g. 190101
        String yearMonth = line.substring(15, 21);

        int tempStartPosition = 87;
        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }
        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));

        if (temp != MISSING) {
            outText.set(yearMonth);
            // emit the reading together with a count of 1 so a combiner,
            // if it runs, can sum both fields without skewing the average
            pair.set(temp, 1);
            context.write(outText, pair);
        }
    }
}
By having the mapper emit a key together with a TemperatureAveragingPair object, our MapReduce program is guaranteed to produce correct results whether or not the combiner runs.
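Neither listing in this section defines TemperatureAveragingPair itself. Below is a minimal sketch of what such a class could look like, assuming a Hadoop Writable wrapping two IntWritable fields and exposing the getTemp(), getCount(), and set() methods used above; the author's actual implementation may differ.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

// Hypothetical sketch: a (temperature sum, reading count) pair. Because pairs
// can be summed associatively, combining at any stage preserves the average.
public class TemperatureAveragingPair implements Writable {
    private IntWritable temp = new IntWritable();
    private IntWritable count = new IntWritable();

    public void set(int temp, int count) {
        this.temp.set(temp);
        this.count.set(count);
    }

    public IntWritable getTemp() { return temp; }

    public IntWritable getCount() { return count; }

    @Override
    public void write(DataOutput out) throws IOException {
        temp.write(out);
        count.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        temp.readFields(in);
        count.readFields(in);
    }
}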
To reduce the volume of data shuffled across the network, we want to consolidate as many readings as possible without affecting the final average computation. When the combiner runs, it merges all TemperatureAveragingPair objects with the same key into a single pair containing the summed temperature and the summed count. Here is the combiner code:
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageTemperatureCombiner extends Reducer<Text, TemperatureAveragingPair, Text, TemperatureAveragingPair> {
    private TemperatureAveragingPair pair = new TemperatureAveragingPair();

    @Override
    protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {
        int temp = 0;
        int count = 0;
        // sum both fields; the division is deferred to the reducer,
        // so running the combiner zero or more times is safe
        for (TemperatureAveragingPair value : values) {
            temp += value.getTemp().get();
            count += value.getCount().get();
        }
        pair.set(temp, count);
        context.write(key, pair);
    }
}
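A combiner only takes effect if it is registered with the job. The original article does not show the driver, but a minimal sketch, assuming the class names above and input/output paths passed as program arguments, might look like this. Note that Hadoop may invoke the combiner zero, one, or many times, which is exactly why the pair design above is required.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageTemperatureJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "average-temperature");
        job.setJarByClass(AverageTemperatureJob.class);

        job.setMapperClass(AverageTemperatureMapper.class);
        job.setCombinerClass(AverageTemperatureCombiner.class); // opt-in; may run 0..n times
        job.setReducerClass(AverageTemperatureReducer.class);

        // the map output value type differs from the final output value type
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TemperatureAveragingPair.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}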
But since we do care about reducing the amount of data transferred to the reducers as much as we can, we will now look at a second way to accomplish that: combining inside the mapper itself.
As with word count, the in-mapper combining approach to computing averages uses a HashMap keyed by year-month, with TemperatureAveragingPair objects as values. To merge data for the same year-month, we look up the TemperatureAveragingPair stored under that key and add the current temperature and a count of one to its fields. Finally, when cleanup is called, every key and TemperatureAveragingPair in the HashMap is written out.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {
    // sample line of weather data
    // 0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999

    private static final int MISSING = 9999;
    // state kept across map() calls: a running (sum, count) per year-month
    private Map<String, TemperatureAveragingPair> pairMap = new HashMap<String, TemperatureAveragingPair>();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String yearMonth = line.substring(15, 21);

        int tempStartPosition = 87;
        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }
        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));

        if (temp != MISSING) {
            TemperatureAveragingPair pair = pairMap.get(yearMonth);
            if (pair == null) {
                pair = new TemperatureAveragingPair();
                pairMap.put(yearMonth, pair);
            }
            int temps = pair.getTemp().get() + temp;
            int count = pair.getCount().get() + 1;
            pair.set(temps, count);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit one pair per year-month once this mapper has consumed its input
        Set<String> keys = pairMap.keySet();
        Text keyText = new Text();
        for (String key : keys) {
            keyText.set(key);
            context.write(keyText, pairMap.get(key));
        }
    }
}
With this in-mapper combining approach, we preserve state across map calls, which guarantees the reduction in emitted data rather than leaving it to the framework's discretion. Maintaining state across mapper calls is something to handle with care, but in cases like this it is very effective; one caveat is memory growth, and a common safeguard is sketched below.
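The original code does not bound how large pairMap may grow. One common mitigation, shown here as a hedged sketch rather than part of the original listing, is to flush the map to the output whenever it passes a threshold; this helper would be added to AverageTemperatureCombiningMapper and called at the end of map(). FLUSH_THRESHOLD is an illustrative value.

// Hypothetical addition to AverageTemperatureCombiningMapper: bound memory by
// flushing partial (sum, count) pairs once the map exceeds FLUSH_THRESHOLD
// entries. Emitting partial pairs early is safe because the reducer sums all
// pairs for a key before dividing.
private static final int FLUSH_THRESHOLD = 10000; // illustrative; tune to heap size

private void flushIfNeeded(Context context) throws IOException, InterruptedException {
    if (pairMap.size() >= FLUSH_THRESHOLD) {
        Text keyText = new Text();
        for (Map.Entry<String, TemperatureAveragingPair> entry : pairMap.entrySet()) {
            keyText.set(entry.getKey());
            context.write(keyText, entry.getValue());
        }
        pairMap.clear();
    }
}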
In this case the reducer logic is simple: iterate over all values for each key, sum the temperatures and counts, and divide.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {
    private IntWritable average = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {
        int temp = 0;
        int count = 0;
        for (TemperatureAveragingPair pair : values) {
            temp += pair.getTemp().get();
            count += pair.getCount().get();
        }
        // integer division: the result stays in tenths of a degree Celsius
        average.set(temp / count);
        context.write(key, average);
    }
}
As expected, both the combiner and the in-mapper combining approach sharply reduce the amount of data sent over the wire, as the job counters below show: reduce shuffle bytes drop from 111,594 to 210, and reduce input records from 6,564 to 12.
Non-optimized run:
12/10/10 23:05:28 INFO mapred.JobClient: Reduce input groups=12
12/10/10 23:05:28 INFO mapred.JobClient: Combine output records=0
12/10/10 23:05:28 INFO mapred.JobClient: Map input records=6565
12/10/10 23:05:28 INFO mapred.JobClient: Reduce shuffle bytes=111594
12/10/10 23:05:28 INFO mapred.JobClient: Reduce output records=12
12/10/10 23:05:28 INFO mapred.JobClient: Spilled Records=13128
12/10/10 23:05:28 INFO mapred.JobClient: Map output bytes=98460
12/10/10 23:05:28 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200
12/10/10 23:05:28 INFO mapred.JobClient: Combine input records=0
12/10/10 23:05:28 INFO mapred.JobClient: Map output records=6564
12/10/10 23:05:28 INFO mapred.JobClient: SPLIT_RAW_BYTES=108
12/10/10 23:05:28 INFO mapred.JobClient: Reduce input records=6564
With the combiner:
12/10/10 23:07:19 INFO mapred.JobClient: Reduce input groups=12
12/10/10 23:07:19 INFO mapred.JobClient: Combine output records=12
12/10/10 23:07:19 INFO mapred.JobClient: Map input records=6565
12/10/10 23:07:19 INFO mapred.JobClient: Reduce shuffle bytes=210
12/10/10 23:07:19 INFO mapred.JobClient: Reduce output records=12
12/10/10 23:07:19 INFO mapred.JobClient: Spilled Records=24
12/10/10 23:07:19 INFO mapred.JobClient: Map output bytes=98460
12/10/10 23:07:19 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200
12/10/10 23:07:19 INFO mapred.JobClient: Combine input records=6564
12/10/10 23:07:19 INFO mapred.JobClient: Map output records=6564
12/10/10 23:07:19 INFO mapred.JobClient: SPLIT_RAW_BYTES=108
12/10/10 23:07:19 INFO mapred.JobClient: Reduce input records=12
With in-mapper combining:
12/10/10 23:09:09 INFO mapred.JobClient: Reduce input groups=12
12/10/10 23:09:09 INFO mapred.JobClient: Combine output records=0
12/10/10 23:09:09 INFO mapred.JobClient: Map input records=6565
12/10/10 23:09:09 INFO mapred.JobClient: Reduce shuffle bytes=210
12/10/10 23:09:09 INFO mapred.JobClient: Reduce output records=12
12/10/10 23:09:09 INFO mapred.JobClient: Spilled Records=24
12/10/10 23:09:09 INFO mapred.JobClient: Map output bytes=180
12/10/10 23:09:09 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200
12/10/10 23:09:09 INFO mapred.JobClient: Combine input records=0
12/10/10 23:09:09 INFO mapred.JobClient: Map output records=12
12/10/10 23:09:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=108
12/10/10 23:09:09 INFO mapred.JobClient: Reduce input records=12
Computed results:
(Note: the temperatures in the sample file are stored as degrees Celsius multiplied by 10, so the value -25 for 190101 means -2.5 °C.)
Year-Month | Non-Optimized | Combiner | In-Mapper-Combiner
190101     | -25           | -25      | -25
190102     | -91           | -91      | -91
190103     | -49           | -49      | -49
190104     |  22           |  22      |  22
190105     |  76           |  76      |  76
190106     | 146           | 146      | 146
190107     | 192           | 192      | 192
190108     | 170           | 170      | 170
190109     | 114           | 114      | 114
190110     |  86           |  86      |  86
190111     | -16           | -16      | -16
190112     | -77           | -77      | -77
We have demonstrated local aggregation in two scenarios: a simple one that just reuses the reducer as a combiner, and a slightly more involved one that requires organizing the data inside the mapper. As the identical outputs above confirm, both approaches preserve correctness while substantially improving the efficiency of the processing.