使用 MapReduce 进行数据密集型文本处理 已翻译 100%

可观 投递于 2013/01/24 23:21 (共 8 段, 翻译完成于 01-28)
阅读 1078
收藏 9
0
加载中
距离我上次发文,已经有一段时间了。因为我最近忙着 Coursera提供 的一些课程。这些课程里面有些很有趣的东西并值得去研究。前段时间,我买了一本由Jimmy和Chris Dyer编写的书Data-Intensive Processing with MapReduce。这本书为我们展示了一些MapReduce的核心算法,但是是以伪代码方式讲述的。我的目标是把这本书中第3-6节的算法以Hadoop去实现,并参考Tom White撰写的书籍Hadoop: The Definitive Guide。我假设本文的读者已经了解Hadoop和MapReduce,本文不再详细描述基础的概念。现在让我们一起进入章节3-MapReduce算法设计,并以本地聚集作为开始。
enixyu
翻译于 2013/01/27 21:55
2

局部聚焦(Local Aggregation)

在一个很高级别中,当Mappers发出数据,中间结果被写到磁盘然后通过网络发送给Reducers进行最终处理。延迟后的数据写到磁盘上然后通过网络传送数据这种方法在一个MapReduce的处理上是一个代价高昂的操作。所以为了让它无论何时都尽可能的忍受这点,就需要映射器(mappers)减少数据发送的数量加快MapReduce工作的速度。Local aggregation 是一个减少数据数量的的一个技术并提升了MapReduce工作的效率。Local aggregation不能代替reducers,那样我们需要一种方式用不同mappers中相同的的关键字去得到其结果,我们将会考虑实现局部聚焦(Local Aggregation)的三种方式。

1.使用Hadoop Combiner的功能

2.结合Text Processing with MapReduce这本书提供两种方法的"in-mapper"

当然任何优化是将要都要进行权衡我们同样也将讨论这些。

为了验证 local aggregation,我们将在一个简单版本的A Christmas Carol 上运行普遍字计数器,在一个虚拟分布群集中安装在我的MacBookPro上,使用来自Cloudera的hadoop-0.20.2-cdh3u3。我计划以后会用到更规范实际的数据在EC2群集中发布运行相同的测试

唐柯德
翻译于 2013/01/26 17:50
2

Combiners

combiner功能由继承了Reducer class的对象实现。事实上,在我们的例子里,我们会重用word count中的reducer来作为combiner。combiner 在配置MapReduce job的时候指定,就像这样:

 job.setReducerClass(TokenCountReducer.class);

下面是reducer的代码:

public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
              count+= value.get();
        }
        context.write(key,new IntWritable(count));
    }
}

combiner的作用就如它的名字,聚合数据以尽量减少shuffle阶段的网络传输量。如前所述,reducer仍然需要把来自不同mapper的同样的key聚集起来。因为combiner功能只是对过程的一个优化,所以Hadoop框架不能保证combiner会被调用多少次。(配置了combinere就一定会执行,但是执行1次还是n次是预先不确定的)

可观
翻译于 2013/01/27 21:37
1

在Mapper聚合的方法1

不用combiner的话,替代方法之一只需要对我们原来的word count mapper做一个小小的修改:

public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        IntWritable writableCount = new IntWritable();
        Text text = new Text();
        Map<String,Integer> tokenMap = new HashMap<String, Integer>();
        StringTokenizer tokenizer = new StringTokenizer(value.toString());

        while(tokenizer.hasMoreElements()){
            String token = tokenizer.nextToken();
            Integer count = tokenMap.get(token);
            if(count == null) count = new Integer(0);
            count+=1;
            tokenMap.put(token,count);
        }

        Set<String> keys = tokenMap.keySet();
        for (String s : keys) {
             text.set(s);
             writableCount.set(tokenMap.get(s));
             context.write(text,writableCount);
        }
    }
}

如我们所看到的,输出的词的计数不再是1,我们用一个map记录处理过的每个词。处理完毕一行中的所有词,然后遍历这个map,输出每个词在一行中的出现次数。

可观
翻译于 2013/01/27 21:48
1

In Mapper Combining 第二种选择

In Mapper Combining(第41页 图3.3)的第二种选择与上面的例子非常相像,只是有两个不同的地方- 当哈希表被创建和当我们发送包含在map里面的结果。在上面的例子中,一个map被创建并在每次调用map的方式的时候,把内容写到报文中。在这个例子中,我们将创建一个实例变量map,并把map的实例化提升为mapper的setUp方法。同样的,在所有调用mapper的方法完成后并调用了cleanUp方法后,存储在map中的数据才会被发送到reducer。

