MongoDB Map-reduce 如何避免全局锁 已翻译 100%

oschina 投递于 2014/04/06 19:43 (共 4 段, 翻译完成于 04-08)
阅读 2331
收藏 2
0
加载中

在MongoDB中驱动分析的两个重要特性是:

一般来说大多数的聚合架构并不需要任何全局写锁,但是当把reducer(规约)结果写到现存的或新的结果集时,Map-Reduce则需要全局写锁。

从2.2版本开始,MongoDB增加了一个新的变量(nonAtomic), 可以按照如下方式设置在OUT参数中,通过这个设置在mergereduce操作中可以略过全局写锁,但是并不支持replace操作,虽说replace在MR操作中,尤其在大数据变更中广泛的使用。

 MongoDB Map-Reduce 示例(replace,merge以及reduce)

db.mrtest.mapReduce(mapFunction, reduceFunction, { out: { replace: "mragg"}});
db.mrtest.mapReduce(mapFunction, reduceFunction, { out: { merge: "mragg", nonAtomic: true}});
db.mrtest.mapReduce(mapFunction, reduceFunction, { out: { reduce: "mragg", nonAtomic: true}});

下面源码中,你可以看到没有指定nonAtomic: true设置时,MongoDB则需要全局写锁"Lock::GlobalWrite lock"

long long State::postProcessCollection(CurOp* op, ProgressMeterHolder& pm) {
  if ( _onDisk == false || _config.outputOptions.outType == Config::INMEMORY )
    return numInMemKeys();
 
  if (_config.outputOptions.outNonAtomic)
    return postProcessCollectionNonAtomic(op, pm);
  Lock::GlobalWrite lock; // TODO(erh): this is how it was, but seems it doesn't need to be global
  return postProcessCollectionNonAtomic(op, pm);
}

这里并没有结束,当现存集合没有记录时,即便指定了merge或reduce模式,系统会简单的切换的replace模式。

摇光
翻译于 2014/04/08 08:38
2

以下为MongoDB的一些源码:

