0
加载中
本文是 《  Data Intensive Processing with MapReduce 中的算法实现系列文章的最新一篇。该系列文章的第一篇 在此 在第一篇里,我们讨论了使用本地聚合技术来减少shuffle阶段的网络传输数据量。减少需要传输的数据量是提高mapreduce job的性能的最有效的办法。我们在上一篇文章里用了word count来演示本地聚合。因为我们需要的只是一个最终统计结果,而在计算最终结果的过程中改变累加的分组与顺序都不会影响最终结果,所以我们可以重用reducer来作为combiner。但是如果想计算平均值怎么办?这种情况下原来的办法就行不通了,因为总体的平均值不等于各部分平均值的平均。不过如果能够清楚的意识到这一点,我们还是可以使用本地聚合方法的。在本文的例子中我们将使用 在  Hadoop经典指南 中出现过的 美国国家气候中心的天气数据  样本来计算1901年每个月的平均气温。使用combiner和mapper中聚合的计算平均值的算法可在 《  Data Intensive Processing with MapReduce 的3.1.3找到
可观
翻译于 2013/01/29 12:30
1

没有放之四海而皆准的方法

我们在上一篇文章里介绍了两种减少数据的方法,Hadoop Combiner和在mapper中聚合。Combiner被视为是一个优化措施,因此框架不会保证它会被调用多少次。所以,mapper输出的数据格式必须是符合reducer输入格式的,以便在combiner根本没有运行的情况下最终结果还是正确的。为了计算平均气温,我们需要改变一下mapper的输出。

可观
翻译于 2013/01/29 12:31
1

Mapper 的变化

在 word-count的例子里,没有优化的mapper输出每个词和值为1的计数。combiner和在mapper中聚合的方法通过一个hashmap,将每个词作为key,出现次数作为值,保存在hashmap中来减少输出。如果combiner没有调用,reducer将收到一系列key是单词,值为1的数据,这与之前的结果是一样的。(如果使用在mapper中聚合的话就不会发生这种情况,因为聚合是发生在mapper的代码里的,所以一定会被执行)。为了计算平均值,我们的mapper需要输出一个字符串key(年月)和一个定制的实现了writable接口的对象, TemperatureAveragingPair。这个对象有两个数字属性,气温以及该气温的频数。我们可以参考《Hadoop经典指南》中的 MaximumTemperatureMapper  来建立 AverageTemperatureMapper:

public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {
 //sample line of weather data
 //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999


    private Text outText = new Text();
    private TemperatureAveragingPair pair = new TemperatureAveragingPair();
    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String yearMonth = line.substring(15, 21);

        int tempStartPosition = 87;

        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }

        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));

        if (temp != MISSING) {
            outText.set(yearMonth);
            pair.set(temp, 1);
            context.write(outText, pair);
        }
    }
}

通过让mapper输出key和TemperatureAveragingPair对象,不管combiner有没有执行我们的mapreduce程序都能输出正确的结果。

可观
翻译于 2013/01/29 12:31
1

Combiner

我们要减少传输的数据量,就要尽可能把相同气温的计数合并,但是又不能影响最终的平均数计算。当combiner执行的时候,它会把具有相同key的TemperatureAveragingPair 合并成一个,包含汇总的气温和频度计数。下面是combiner的代码:

  public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {
    private TemperatureAveragingPair pair = new TemperatureAveragingPair();

    @Override
    protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {
        int temp = 0;
        int count = 0;
        for (TemperatureAveragingPair value : values) {
             temp += value.getTemp().get();
             count += value.getCount().get();
        }
        pair.set(temp,count);
        context.write(key,pair);
    }
}
 

但是我们确实很关心如何减少需要传输给reducer的数据量,下面我们将会看看如何实现这个目的。

可观
翻译于 2013/01/29 12:33
1

在mapper中合并平均值

与word-count相同,为了计算均值,在mapper中合并的方法会用到一个hashmap,它以年月为key,以TemperatureAveragingPair为值。合并相同年月的数据的时候我们需要取出以该年月为key的TemperatureAveragingPair对象,将temperature属性和count属性累加。最后在cleanUp方法被调用的时候会输出hashmap中所有的key和TemperatureAveragingPair。

public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {
 //sample line of weather data
 //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999


    private static final int MISSING = 9999;
    private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>();


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String yearMonth = line.substring(15, 21);

        int tempStartPosition = 87;

        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }

        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));

        if (temp != MISSING) {
            TemperatureAveragingPair pair = pairMap.get(yearMonth);
            if(pair == null){
                pair = new TemperatureAveragingPair();
                pairMap.put(yearMonth,pair);
            }
            int temps = pair.getTemp().get() + temp;
            int count = pair.getCount().get() + 1;
            pair.set(temps,count);
        }
    }


    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Set<String> keys = pairMap.keySet();
        Text keyText = new Text();
        for (String key : keys) {
             keyText.set(key);
             context.write(keyText,pairMap.get(key));
        }
    }
}

用这种在mapper中合并的方法,我们在多次map调用之间保存了信息,确保了能够对产出数据进行削减。尽管保持跨mapper的状态是一件需要小心的事情,但这在某些情况下确实很有效。

可观
翻译于 2013/01/29 13:46
1

Reducer

在这种情况reducer的逻辑就很简单了,遍历每个key的所有值,把temperatures 和counts加和,然后相除。

public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {
    private IntWritable average = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {
        int temp = 0;
        int count = 0;
        for (TemperatureAveragingPair pair : values) {
            temp += pair.getTemp().get();
            count += pair.getCount().get();
        }
        average.set(temp / count);
        context.write(key, average);
    }
}

结果

正如预料,使用了combiner和mapper中合并方法的结果大幅减少了输出数据。
没有优化的情况:

12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:05:28 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce shuffle bytes=111594
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:05:28 INFO mapred.JobClient:     Spilled Records=13128
12/10/10 23:05:28 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:05:28 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:05:28 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:05:28 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input records=6564

使用了Combiner的情况:

12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:07:19 INFO mapred.JobClient:     Combine output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:07:19 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:07:19 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:07:19 INFO mapred.JobClient:     Combine input records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input records=12

在mapper中合并的情况:

12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:09:09 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:09:09 INFO mapred.JobClient:     Map output bytes=180
12/10/10 23:09:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:09:09 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input records=12

计算结果:
(注意: 例子里使用的文件中的的温度是摄氏度*10的结果)

Non-Optimized Combiner       In-Mapper-Combiner Mapper
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77
190101 -25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111 -16
190112 -77

可观
翻译于 2013/01/29 13:54
1

结论

我们用两种场景来演示了本地聚合,一个场景比较简单只要简单重用reducer作为combiner就可以,另一个稍微复杂一些,必须对数据做一定的组织,这两种例子都充分证明了本地聚合能够极大提高处理过程的效率。

相关资源

可观
翻译于 2013/01/29 13:59
1
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
加载中

评论(0)

返回顶部
顶部