1
回答
转 一个基于ACE的负载自适应万能线程池实现
【腾讯云】学生服务器套餐10元/月 >>>   

      在C++中要进行并发处理,不可避免要使用多线程,在传统的教科书中,大家都是采用最原始的多线程技术,应用逻辑和线程并发策略紧密绑定。
      在一个典型的服务器程序中,客户端的请求往往包含了很多不同的逻辑命令,如在一个线程处理函数中,需要根据客户端的命令代码处理不同的业务逻辑:

int thrad_main(int cmd_id,char *data){
   switch(cmd_id)
   {
   case 1:
      ...
      break;
   case 2:
      ...
      break;
   }
}

   如此这般,业务处理逻辑和线程逻辑紧密耦合,这是一种很“丑陋”的代码。
   如何通过一种优雅的方法,分离并发逻辑和业务逻辑,通过通用的并发框架,业务逻辑设计者只需要关心自己的逻辑代码,交给“线程池”去处理即可,而不需要去关心如何创建线程,等待线程结果这些琐碎的“小事”?

   很简单,高手出招,必谈模式,下面是一种常用的并发模式,领导者/追随者线程池模型:

   在 一组预先分配的线程中通过“互斥”锁来同步线程之间的行为,“线程”们通过“民主选举”选出一位代表“领导者”站在最前端接收请求,拿到“任务”后,就从 身后的候选“继任者”中选出一个线程代替自己作为“领导者”,自己则变成“工作者”就跑到后面默默去执行处理命令,这个“任务”是一个包含待处理数据和处 理逻辑的自说明性任务,也就是说所有的线程不必事先知道怎么处理接收到的任务,因为他所拿到的“任务包”中就包含了如何处理任务的说明。就像一个“代工工 厂”的工人一样,无需任何文化基础,会干活就行。
   那如何实现自说明任务呢?我们定义了一种称为“Method_Request”的对象,它 包含一个接口“virtual int call (void) = 0;”,线程池接受的任务就是这种Method_Request对象的实例,比如一个通知线程池结束工作的Method_Request可以定义为如下的 类:
1       class  ExitRequest :  public  ACE_Method_Request
2      {
3       public :
4           virtual   int  call ( void ){
5               return   - 1 ;   //  Cause exit.
6            }
7      };
8 
      我们重载call接口,添加处理这个请求的逻辑代码,由于仅仅实现通知线程池结束工作的操作,我们返回一个特殊值“-1”,即可只是线城池: “工作完成了,你赶快洗洗睡吧!”,线程池会检查Method_Request对象的返回值,如果是“0”就是处理正常完成,继续等待下一个任务,如果是 “-1”,就关闭所有线程。

      再来一个复杂点的例子,派生的Method_Request不仅有处理逻辑,还包括了需要处理的数据:
 1 
 2  class  M2M_EventRequest :  public  ACE_Method_Request
 3  {
 4       //  Lua解释器,每个事件使用自己单独的脚本上下文
 5      LuaVM::ALEE_LuaService  &  m_svcs;
 6      ALEE_ScriptList_t  &  m_cmds;
 7 
 8       //  事件内容
 9      std:: string  m_type_name;
10      xml_event_t m_xml_event;
11 
12       //  调试信息
13      DebugInfo_ptr m_debug;
14 
15  public :
16      M2M_EventRequest(
17          LuaVM::ALEE_LuaService  &  svcs,
18          ALEE_ScriptList_t  & cmds,
19           string   const   &  type_name,
20          xml_event_t  event );
21 
22      M2M_EventRequest(
23          LuaVM::ALEE_LuaService  &  svcs,
24          ALEE_ScriptList_t  & cmds,
25           string   const   &  type_name,
26          xml_event_t  event ,
27          DebugInfo_ptr debug);
28 
29       virtual   ~ M2M_EventRequest ( void );
30 
31       virtual   int  call ( void );
32  };
33 
      这个Method_Request的功能是,命令线程池调用Lua解析器处理一段脚本代码,详细逻辑就不解释了,仅仅是一个示例,我们的重点在于线程池的实现。
      下面就公布这个“万能线程池的”实现,其实这是一个基于ACE的线程库实现的“领导者/追随者”模式,我在其基础上进行了改进,增加了自适应功能,可以根据请求队列的负载,自动调整线程池中的线程数目。
      闲话少说,上代码,看得懂的童鞋恭喜你内力深厚,还望多提宝贵意见,看不懂得小盆友也可以努力学习,提高自己:
// LeaderFollower.h
 1  #pragma once
 2 
 3  #include  " dllmain.h "
 4  #include  < map >
 5  #include  < ace / Synch.h >      //  ACE_Thread_Mutex
 6  #include  < ace / Task.h >      //  ACE_Task
 7 
 8  //  线程状态
 9  enum  LF_Status_t
