让我们插入1千万条文档,每个文档包含一个从0到1000000的整数。这意味着平均有10个文档会具有相同的值。
> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });} > db.uniques.findOne() { "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 } > db.uniques.ensureIndex({dim0: 1}) > db.uniques.stats() { "ns" : "test.uniques", "count" : 10000000, "size" : 360000052, "avgObjSize" : 36.0000052, "storageSize" : 582864896, "numExtents" : 18, "nindexes" : 2, "lastExtentSize" : 153874432, "paddingFactor" : 1, "systemFlags" : 1, "userFlags" : 0, "totalIndexSize" : 576040080, "indexSizes" : { "_id_" : 324456384, "dim0_1" : 251583696 }, "ok" : 1 }
从这其中,我们想要计算出现的不同值的个数。可以用下列MR任务轻松完成这个工作:
> db.runCommand( { mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout" }) { "result" : "mrout", "timeMillis" : 1161960, "counts" : { "input" : 10000000, "emit" : 10000000, "reduce" : 1059138, "output" : 999961 }, "ok" : 1 }
正如你在输出内容中看到的,这耗费了大概1200秒(在EC2 M3实例上进行的测试)。有1千万个map,1百万个reduce,输出了999961个文档。结果就像下面这样:
> db.mrout.find() { "_id" : 1, "value" : 10 } { "_id" : 2, "value" : 5 } { "_id" : 3, "value" : 6 } { "_id" : 4, "value" : 10 } { "_id" : 5, "value" : 9 } { "_id" : 6, "value" : 12 } { "_id" : 7, "value" : 5 } { "_id" : 8, "value" : 16 } { "_id" : 9, "value" : 10 } { "_id" : 10, "value" : 13 } ...
我在上一篇博文中提到了在MR中使用排序多么有益。这个特性很少被理解。在这个例子中,处理未排序的输入意味着MR引擎将得到随机顺序的值,在RAM中根本无法reduce。相反,它将不得不把所有文章写入一个临时收集的磁盘,然后按顺序读取并reduce。让我们看看使用排序是否有助:
> db.runCommand( { mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout", sort: {dim0: 1} }) { "result" : "mrout", "timeMillis" : 192589, "counts" : { "input" : 10000000, "emit" : 10000000, "reduce" : 1000372, "output" : 999961 }, "ok" : 1 }
确实大有助益!我们下降到192秒,已经提升了6倍。reduce的数量基本相同,但现在它们在写入磁盘前,可以在RAM内完成。
MongoDB对单独的MR作业并不使用多线程——它仅仅对多作业使用多线程。但通过多核CPU,在单个服务器使用Hadoop风格来并行作业非常有优势。我们需要做的是把输入分成几块,通过各个块来加速一个MR作业。也许数据集有简单的方法来分割,但其他使用splitVector命令(不明确)可以使你很快的找到分割点:
> db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000}) { "timeMillis" : 6006, "splitKeys" : [ { "dim0" : 18171 }, { "dim0" : 36378 }, { "dim0" : 54528 }, { "dim0" : 72717 }, … { "dim0" : 963598 }, { "dim0" : 981805 } ], "ok" : 1 }这个命令在超过1千万个文档中找到分割点仅仅需要花费5秒,很快!那么现在我们仅仅需要一个方法来创建多个MR作业。从一个应用服务器,使用多线程和为MR命令使用$gt/$It查询 相当简单。通过shell,你可以使用ScopedThread,使用方法如下:
> var t = new ScopedThread(mapred, 963598, 981805) > t.start() > t.join()
现在我们把一些快速运行的js代码放在一起,它们会产生4个线程(或者更多的线程),执行后呈现出下面的结果:
> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 }) > var keys = res.splitKeys > keys.length 39 > var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: "mrout" + min, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) } > var numThreads = 4 > var inc = Math.floor(keys.length / numThreads) + 1 > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" : 1 } connecting to: test connecting to: test connecting to: test connecting to: test > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } { "result" : "mrout0", "timeMillis" : 205790, "counts" : { "input" : 2750002, "emit" : 2750002, "reduce" : 274828, "output" : 274723 }, "ok" : 1 } { "result" : "mrout274736", "timeMillis" : 189868, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250364, "output" : 250255 }, "ok" : 1 } { "result" : "mrout524997", "timeMillis" : 191449, "counts" : { "input" : 2500014, "emit" : 2500014, "reduce" : 250120, "output" : 250019 }, "ok" : 1 } { "result" : "mrout775025", "timeMillis" : 184945, "counts" : { "input" : 2249971, "emit" : 2249971, "reduce" : 225057, "output" : 224964 }, "ok" : 1 } "ok" : 1 } { "result" : "mrout775025", "timeMillis" : 184945, "counts" : { "input" : 2249971, "emit" : 2249971, "reduce" : 225057, "output" : 224964 }, "ok" : 1 }
第一个线程时间确实超过了其他的线程,但是平均每个线程仍然用了大约190s的时间.这意味着并没有一个线程快!这有点奇怪,自从用了‘top’,在某种程度上,你可以看到所有的内核运行情况。
问题是在多线程之间会有很多锁竞争。在上锁时,MR并不是那么无私的(它每1000次读操作就会产生一次锁定),而且MR任务还会执行许多写操作,导致线程最终都会在等待另一个线程。由于每个MongoDB数据库都有私有锁,让我们尝试为每一个线程使用一个不同的输出数据库:
> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: { replace: "mrout" + min, db: "mrdb" + min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) } > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" : 1 } connecting to: test connecting to: test connecting to: test connecting to: test > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } ... { "result" : { "db" : "mrdb274736", "collection" : "mrout274736" }, "timeMillis" : 105821, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250364, "output" : 250255 }, "ok" : 1 } ...
这才像话!我们现在降到了100秒,这意味着相比一个线程而言已经提升了2倍。还算差强人意吧。现在我们只有4个核所以只快了2倍,要是在8核CPU上将会快4倍,以此类推。
当把输入数据拆分到不同线程上去的时候,发生了一些有趣的事情:每个线程现在有大约250000个不同的值来输出,而不是1百万。这意味着我们可以使用“纯JS模式”,它可以通过使用jsMode:true来开启。开启后,MongoDB在处理时将不会把对象在JS和BSON之间来回翻译,相反,它使用一个限额500000个key的内部JS字典来化简所有对象。让我们看看这是否有用:
> var mapred = function(min, max) { return db.runCommand({ mapreduce: "uniques", map: function () { emit(this.dim0, 1); }, reduce: function (key, values) { return Array.sum(values); }, out: { replace: "mrout" + min, db: "mrdb" + min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } }, jsMode: true }) } > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" : 1 } connecting to: test connecting to: test connecting to: test connecting to: test > for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } ... { "result" : { "db" : "mrdb274736", "collection" : "mrout274736" }, "timeMillis" : 70507, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250156, "output" : 250255 }, "ok" : 1 } ...
现在我们降到了70秒,就搞定了任务!jsMode真心有用,尤其是当对象有很多字段的时候。这里只有一个数字字段就已经下降了30%。
在很早的2.6版本中,在任何的js函数调用的时候,我们就通过一段代码设置一个可选参数”args“。这种做法并不标准,不在使用。但是它确有留下来的原因(查看 SERVER-4654)。让我们从Git资源库中导入MongoDB,编译并运行进行测试:
... { "result" : { "db" : "mrdb274736", "collection" : "mrout274736" }, "timeMillis" : 62785, "counts" : { "input" : 2500013, "emit" : 2500013, "reduce" : 250156, "output" : 250255 }, "ok" : 1 } ...
这是明显的提高了3倍的运行速度,时间降低到了60s,大约10-15%。这种变化也提高了整体JS引擎的堆消耗。
评论删除后,数据将无法恢复
评论(8)
翻译的不错,但是ScopedThread 报错 提示没有定义 是为何呢