Mongodb源码分析--Mongos

长平狐 发布于 2012/11/06 18:41
阅读 331
收藏 0
    MongoDB提供了auto-sharding 功能。因为其是auto-sharding,即mongodb通过mongos(一个自动分片模块,用于构建一个大规模的可扩展的数据库集群,这个集群可以并入动态增加的机器)自动建立一个水平扩展的数据库集群系统,将数据库分表存储在sharding的各个节点上。

    一个mongodb集群包括一些shards(包括一些mongod进程),mongos路由进程,一个或多个config服务器  

    下面是一些相关词汇说明:
    Shards : 每一个shard包括一个或多个服务和存储数据的mongod进程(mongod是MongoDB数据的核心进程)典型的每个shard开启多个服务来提高服务的可用性。这些服务/mongod进程在shard中组成一个复制集

    Chunks: Chunk是一个来自特殊集合中的一个数据范围,(collection,minKey,maxKey)描叙一个chunk,它介于minKey和maxKey范围之间。例如chunks 的maxsize大小是100M,如果一个文件达到或超过这个范围时,会被切分到2个新的chunks中。当一个shard的数据过量时,chunks将会被迁移到其他的shards上。同样,chunks也可以迁移到其他的shards上

    Config Servers : Config服务器存储着集群的metadata信息,包括每个服务器,每个shard的基本信息和chunk信息Config服务器主要存储的是chunk信息。每一个config服务器都复制了完整的chunk信息。

    今天要介绍的源码主要是Mongos的主入口函数的执行流程,首先我们打开Mongos的项目(可通过打开源码db\db_10.sln加载所有项目),如下图:
     


    注:如果要调试mongos,需要设置一个mongod进程和一个Config Server,形如:   

    d:\mongodb>bin>mongod --dbpath d:\mongodb\db\ --port 27012

    d:\mongodb>bin>mongod --configsvr --dbpath d:\mongodb\db\ --port 27022


    然后在vs2010中配置相应的boost路径信息及启动参数信息,如下图:

        

 

 

 

     下面开始正文。首先打开mongos项目中的server.cpp文件,找到下面方法:

int  main( int  argc,  char *  argv[]) {
    
try  {
        
return  _main(argc, argv);
    }
    
catch (DBException &  e) {
        cout 
<<   " uncaught exception in mongos main: "   <<  endl;
        cout 
<<  e.toString()  <<  endl;
    }
    
catch (std::exception &  e) {
        cout 
<<   " uncaught exception in mongos main: "   <<  endl;
        cout 
<<  e.what()  <<  endl;
    }
    
catch (...) {
        cout 
<<   " uncaught exception in mongos main "   <<  endl;
    }
    
return   20 ;
}



    该方法是mongos的主函数,代码很简,它主要是try方式执行_main方法,下面是_main的执行流程:

int  _main( int  argc,  char *  argv[]) {    
    
static  StaticObserver staticObserver;
    mongosCommand 
=  argv[ 0 ];
    
// 声明options信息描述对象
    po::options_description options( " General options " );
    po::options_description sharding_options(
" Sharding options " );
    po::options_description hidden(
" Hidden options " );
    po::positional_options_description positional;

    CmdLine::addGlobalOptions( options , hidden );
    
// 添加sharding选项描述信息
    sharding_options.add_options()
    ( 
" configdb "  , po::value < string > () ,  " 1 or 3 comma separated config servers "  )
    ( 
" test "  ,  " just run unit tests "  )
    ( 
" upgrade "  ,  " upgrade meta data version "  )
    ( 
" chunkSize "  , po::value < int > (),  " maximum amount of data per chunk "  )
    ( 
" ipv6 " " enable IPv6 support (disabled by default) "  )
    ( 
" jsonp " , " allow JSONP access via http (has security implications) "  )
    ;

    options.add(sharding_options);
    .....



    在完成option描述信息的初始化操作之后,下面就开始对启动命令行参数进行分析和执行了,如下:
   

.....
    
//  parse options
    po::variables_map  params ;
    
// 对argc,argv进行分析并转换成params,以便下面使用
     if  (  !  CmdLine::store( argc , argv , options , hidden , positional ,  params  ) )
        
return   0 ;

    
//  The default value may vary depending on compile options, but for mongos
    
//  we want durability to be disabled.
    cmdLine.dur  =   false ;
    
// 如果是help
     if  (  params .count(  " help "  ) ) {
        cout 
<<  options  <<  endl;
        
return   0 ;
    }
    
// 如果是版本信息
     if  (  params .count(  " version "  ) ) {
        printShardingVersionInfo();
        
return   0 ;
    }
    
// 如要设置chunkSize
     if  (  params .count(  " chunkSize "  ) ) {
        Chunk::MaxChunkSize 
=   params [ " chunkSize " ]. as < int > ()  *   1024   *   1024 ;
    }

    ......

    
// 必选项,设置configdb信息
     if  (  !   params .count(  " configdb "  ) ) {
        
out ()  <<   " error: no args for --configdb "   <<  endl;
        
return   4 ;
    }

    vector
< string >  configdbs;
    
// 对参数configdb进行分割 (以','分割 )
    splitStringDelim(  params [ " configdb " ]. as < string > () ,  & configdbs ,  ' , '  );
    
// mongodb强制为1或3,具体原因不明
     if  ( configdbs.size()  !=   1   &&  configdbs.size()  !=   3  ) {
        
out ()  <<   " need either 1 or 3 configdbs "   <<  endl;
        
return   5 ;
    }

    
//  we either have a seeting were all process are in localhost or none is
     for  ( vector < string > ::const_iterator it  =  configdbs.begin() ; it  !=  configdbs.end() ;  ++ it ) {
        
try  {
            
//  根据地址参数实例化HostAndPort对象,如地址不合法则抛出异常
            HostAndPort configAddr(  * it );  

            
if  ( it  ==  configdbs.begin() ) {
                grid.setAllowLocalHost( configAddr.isLocalHost() );
            }
            
// 不允许在configdbs出现本地地址,注:如果configdb中全部为本地地址
            
// (实际用处不大)时不会执行下面if逻辑
             if  ( configAddr.isLocalHost()  !=  grid.allowLocalHost() ) {
                
out ()  <<   " cannot mix localhost and ip addresses in configdbs "   <<  endl;
                
return   10 ;
            }

        }
        
catch  ( DBException &  e) {
            
out ()  <<   " configdb:  "   <<  e.what()  <<  endl;
            
return   9 ;
        }
    }


    上面完成了对命令行参数分析之后,接下来mongos要加载绑定几个hook:
  

     //  set some global state
    
// 添加对链接池hook的绑定(shardingConnectionHook对象引用),以最终调用其onHandedOut方法
    pool.addHook(  & shardingConnectionHook );
    
// 设置链接池名称
    pool.setName(  " mongos connectionpool "  );
    
// 不设置“延迟kill游标”
    DBClientConnection::setLazyKillCursor(  false  );
    
// 设置当replicaSet配置修改时的hook对象(replicaSetChangey方法会更新链接对象信息
    ReplicaSetMonitor::setConfigChangeHook( boost::bind(  & ConfigServer::replicaSetChange ,  & configServer , _1 ) );

    
    上面的hook主要是在mongos主程序启动完成后,在运行期间执行一些数据操作时执行某些额外操作。从代码可以看出,mongos使用了链接池功能以提升获取链接的效率,具体实现机制我会在后绪章节中加以阐述。代码中的ReplicaSetMonitor类为一个维护和获取有效复制集的监视类,它提供了获取有效master,slave 的方法。完成这一步绑定后,接着mongos就会对config server信息进行初始化和升级操作了,如下:

  

    // 显示sharding版本信息
    printShardingVersionInfo();
    
// 实始化configServer
     if  (  !  configServer.init( configdbs ) ) {
        cout 
<<   " couldn't resolve config db address "   <<  endl;
        
return   7 ;
    }

    
if  (  !  configServer.ok(  true  ) ) {
        cout 
<<   " configServer startup check failed "   <<  endl;
        
return   8 ;
    }
    
// 检查Config版本信息(必要时进行升级操作)
     int  configError  =  configServer.checkConfigVersion(  params .count(  " upgrade "  ) );
    
if  ( configError ) {
        
if  ( configError  >   0  ) {
            cout 
<<   " upgrade success! "   <<  endl;
        }
        
else  {
            cout 
<<   " config server error:  "   <<  configError  <<  endl;
        }
        
return  configError;
    }
    
// 重新设置config db信息(包括shard中chunk的min,lastmod信息)
    configServer.reloadSettings();


    最后就是启动侦听服务,这里mongos启动了两个侦听服务器,一个是以线程方式启动,用于接收授权的用户操作信息,另一个则是普遍的循环侦听服务,用于侦听客户端message如下:

  

  // 初始化一些Signals信息,用于处理程序退出,中断等情况
    init();
    
// 以线程方式启动webserver,循环侦听授权访问的 message信息,详见dbwebserver.cpp文件中allowed方法
    boost::thread web( boost::bind( & webServerThread,  new  NoAdminAccess()  /*  takes ownership  */ ) );

    MessageServer::Options opts;
    opts.port 
=  cmdLine.port;
    opts.ipList 
=  cmdLine.bind_ip;
    start(opts);
// 启动message服务器,侦听客户端message

    dbexit( EXIT_CLEAN );
    
return   0 ;

    到这里,main代码就介绍完了,但上面代码段中的start才是启动balancer来均衡各个shard间chunk的操作,所以我们接着再看一下该方法的实现:
 

  void  start(  const  MessageServer::Options &  opts ) {
        setThreadName( 
" mongosMain "  ); // 设置线程名称
        installChunkShardVersioning(); // 绑定chunk shard版本控制信息
        balancer.go(); // 均衡shard 中chunk(节点)信息,详情参见 balance.cpp的run()方法
        cursorCache.startTimeoutThread(); // 对空闲(过期)游标进行清除操作

        log() 
<<   " waiting for connections on port  "   <<  cmdLine.port  <<  endl;
        ShardedMessageHandler handler;
        MessageServer 
*  server  =  createServer( opts ,  & handler ); // 构造server对象
        server -> setAsTimeTracker();
        server
-> run(); // 启动message服务
    }



    好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍balancer的实现方式和操作流程。

    原文链接:http://www.cnblogs.com/daizhj/archive/2011/05/16/2022041.html
    作者: daizhj, 代震军   
    微博: http://t.sina.com.cn/daizhj
    Tags: mongodb,c++,mongos,chunk,balance,shard


原文链接:http://www.cnblogs.com/daizhj/archive/2011/05/16/2022041.html
加载中
返回顶部
顶部