10  {
11      TH_LEADER_ACTIVE,
12      TH_FOLLOWER,
13      TH_WORKER,
14      TH_READY,
15      TH_STOP,
16  };
17 
18  struct  LF_StatusTime_t
19  {
20      LF_Status_t    status;
21      ACE_Time_Value working_tv;
22      ACE_Time_Value start_time;
23      ACE_Time_Value stop_time;
24      ACE_Time_Value work_start;
25      ACE_Time_Value work_time;
26  };
27 
28  typedef std::map < ACE_thread_t,LF_StatusTime_t >   LF_StatusTimeList_t;
29 
30  class  LF_Follower;
31 
32  //  领导者-追随者线程池 模式实现
33  class  CPPXCORBA_API LeaderFollower
34  {
35  public :
36      LeaderFollower( void );
37       ~ LeaderFollower( void );
38 
39  protected :
40      LF_Follower  *  make_follower( void );
41       int      become_leader( void );
42       int      elect_new_leader( void );
43       bool  leader_active( void );
44       void  set_active_leader(ACE_thread_t leader);
45 
46  private :
47      ACE_Unbounded_Queue < LF_Follower *>    m_followers;
48      ACE_Thread_Mutex                    m_followers_lock;
49      ACE_Thread_Mutex                    m_leader_lock;
50      ACE_thread_t                        m_current_leader;
51 
52       //////////////////////////////////////////////////////////////////////// //
53       ///  线程池状态监控
54  public :
55       const  LF_StatusTimeList_t  &  get_status( void const ;
56       const   float  get_load_rate( void const ;
57 
58  protected :
59       void  set_status(LF_Status_t status);
60       void  set_worktime(ACE_Time_Value work_time);
61 
62  private :
63      LF_StatusTimeList_t m_status_time_list;
64      ACE_Thread_Mutex    m_status_lock;
65  };
66 

// LeaderFollower.cpp
  1  #include  " stdafx.h "
  2  #include  " LeaderFollower.h "
  3  #include  " ../cppx.core/dllmain.h "
  4 
  5  //  追随者标记
  6  class  LF_Follower
  7  {
  8      ACE_Condition < ACE_Thread_Mutex >  m_cond;
  9      ACE_thread_t                    m_owner;
 10 
 11  public :
 12      LF_Follower(ACE_Thread_Mutex  & leader_lock) : m_cond(leader_lock) {
 13          m_owner  =  ACE_Thread::self();
 14      }
 15       int  wait( void ){
 16           return  m_cond.wait();
 17      }
 18       int  signal( void ){
 19           return  m_cond.signal();
 20      }
 21      ACE_thread_t owner( void ){
 22           return  m_owner;
 23      }
 24 
 25  };
 26 
 27  //////////////////////////////////////////////////////////////////////// //
 28  LeaderFollower::LeaderFollower( void ) :
 29  m_current_leader( 0 )
 30  {
 31  }
 32 
 33  LeaderFollower:: ~ LeaderFollower( void )
 34  {
 35  }
 36 
 37  LF_Follower  *  
 38  LeaderFollower::make_follower(  void  )
 39  {
 40      ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock,  0 );
 41      
 42      LF_Follower  * fw;
 43      ACE_NEW_RETURN(fw, LF_Follower(m_leader_lock),  0 );
 44      m_followers.enqueue_tail(fw);
 45       // ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) make_follower \t: Now has %d followers.\n"), m_followers.size()));
 46       return  fw;
 47  }
 48 
 49  int  
 50  LeaderFollower::become_leader(  void  )
 51  {
 52      ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock,  - 1 );
 53 
 54       if ( leader_active()  &&  m_current_leader  !=  ACE_Thread::self() ){
 55           while (leader_active()){
 56              set_status(TH_FOLLOWER);
 57              auto_ptr < LF_Follower >  fw(make_follower());
 58              fw -> wait();          //  Wait until told to do so.
 59          }
 60      }
 61 
 62       //  Mark yourself as the active leader.
 63      set_active_leader(ACE_Thread::self());
 64      set_status(TH_LEADER_ACTIVE);
 65       // ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%t) become_leader \t: Becoming the leader.\n")));
 66       return   0 ;
 67  }
 68 
 69  int  
 70  LeaderFollower::elect_new_leader(  void  )
 71  {
 72      ACE_GUARD_RETURN(ACE_Thread_Mutex, leader_mon, m_leader_lock,  - 1 );
 73 
 74      set_active_leader( 0 );
 75 
 76       //  Wake up a follower
 77       if ! m_followers.is_empty() ){
 78          ACE_GUARD_RETURN(ACE_Thread_Mutex, follower_mon, m_followers_lock,  - 1 );
 79 
 80           //  Get the old follower.
 81          LF_Follower  * fw;
 82           if ( m_followers.dequeue_head(fw)  !=   0  )
 83               return   - 1 ;
 84 
 85           // ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Resigning and electing %d.\n"), fw->owner()));
 86           return  (fw -> signal()  ==   0 ?   0  :  - 1 ;
 87      }
 88 
 89       // ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) elect_new_leader : Oops no followers left\n")));
 90       return   - 1 ;
 91  }
 92 
 93  bool  
 94  LeaderFollower::leader_active(  void  )
 95  {
 96       return  (m_current_leader  !=   0 );
 97  }
 98 
 99  void  
