消息映射的服务器的设计与实现

晨曦之光 发布于 2012/04/10 15:02
阅读 154
收藏 0

服务器程序的设计有很多种方式,有多进程,多线程以及多进程/多线程混合方式,这些无论哪一种都是侦听->服 务的一种方式,主要强调以何种方式为客户提供服务,但是如果客户的需求种类非常多,不同需求之间的差异非常大的时候,另一个问题就浮上了水面,就是如何组 织这些所谓的不同的服务程序,采用回调函数的方式应该是第一个被想到的,回调函数很灵活,服务器的管理逻辑不用关注具体服务的处理逻辑,一切交给回调函数 来完成,唯一需要做的就是注册回调函数,接下来一个完美的设计就需要考虑如何设计服务器的管理流程。
我们把客户端的需求称作命令,将服务器端对应该命令的处理函数称作命令处理函数,用户不同的命令就应该对应不同的命令处理函数,那么如何设计数据结构呢? 很显然的,我们应该让命令处理函数也就是回调函数和这个命令绑定在一起,除了这些需要绑定之外,由于命令是从远端传过来的,仿照网络协议的设计方式,同样 需要一个协议来识别合法命令,最起码格式要正确,因此还需要将命令码的格式,长度等信息和命令以及回调函数绑定在一起,到此为止,随便一个合格开发人员都 能想到的就是数据封装,就是将上面这些封装成一个结构或者一个类,还是用结构吧,因为我们只需要封装数据而不需要封装操作,于是就有了下面的结构体:
struct DATAPROCESSDES
{    
    char byCommand;            //命令
    char byDataType;        //类型
    unsigned int dwDataCellLength;
    DATAPROCESSHANDLE pProcHandle;    //命令处理回调函数
};
接 下来需要做的就是如何设计注册机制了,熟悉linux内核的可以回顾一下linux的方式,linux内核事实上采用了一种很常见的架构,就是在全局维护 一个特定含义的链表,然后将符合这个含义的实体注册到其中,其实就是将该实体添加到该全局链表中,这个链表解耦了各个模块,它作为中介使得各个模块可以联 系起来,比如网卡驱动和网络协议栈,再比如文件系统和通用块驱动都是这样的,一些讲开发和设计的书会非常反感全局的东西,说全局的变量不好维护,可是如果 事情很模块化,那全局变量也乱不到哪里去,比如linux内核,它就是那几项功能并且几乎不会有什么大的改变,并且使用全局变量的模块很明确,全局变量不 会到处被使用,此时全局变量也就好维护了,全局变量在需求快速变化的时候会使事情变得很糟糕,还是以链表为例,如果起初只有一个需求,那么我们设置一个链 表L1,如果后期需求扩展了,我们可能会设置链表Ln,而且随时会改变链表对象的组织与架构,这时即使不做别的,仅仅是定位链表就够你喝一壶的了。现在我 们要设计一个注册机制,需求很单一,这样全局变量就没有不可的了,如果实在觉得全局变量磕碜,那么可以用static变量来代替,现在看一下以下这个注册 类:
typedef struct{
    XXX    con;    //可以是一个socket类型,也可以是一个SSL*类型
    YYY    addr;    //连接地址
}CON_ENTITY,*PCON_ENTITY;
typedef int (*DATAPROCESSHANDLE)(PCON_ENTITY pCon, L_REQUEST* pRequest, const char* pData, unsigned int dwReserved);
class CDataProcessDesRegister
{
public:
    CDataProcessDesRegister();
    virtual ~CDataProcessDesRegister();
protected:
    std::vector m_vtDataTypeDes;
public:
    int Register(char byCmd, char byDataType, unsigned int dwDataCellLength, DATAPROCESSHANDLE pProcHandle);
    int UnResigster(char byCmd, char byDataType);
    int GetRequestDataCellLength(char byCmd, char byDataType);
    int BuildCommandRequest(std::vector * pvtBufferData, std::vector * pvtCommandData );
    int DoProcessHandle(PCON_ENTITY pCon, L_REQUEST* pRequest, const char* pData, unsigned int dwDataLen );
};
看一下实现:
CDataProcessDesRegister::CDataProcessDesRegister(){}
CDataProcessDesRegister::~CDataProcessDesRegister(){}
int CDataProcessDesRegister::Register(char byCmd, char byDataType, unsigned int dwDataCellLength, DATAPROCESSHANDLE pProcHandle)
{
    for (int i=m_vtDataTypeDes.size()-1; i>=0; i--)
    {
        if ( m_vtDataTypeDes[i].byCommand==byCmd && m_vtDataTypeDes[i].byDataType==byDataType)
            return -1;
    }
    DATAPROCESSDES dpd;
    dpd.byCommand = byCmd;
    dpd.byDataType = byDataType;
    dpd.dwDataCellLength = dwDataCellLength;
    dpd.pProcHandle = pProcHandle;
    m_vtDataTypeDes.push_back(dpd);
    return 0;
}
int CDataProcessDesRegister::UnResigster(char byCmd, char byDataType)
{
    for (int i=m_vtDataTypeDes.size()-1; i>=0; i--)
    {
        if ( m_vtDataTypeDes[i].byCommand==byCmd && m_vtDataTypeDes[i].byDataType==byDataType)
        {
            m_vtDataTypeDes.erase(m_vtDataTypeDes.begin()+i);
            return 0;
        }
    }
    return -1;
}
//得到一个命令理应的长度,这个成员函数基本就是检查合法性用的,在下面BuildCommandRequest会用到的
int CDataProcessDesRegister::GetRequestDataCellLength(char byCmd, char byDataType)
{
    for (int i=0; i     {
        if ( m_vtDataTypeDes[i].byCommand == byCmd && m_vtDataTypeDes[i].byDataType == byDataType)
            return m_vtDataTypeDes[i].dwDataCellLength;
    }
    return 0;
}
//pvtBufferData 中存放有从特定客户端读取的数据,但是这些数据是否符合我们的协议规范呢?不一定。于是这个成员函数的作用就是将零散数据格式化成我们协议的格式,或者返 回错误。注意,特定的客户端可能有多个请求积压,因此pvtBufferData的类型是容器,该函数仅仅取出pvtBufferData的第一个来处 理,可以在该函数的外层循环处理该客户端的所有积压请求。
int CDataProcessDesRegister::BuildCommandRequest(std::vector * pvtBufferData, std::vector * pvtCommandData)
{
...//函数的实现内部会检查从客户端接收到的数据命令对应的命令处理是否已经被注册了。如果没有被注册,那么肯定是不能往下进行了
}
//该成员函数负责处理,它的作用其实就是遍历所有已经注册过的命令处理实体,然后找到负责该命令的处理实体之后调用它的回调函数
int CDataProcessDesRegister::DoProcessHandle(PCON_ENTITY pCon, L_REQUEST* pRequest, const char* pData, unsigned int dwDataLen )
{
    for (int i=0; i     {
        if (m_vtDataTypeDes[i].byCommand == pRequest->byCmd
            && m_vtDataTypeDes[i].byDataType == pRequest->byDataType
            && m_vtDataTypeDes[i].pProcHandle!=NULL)
        {
            return m_vtDataTypeDes[i].pProcHandle(pCon, pRequest, pData, dwDataLen,saddrClient);
        }
    }
    return 0x80000000;
}
接 下来我们来看一下一共会涉及到哪几个容器,因为服务器本质上还是串行的接收客户端请求,而客户端又会不确定地发送请求,因此很显然客户端需要一个容器来存 放,另外接下来一层,每一个客户端发送的请求数量不确定,并且客户端是否被响应以及响应效率取决于服务器的性能与稳定性,因此每个客户端需要一个容器来存 放由它发出的请求,正如BuildCommandRequest成员函数中的第一个参数是个容器说描述的那样;最后,在服务器端,所有的已经注册的命令处 理实体应该由一个容器存放,这个是最显然的。在这三个容器中前两个是动态的,并且是以客户端为中心设计的,而第三个是静态的,以服务器为中心设计的,客户 端请求的处理过程涉及到三次容器操作,首先遍历客户端容器,对于每一个客户端遍历它的请求容器,对于筛选出的每一个合法请求去匹配已经注册的请求处理实体 容器中每一个,事实上最后一次的匹配也是一次遍历,经过三次遍历和若干次的判断,最终会调用特定命令的特定的回调函数。
到此为止机制已经讲完了,现在来设想一个合理的流程,也就是设想一个使用策略。最简单的就是服务器在一个端口比如8000侦听,然后对于每一个连接过来的客户,初始化一个结构体:
struct CLIENTDATA
{
    PCON_ENTITY            pCon;
    std::vector      vtRecvData;    //请求容器
};
初 始化了以上结构体之后将该结构体插入客户端容器,直到检测到该客户端断开连接的时候再将该客户端对应的上述结构体从客户端容器中删除。插入客户端到容器之 后就可以遍历容器内部的所有客户端进行select操作了,一旦select返回说明监控的套接字上有事件发生,进而开始上述的三次遍历过程,最终调用 CDataProcessDesRegister对象的DoProcessHandle,那么CDataProcessDesRegister对象在何时 初始化,实际上我们将它定义为static即可,因为它明显就是一个工具类,也可以认为是一个帮助类,之所以定义成一个类是为了更好的管理,因为类有很多 很好的特定,比如封装,于是就有了下面的帮助函数:
CDataProcessDesRegister* GetDataProcessDesRegister()
{
    static CDataProcessDesRegister objDataProcDesRegister;
    return &objDataProcDesRegister;
}
现在看一个真实的例子,注意为了节省篇幅省略了错误处理,就是说默认结果都是正确的,并且有的地方用了伪代码:
DWORD WINAPI ListenEngineerProc( LPVOID lpParam )
{
    std::vector * p_vtClients;
...
    FD_SET fsActive, fsRead;
    FD_ZERO (&fsActive);
    FD_SET (g_soServer, &fsActive);
    while ( TRUE )
    {
        fsRead = fsActive;
        int nSelectRes = select (0, &fsRead, NULL, NULL, NULL);
        if (FD_ISSET(g_soServer, &fsRead))
        {
            CClientDataProcessing::CLIENTDATA objClientData;
            size_t szAddress = sizeof(objClientData.saddrClient);
            objClientData.soClient = accept(g_soServer, (sockaddr*)&objClientData.saddrClient, (int*)&szAddress);
            ...//初始化一个新的CLIENTDATA,即objClientData,其实就是初始化其pCon字段并且将之插入到客户端容器
        }
        else
        {
            std::vector ::iterator end_k = p_vtClients->end() ;
            std::vector ::iterator k = p_vtClients->begin();
            while (k!=end_k)
            {
                if (!FD_ISSET(k->soClient, &fsRead))
                {
                    ++k ;
                    continue;
                }//以下的调用将从k->pCon读到的数据插入其vtRecvData容器
                int nRes = CClientDataProcessing::ReadDataFromClient(k->pCon, &k->vtRecvData);
                while (true)
                {
                    std::vector vtCommandData;
                    int nCommandLength = GetDataProcessDesRegister()->BuildCommandRequest(&k->vtRecvData, &vtCommandData);
                    else if (nCommandLength>0)
                    {//解析命令正确,处理之,上述删除了部分代码,如果解析命令出错,要跳出本次循环并且回复客户
                        k->vtRecvData.erase(k->vtRecvData.begin(), k->vtRecvData.begin()+nCommandLength);
                        CClientDataProcessing::ProcessClientData(k->ssl, &vtCommandData,&k->saddrClient);
                    }
                }//对于一个客户端读取数据
                ++k ;
            }//循环客户端
        }
    }
    return 2008;
}
看看本文的标题,消息映射服务器,到此为止丝毫没有提及消息映射,其实把命令理解成消息就可以了,消息怎么个映射法呢?现在就看看吧:
#define REGISTER_COMMAND_WITH_DATATYPE(dwCmd, dwDataType, dwDataCellLength, pProcHandle) /
int nRegRes_##dwCmd##dwDataType = GetDataProcessDesRegister()->Register(dwCmd, dwDataType, dwDataCellLength, pProcHandle);
DATAPROCESSHANDLE就是回调函数的原型,在注册的时候,只需要定义一个上述类型的函数,然后映射一个命令就可以了,比如注册心跳命令处理实体:
REGISTER_COMMAND_WITH_DATATYPE(L_CMD_CON_CHECK, L_REQTYPE_NULL, sizeof(char), LyCMDNetStatus::OnGetNetStatus);
注册完之后效果就是,一旦服务器接收到心跳命令,那么就会调用LyCMDNetStatus::OnGetNetStatus函数,这就是消息映射,是不是和MFC的消息映射有点像!
现在看看如何将这个消息映射框架和多线程和多进程模型结合起来。其实上面那个完整的例子十分好,只是还是不是那么完美,可能会引起延迟,当然完全可以在 accept之后马上开启一个线程或者进程去单独处理一个客户端,但是那样的方式开销有点大,比如一个很简单的请求为它单开一个线程就没有意义了,再说即 使客户端一直保持连接,客户端的处理完全交给线程去做,流程很难得到很好的控制,因此还是上面的模型比较好,那么它的缺点在哪里呢?试想一个客户端有一个 很紧急的必须在规定时间内响应的请求,上述模型其实是一个端口开启一个那样的线程,在线程内部采用遍历客户端,然后在客户端遍历接收到的数据的过程,就是 一个串行处理,别的连接服务器相同端口的客户端就可能影响到这个需要紧急处理的客户端的请求,于是将上述的消息映射做一下改进:
struct DATAPROCESSDES_v2
{    
    char byCommand;            //命令
    char byDataType;        //类型
    unsigned int dwDataCellLength;
    char byThread;            //是否开启一个线程处理,对于耗时命令处理为了不影响别的命令执行,需要线程化
    DATAPROCESSHANDLE pProcHandle;    //命令处理回调函数
};
接下来改进CDataProcessDesRegister的DoProcessHandle成员函数:
int CDataProcessDesRegister::DoProcessHandle(PCON_ENTITY pCon, L_REQUEST* pRequest, const char* pData, unsigned int dwDataLen )
{
...
            if( !m_vtDataTypeDes[i].byThread )
                return m_vtDataTypeDes[i].pProcHandle(pCon, pRequest, pData, dwDataLen);
            else
            {
                //开启一个线程,将pCon, pRequest, pData, dwDataLen传入
                return 0;
            }
...
}
或 者在CClientDataProcessing::ProcessClientData外面首先搜集客户请求的命令,然后将需要线程化的命令放到一个单 独的容器中,比如红黑树中,不需要单开线程的命令放到另外一个容器中,最后对于需要线程化的命令容器中的命令,一个命令开一个线程,对于不需要线程化的命 令容器中的命令则直接串行执行。其实完全可以将线程化处理的任务交给回调函数自己,比如以下:
REGISTER_COMMAND_WITH_DATATYPE(L_CMD_TEST, L_REQTYPE_NULL, sizeof(char), OnThreadTest);
int OnThreadTest(PCON_ENTITY pCon, L_REQUEST* pRequest, const char* lpszData, unsigned int dwDataLen)
{
...//开启一个线程进行处理
...//直接返回
}
但 是这样的话,回调函数的负担就重了,再者说了,这个工作到底是机制还是策略本身就是一个问题,如果它是一个控制流程需要操心的事情,那么就不应该交给回调 函数,反之交给回调函数也没有什么不可以,顺便说一句,linux内核对于hrtimer也是这么处理的,不过它是将所有的可以线程化的hrtimer放 到了一个软中断中而不是每一个hrtimer一个内核线程,其实完全可以用工作队列实现,不过工作队列还只能是所有的timer公用一个,因为 hrtimer的类型以及创建对于内核hrtimer机制来讲是未知的,hrtimer的创建完全自主化而不像本文的服务器处理的命令,先注册若干确定的 命令处理实体,然后客户端来了数据以后去和所有的处理实体匹配:
void hrtimer_run_queues(void)
{
         struct rb_node *node;
         struct hrtimer_cpu_base *cpu_base = &__get_cpu_var(hrtimer_bases);
         struct hrtimer_clock_base *base;
         int index, gettime = 1;
         if (hrtimer_hres_active())
                 return;
         for (index = 0; index < HRTIMER_MAX_CLOCK_BASES; index++) {
                 base = &cpu_base->clock_base[index];
...//非常规处理中会调用hrtimer_get_softirq_time一次来获取当前的时间
                 spin_lock(&cpu_base->lock);
                 while ((node = base->first)) {
                         struct hrtimer *timer;
                         timer = rb_entry(node, struct hrtimer, node);
                         if (base->softirq_time.tv64 <= hrtimer_get_expires_tv64(timer))
                                 break; //凡是到期时间大于当前时间的hrtimer一律不予处理
                         if (timer->cb_mode == HRTIMER_CB_SOFTIRQ) {
                                 __remove_hrtimer(timer, base, HRTIMER_STATE_PENDING, 0);
                                 list_add_tail(&timer->cb_entry, &base->cpu_base->cb_pending);
                                 continue; //以上将可以延迟的hrtimer搜集起来
                         }
                         __run_hrtimer(timer); //不可延迟的hrtimer直接执行
                 }
                 spin_unlock(&cpu_base->lock);
         }
}
这 里的hrtimer的处理流程和我们服务器处理命令的流程是不一样的,对于hrtimer来讲,它只能用一个内核线程来处理而不便于每一个hrtimer 一个内核线程,原因有二,第一,内核的hrtimer机制对用户创建的hrtimer一无所知,就知道它是一个hrtimer,因此线程内部的处理流程不 好控制;第二,这是在内核中,不能耗费巨资去创建内核线程,再者说hrtimer不像TCP那样长久连接,hrtimer几乎都是瞬间执行完的,即便延迟 也不会太久。而我们的服务器流程十分明确,就是服务客户端,并且服务类型就是那几种注册地命令,最重要的,客户端可能和服务器是长连接,常规的做法就是单 开一个线程处理这个长连接,但是这个消息映射的做法却是单开一个线程处理一个可能会延时的命令,因为这个服务器是串行轮询各个客户端的各个请求的,因此这 种方式很有意义,可以保证一个延迟的请求不会影响到其它的用户的其它请求。
这个服务器算是设计完成了,其实它就是我们公司目前正在做的产品的服务器主架构,很菜,问题很多,本来我都懒得看哪怕一行代码,但是后来感觉代码并不是想象中那么不好,于是就写下了此文。


原文链接:http://blog.csdn.net/dog250/article/details/5303459
加载中
返回顶部
顶部