public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

    private  Map<String,Integer> tokenMap;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
           tokenMap = new HashMap<String, Integer>();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while(tokenizer.hasMoreElements()){
            String token = tokenizer.nextToken();
            Integer count = tokenMap.get(token);
            if(count == null) count = new Integer(0);
            count+=1;
            tokenMap.put(token,count);
        }
    }


    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        IntWritable writableCount = new IntWritable();
        Text text = new Text();
        Set<String> keys = tokenMap.keySet();
        for (String s : keys) {
            text.set(s);
            writableCount.set(tokenMap.get(s));
            context.write(text,writableCount);
        }
    }
}
enixyu
翻译于 2013/01/27 22:24
1

正如上面的代码所示,在mapper里,跨越所有map方法调用,记录每个词的出现次数。通过这样做,大大减少了发送到reducer的记录数量,能够减少MapReduce任务的运行时间。达到的效果与使用MapReduce框架的combiner功能相同,但是这种情况下你要自己保证你的聚合代码是正确的。但是使用这种方法的时候要注意,在map方法调用过程中始终保持状态是有问题的,这有悖于“map”功能的原义。而且,在map调用过程中保持状态也需要关注你的内存使用。总之,根据不同情况来做权衡,选择最合适的办法。


可观
翻译于 2013/01/27 22:47
1

结果

现在让我们来看一下不同mapper的结果。因为job运行在伪分布式模式下,这个运行时间不足以参考,不过我们仍然可以推断出使用了本地聚合之后是如何影响真实集群上运行的MapReduce job的效率的。

每个词输出一次的Mapper:

12/09/13 21:25:32 INFO mapred.JobClient:     Reduce shuffle bytes=366010
12/09/13 21:25:32 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:25:32 INFO mapred.JobClient:     Spilled Records=63118
12/09/13 21:25:32 INFO mapred.JobClient:     Map output bytes=302886

在mapper中聚合方法1:

12/09/13 21:28:15 INFO mapred.JobClient:     Reduce shuffle bytes=354112
12/09/13 21:28:15 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:28:15 INFO mapred.JobClient:     Spilled Records=60704
12/09/13 21:28:15 INFO mapred.JobClient:     Map output bytes=293402

在mapper中聚合方法2:

12/09/13 21:30:49 INFO mapred.JobClient:     Reduce shuffle bytes=105885
12/09/13 21:30:49 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:30:49 INFO mapred.JobClient:     Spilled Records=15314
12/09/13 21:30:49 INFO mapred.JobClient:     Map output bytes=90565

使用了Combiner:

12/09/13 21:22:18 INFO mapred.JobClient:     Reduce shuffle bytes=105885
12/09/13 21:22:18 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:22:18 INFO mapred.JobClient:     Spilled Records=15314
12/09/13 21:22:18 INFO mapred.JobClient:     Map output bytes=302886
12/09/13 21:22:18 INFO mapred.JobClient:     Combine input records=31559
12/09/13 21:22:18 INFO mapred.JobClient:     Combine output records=7657

正如所料,没有做任何聚合的Mapper效果最差,然后是“在mapper中聚合方法1”,差之了了。“在mapper中聚合方法2”与使用了combiner的结果很近似。比起前两种方法,他们节省了2/3的shuffle字节数。这等于减少了同样数量的网络数据传输量,十分有利于提高MapReduce job的运行效率。不过要记住,方法2或者combiner并不一定能够应用于所有的MapReduce jobs word count很适合于这种场景,但是别的情况可不一定。

可观
翻译于 2013/01/27 23:04
1

结论

正如你看到的,使用mapper里聚合方法和combiner是有好处的,不过当你在寻求提升MapReduce jobs的性能的时候你应该多考虑一些因素。至于选哪种方法,这取决于你如何权衡。

相关链接

编码快乐,不要忘了分享哦!

可观
翻译于 2013/01/27 23:14
1
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
加载中

评论(2)

qwfys
qwfys
~~
可观
可观
Local Aggregation不宜翻译成局部聚焦,应该是本地聚合。因为它讲的是使用combiner等手段在mapper阶段对数据进行初步的聚合,以减少网络数据传输量(需要传到reducer的数据),也即,在mapper的“本地”进行聚合。
返回顶部
顶部