局部聚焦(Local Aggregation)
在一个很高级别中,当Mappers发出数据,中间结果被写到磁盘然后通过网络发送给Reducers进行最终处理。延迟后的数据写到磁盘上然后通过网络传送数据这种方法在一个MapReduce的处理上是一个代价高昂的操作。所以为了让它无论何时都尽可能的忍受这点,就需要映射器(mappers)减少数据发送的数量加快MapReduce工作的速度。Local aggregation 是一个减少数据数量的的一个技术并提升了MapReduce工作的效率。Local aggregation不能代替reducers,那样我们需要一种方式用不同mappers中相同的的关键字去得到其结果,我们将会考虑实现局部聚焦(Local Aggregation)的三种方式。
1.使用Hadoop Combiner的功能
2.结合Text Processing with MapReduce这本书提供两种方法的"in-mapper"
当然任何优化是将要都要进行权衡我们同样也将讨论这些。
为了验证 local aggregation,我们将在一个简单版本的A Christmas Carol 上运行普遍字计数器,在一个虚拟分布群集中安装在我的MacBookPro上,使用来自Cloudera的hadoop-0.20.2-cdh3u3。我计划以后会用到更规范实际的数据在EC2群集中发布运行相同的测试
combiner功能由继承了Reducer class的对象实现。事实上,在我们的例子里,我们会重用word count中的reducer来作为combiner。combiner 在配置MapReduce job的时候指定,就像这样:
job.setReducerClass(TokenCountReducer.class);
下面是reducer的代码:
public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count+= value.get(); } context.write(key,new IntWritable(count)); } }
combiner的作用就如它的名字,聚合数据以尽量减少shuffle阶段的网络传输量。如前所述,reducer仍然需要把来自不同mapper的同样的key聚集起来。因为combiner功能只是对过程的一个优化,所以Hadoop框架不能保证combiner会被调用多少次。(配置了combinere就一定会执行,但是执行1次还是n次是预先不确定的)
不用combiner的话,替代方法之一只需要对我们原来的word count mapper做一个小小的修改:
public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { IntWritable writableCount = new IntWritable(); Text text = new Text(); Map<String,Integer> tokenMap = new HashMap<String, Integer>(); StringTokenizer tokenizer = new StringTokenizer(value.toString()); while(tokenizer.hasMoreElements()){ String token = tokenizer.nextToken(); Integer count = tokenMap.get(token); if(count == null) count = new Integer(0); count+=1; tokenMap.put(token,count); } Set<String> keys = tokenMap.keySet(); for (String s : keys) { text.set(s); writableCount.set(tokenMap.get(s)); context.write(text,writableCount); } } }
如我们所看到的,输出的词的计数不再是1,我们用一个map记录处理过的每个词。处理完毕一行中的所有词,然后遍历这个map,输出每个词在一行中的出现次数。
In Mapper Combining 第二种选择
In Mapper Combining(第41页 图3.3)的第二种选择与上面的例子非常相像,只是有两个不同的地方- 当哈希表被创建和当我们发送包含在map里面的结果。在上面的例子中,一个map被创建并在每次调用map的方式的时候,把内容写到报文中。在这个例子中,我们将创建一个实例变量map,并把map的实例化提升为mapper的setUp方法。同样的,在所有调用mapper的方法完成后并调用了cleanUp方法后,存储在map中的数据才会被发送到reducer。
public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> { private Map<String,Integer> tokenMap; @Override protected void setup(Context context) throws IOException, InterruptedException { tokenMap = new HashMap<String, Integer>(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while(tokenizer.hasMoreElements()){ String token = tokenizer.nextToken(); Integer count = tokenMap.get(token); if(count == null) count = new Integer(0); count+=1; tokenMap.put(token,count); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { IntWritable writableCount = new IntWritable(); Text text = new Text(); Set<String> keys = tokenMap.keySet(); for (String s : keys) { text.set(s); writableCount.set(tokenMap.get(s)); context.write(text,writableCount); } } }
现在让我们来看一下不同mapper的结果。因为job运行在伪分布式模式下,这个运行时间不足以参考,不过我们仍然可以推断出使用了本地聚合之后是如何影响真实集群上运行的MapReduce job的效率的。
每个词输出一次的Mapper:
12/09/13 21:25:32 INFO mapred.JobClient: Reduce shuffle bytes=366010 12/09/13 21:25:32 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:25:32 INFO mapred.JobClient: Spilled Records=63118 12/09/13 21:25:32 INFO mapred.JobClient: Map output bytes=302886
在mapper中聚合方法1:
12/09/13 21:28:15 INFO mapred.JobClient: Reduce shuffle bytes=354112 12/09/13 21:28:15 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:28:15 INFO mapred.JobClient: Spilled Records=60704 12/09/13 21:28:15 INFO mapred.JobClient: Map output bytes=293402
在mapper中聚合方法2:
12/09/13 21:30:49 INFO mapred.JobClient: Reduce shuffle bytes=105885 12/09/13 21:30:49 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:30:49 INFO mapred.JobClient: Spilled Records=15314 12/09/13 21:30:49 INFO mapred.JobClient: Map output bytes=90565
使用了Combiner:
12/09/13 21:22:18 INFO mapred.JobClient: Reduce shuffle bytes=105885 12/09/13 21:22:18 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:22:18 INFO mapred.JobClient: Spilled Records=15314 12/09/13 21:22:18 INFO mapred.JobClient: Map output bytes=302886 12/09/13 21:22:18 INFO mapred.JobClient: Combine input records=31559 12/09/13 21:22:18 INFO mapred.JobClient: Combine output records=7657
正如所料,没有做任何聚合的Mapper效果最差,然后是“在mapper中聚合方法1”,差之了了。“在mapper中聚合方法2”与使用了combiner的结果很近似。比起前两种方法,他们节省了2/3的shuffle字节数。这等于减少了同样数量的网络数据传输量,十分有利于提高MapReduce job的运行效率。不过要记住,方法2或者combiner并不一定能够应用于所有的MapReduce jobs, word count很适合于这种场景,但是别的情况可不一定。
正如你看到的,使用mapper里聚合方法和combiner是有好处的,不过当你在寻求提升MapReduce jobs的性能的时候你应该多考虑一些因素。至于选哪种方法,这取决于你如何权衡。
编码快乐,不要忘了分享哦!
评论删除后,数据将无法恢复
评论(2)