100  LeaderFollower::set_active_leader( ACE_thread_t leader )
101  {
102      m_current_leader  =  leader;
103  }
104 
105  void  LeaderFollower::set_worktime( ACE_Time_Value work_time )
106  {
107      ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
108      LF_StatusTime_t  &  info  =  m_status_time_list[ACE_Thread::self()];
109      info.working_tv  =  work_time;
110  }
111 
112  void  LeaderFollower::set_status( LF_Status_t status )
113  {
114      ACE_GUARD(ACE_Thread_Mutex, status_guard, m_status_lock);
115      LF_StatusTime_t  &  info  =  m_status_time_list[ACE_Thread::self()];
116       switch (status)
117      {
118       case  TH_READY:
119          info.start_time  =  ACE_OS::gettimeofday();
120           break ;
121       case  TH_STOP:
122          info.stop_time  =  ACE_OS::gettimeofday();
123           break ;
124       case  TH_WORKER:
125          info.work_start  =  ACE_OS::gettimeofday();
126           break ;
127       case  TH_LEADER_ACTIVE:
128       case  TH_FOLLOWER:
129           if ( info.status  ==  TH_WORKER )
130              info.work_time  +=  ACE_OS::gettimeofday()  -  info.work_start;
131           break ;
132      }
133      info.status  =  status;
134  }
135 
136  const  LF_StatusTimeList_t  &  
137  LeaderFollower::get_status(  void  )  const
138  {
139       return  m_status_time_list;
140  }
141 
142  const   float  
143  LeaderFollower::get_load_rate(  void  )  const
144  {
145      ACE_Time_Value work_time,run_time;
146       foreach ( const  LF_StatusTimeList_t::value_type  &  info,get_status()){
147           if ( info.second.status  !=  TH_STOP ){
148              work_time  +=  info.second.work_time;
149              run_time  +=  ACE_OS::gettimeofday()  -  info.second.start_time;
150          }
151      }
152       return  ( float )work_time.usec() / run_time.usec() * 100 ;
153  }
154 

// LF_ThreadPool.h
 1  #pragma once
 2 
 3  #include  " LeaderFollower.h "
 4 
 5  #include  < ace / Task.h >
 6  #include  < ace / Activation_Queue.h >
 7  #include  < ace / Method_Request.h >
 8 
 9  class  CPPXCORBA_API LF_ThreadPool :
10       public  ACE_Task_Base,
11       public  LeaderFollower
12  {
13       class  ExitRequest :  public  ACE_Method_Request
14      {
15       public :
16           virtual   int  call ( void ){
17               return   - 1 ;   //  Cause exit.
18          }
19      };
20 
21       bool  m_bShutdown;
22       bool  m_bRunning;
23      ACE_Activation_Queue m_activation_queue_;
24 
25       static   const  size_t ScheduleTime  =   10 ;
26       static   const  size_t MinThreadNum  =   10 ;
27       static   const  size_t MaxThreadNum  =   20 ;
28 
29  public :
30      LF_ThreadPool( void );
31       ~ LF_ThreadPool( void );
32 
33       virtual   int  svc( void );
34 
35       int  start_stread_pool(  void  );
36       int  stop_thread_pool(  void  );
37       int  post_request( ACE_Method_Request  * request );
38 
39       int  get_queue_load( void ){  return  m_activation_queue_.method_count(); }
40       int  get_max_thread( void ){  return  MaxThreadNum; }
41       int  get_min_thread( void ){  return  MinThreadNum; }
42 
43  private :
44       int  _fork_new_thread(  void  );
45       int  _post_exit_request( void );
46  };
47 