long long State::postProcessCollectionNonAtomic(CurOp* op, ProgressMeterHolder& pm) {
 
  if ( _config.outputOptions.finalNamespace == _config.tempNamespace )
    return _safeCount( _db, _config.outputOptions.finalNamespace );
 
  if (_config.outputOptions.outType == Config::REPLACE ||
      _safeCount(_db, _config.outputOptions.finalNamespace) == 0) {
    Lock::GlobalWrite lock; // TODO(erh): why global???
    // replace: just rename from temp to final collection name, dropping previous collection
    _db.dropCollection( _config.outputOptions.finalNamespace );
    BSONObj info;
 
    if ( ! _db.runCommand( "admin"
                           , BSON( "renameCollection" << _config.tempNamespace <<
                                   "to" << _config.outputOptions.finalNamespace <<
                                   "stayTemp" << _config.shardedFirstPass )
                           , info ) ) {
      uasserted( 10076 ,  str::stream() << "rename failed: " << info );
    }
 
    _db.dropCollection( _config.tempNamespace );
  }

通过删除空集合使用新集合,而不是合并/复制记录来调整性能,这虽然是很好的想法,但是考虑到全局写锁的花费,这并不是一个好的设计;最近我们已经有很多由于全局写锁导致的性能和扩展性的问题,这便是其中的一个。

以下为示例,当从MongoDB log返回到数据库中的结果集(mragg)不存在或者为空集,全局写锁在merge操作中的示例:

2014-04-04T22:20:39.064-0700 [conn27] command test.$cmd command: mapReduce { mapreduce: "mrtest", map: function () {
emit(this.user, this.orders);
}, reduce: function (user, orders) {
return Array.sum(orders);2014-04-04T22:20:39.064-0700 [conn27] command test.$cmd command: mapReduce { mapreduce: "mrtest", map: function () {
    emit(this.user, this.orders);
}, reduce: function (user, orders) {
    return Array.sum(orders);
}, out: { merge: "mragg", nonAtomic: true } } keyUpdates:0 numYields:5 locks(micros) W:3204 r:861 w:3548 reslen:129 12ms

摇光
翻译于 2014/04/08 09:04
1

解决方法:

一个简单的解决方法便是当结果集不存在或者为空集时,通过增加伪记录(add dummy record),使用merge操作,用来避免全局锁,实例如下:

 db.mragg.insert({_id: foo});
db.mrtest.mapReduce(mapFunction, reduceFunction, { out: { merge: "mragg", nonAtomic: true}});

merge模式唯一的问题就是,如果你有从前一个聚合中的任意一条记录,在新的运行中不再存在,那么这个记录将一直保留在结果集中,直至插入/替换新的或者已存在的记录。因此,人们需要决定是否使用replacemerge操作。

通常说来,replace是更加高效的,因为其可以简单的用新的替代已知的集合;如果变更或者结果集非常巨大,则强烈建议使用REPLACE模式.

摇光
翻译于 2014/04/08 09:22
1

全局锁的扩展性问题

在数据库的每个主要操作中,每当MongoDB获取/释放全局锁(读锁或者写锁)时,系统则无法应对大规模的并发操作。比如说,当涉及到map-reduce的replace操作时,它仅仅需要drop/truncate旧的集合,并从结果中创建一个新的,这也仅仅是更新全局命名空间中的一个简单微小的操作。

MySQL在metadata locking有着同样的问题,但是这个问题在5.5以后的版本中修复了,希望MongoDB能够使用同样的逻辑来解决命名空间锁的问题。

示例:

以下为完整的Map-reduce的javascript代码

venu@ ~/work/mongo/tests 01:54:20# cat mapred.js
 
// MongoDB Mapreduce Example
function format(var1) {
print()
print("-----------------------")
print(var1)
print("-----------------------")
}
 
var mapFunction = function() {
emit(this.user, this.orders);
}
 
var reduceFunction = function(user, orders) {
return Array.sum(orders);
}
 
format("truncating all records")
db.mrtest.remove({})
 
format("populating mrtest objects")
db.mrtest.insert({user: 1, orders: 3})
db.mrtest.insert({user: 1, orders: 5})
db.mrtest.insert({user: 1, orders: 18})
db.mrtest.insert({user: 2, orders: 5})
db.mrtest.insert({user: 4, orders: 18})
db.mrtest.insert({user: 3, orders: 2})
db.mrtest.insert({user: 3, orders: 3})
db.mrtest.insert({user: 4, orders: 4})
 
format("List all records")
db.mrtest.find().forEach(printjson)
 
format("Run Mapreduce..")
db.mrtest.mapReduce(mapFunction, reduceFunction, { out: {merge: "mragg", nonAtomic: true}});
 
format("List mapreduce output..")
db.mragg.find().forEach(printjson)


以下为测试结果:

venu@ ~/work/mongo/tests 22:20:45# mongo test < mapred.js
MongoDB shell version: 2.7.0-pre-
connecting to: test
 
-----------------------
truncating all records
-----------------------
WriteResult({ "nRemoved" : 10 })
 
-----------------------
populating mrtest objects
-----------------------
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
WriteResult({ "nInserted" : 1 })
 
-----------------------
List all records
-----------------------
{ "_id" : ObjectId("533fc4ff48589b9811fda697"), "user" : 1, "orders" : 3 }
{ "_id" : ObjectId("533fc4ff48589b9811fda698"), "user" : 1, "orders" : 1 }
{ "_id" : ObjectId("533fc4ff48589b9811fda699"), "user" : 1, "orders" : 5 }
{ "_id" : ObjectId("533fc4ff48589b9811fda69a"), "user" : 1, "orders" : 18 }
{ "_id" : ObjectId("533fc4ff48589b9811fda69b"), "user" : 2, "orders" : 1 }
{ "_id" : ObjectId("533fc4ff48589b9811fda69c"), "user" : 2, "orders" : 5 }
{ "_id" : ObjectId("533fc4ff48589b9811fda69d"), "user" : 4, "orders" : 18 }
{ "_id" : ObjectId("533fc4ff48589b9811fda69e"), "user" : 3, "orders" : 2 }
{ "_id" : ObjectId("533fc4ff48589b9811fda69f"), "user" : 3, "orders" : 3 }
{ "_id" : ObjectId("533fc4ff48589b9811fda6a0"), "user" : 4, "orders" : 4 }
 
-----------------------
Run Mapreduce..
-----------------------
{
"result" : "mragg",
"timeMillis" : 9,
"counts" : {
"input" : 10,
"emit" : 10,
"reduce" : 4,
"output" : 4
},
"ok" : 1,
}
 
-----------------------
List mapreduce output..
-----------------------
{ "_id" : 1, "value" : 27 }
{ "_id" : 2, "value" : 6 }
{ "_id" : 3, "value" : 5 }
{ "_id" : 4, "value" : 22 }

摇光
翻译于 2014/04/08 10:05
1
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
加载中

评论(1)

摇光
摇光

Reduce,有不同的翻译,化简 或 规约,看了具体的解释,以及MongoDB中的介绍,觉得两个名字都合理,可是翻成中文,这是两个语义区别很大的词,我没有MongoDB的实际项目经验,希望有经验的人指教

返回顶部
顶部