// LF_ThreadPool.cpp
  1  #include  " stdafx.h "
  2  #include  " LF_ThreadPool.h "
  3 
  4  LF_ThreadPool::LF_ThreadPool( void ) :
  5  m_bShutdown( false ),
  6  m_bRunning( false )
  7  {
  8  }
  9 
 10  LF_ThreadPool:: ~ LF_ThreadPool( void )
 11  {
 12  }
 13 
 14  int  LF_ThreadPool::svc(  void  )
 15  {
 16       // ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) Thread started.\t: %d working threads left.\n"),thr_count()));
 17 
 18       //  线程开始运行
 19      m_bRunning  =   true ;
 20 
 21      set_status(TH_READY);
 22 
 23       while ( true ){
 24           //  Block until this thread is the leader.
 25          become_leader();
 26 
 27           //  设置线程空闲时间,空闲线程将会自动退出
 28          ACE_Time_Value tv(ScheduleTime);
 29          tv  +=  ACE_OS::gettimeofday();
 30 
 31           //  从队列获取下一个请求,并获得所有权
 32          auto_ptr < ACE_Method_Request >  request(m_activation_queue_.dequeue( & tv));
 33           if ( request. get ()  ==   0  ){                                                //  长时间没有请求,dequeue超时返回
 34               if ( elect_new_leader()  ==   0   &&  thr_count()  >  MinThreadNum )          //  成功选择新的领导者,且工作线程数大于最少线程数
 35                   break ;                                                           //  结束当前线程
 36               if ( thr_count()  <  MinThreadNum  &&  thr_count()  <  MaxThreadNum )       //  工作线程数小于最少线程数,创建新的线程
 37                  _fork_new_thread();
 38               continue ;                                                            //  继续担当领导者(优先成为领导者),或返回线程池等待
 39          }
 40 
 41           //  Elect a new leader then process the request
 42           if ( elect_new_leader()  !=   0   ||  thr_count()  <  MinThreadNum )              //  没有空余线程可成为领导者,或者线程池容量调整
 43               if ! m_bShutdown )                                                   //  且没有调度关闭
 44                   if ( thr_count()  <  MaxThreadNum )                                 //  未达到线程数上线
 45                      _fork_new_thread();                                          //  创建新的线程
 46 
 47           //  Invoke the method request.
 48          set_status(TH_WORKER);
 49 
 50          ACE_Time_Value tv_start,tv_finish,tv_working;
 51          tv_start  =  ACE_OS::gettimeofday();
 52 
 53           int  result  =  request -> call();
 54 
 55          tv_finish  =  ACE_OS::gettimeofday();
 56          tv_working  =  tv_finish  -  tv_start;
 57          set_worktime(tv_working);
 58 
 59           if ( result  ==   - 1  ){
 60               if ( thr_count()  >   1  )                                                 //  If received a ExitMethod, Notify the next Thread(if exists) to exit too.
 61                  _post_exit_request();
 62               break ;
 63          }
 64      }
 65 
 66       //  剩下最后一个线程,线程池停止
 67       if ( thr_count()  ==   1  )
 68          m_bRunning  =   false ;
 69 
 70      set_status(TH_STOP);
 71      ACE_DEBUG((LM_ERROR, ACE_TEXT( " (%t) Thread stoped.\t: %d working threads left.\n " ),thr_count() - 1 ));
 72       return   0 ;
 73  }
 74 
 75  int  LF_ThreadPool::start_stread_pool(  void  )
 76  {
 77      m_bShutdown  =   false ;
 78       return  activate(THR_NEW_LWP |  THR_JOINABLE,MinThreadNum);
 79  }
 80 
 81  int  LF_ThreadPool::stop_thread_pool(  void  )
 82  {
 83       //  线程池已停止
 84       if ! m_bRunning )
 85           return   0 ;
 86 
 87      m_bShutdown  =   true ;
 88      _post_exit_request();
 89       return  wait();
 90  }
 91 
 92  int  LF_ThreadPool::post_request( ACE_Method_Request  * request )
 93  {
 94      ACE_TRACE (ACE_TEXT ( " SvcThreadPool::enqueue " ));
 95       return  m_activation_queue_.enqueue (request);
 96  }
 97 
 98  int  LF_ThreadPool::_fork_new_thread(  void  )
 99  {
100       return  activate(THR_NEW_LWP |  THR_JOINABLE, 1 , 1 );
101  }
102 
103  int  LF_ThreadPool::_post_exit_request(  void  )
104  {
105       return  post_request( new  ExitRequest);
106  }
107 

      怎么样?很简单吧?什么?怎么用?Oh My Lady GaGa!还是告诉你吧:
1  m_pool.post_request( new  M2M_EventRequest(m_lua_svc,m_lua_scripts,type_name,xml_event, * iter));
      需要线程池出来干活的时候,创建一个请求对象,扔给他就行了!

      好了,代码就是最好的文档,C++开源社区给了我成长的土壤,希望能对后来者有所帮助。
      把这些东西贴出来,是为了整理自己的大脑,免得这些曾经顶着熊猫眼熬出来的东西,尘封在茫茫的代码海洋中,取之于前辈,还之于后人。也希望有更多的高手能够慷慨布道,壮大我们的C++社区。 
原文链接:http://www.cnblogs.com/HappyXie/archive/2011/03/02/1968802.html
ACE
举报
长平狐
发帖于6年前 1回/835阅
顶部