memcached源代码阅读笔记(二):网络处理部分

长平狐 发布于 2012/06/11 18:43
阅读 834
收藏 0

既然memcached是一个缓存服务器,而且要提供高效的缓存服务,那么网络层肯定要非常有效率才行。要能支撑大量的并发连接,还要有很优秀的响应速度。除此之外,因为memcached的核心业务并不是网络层,它的核心是缓存机制。那么就必须采用一种机制,将网络层隔离,以免网络通信部分缠绕在系统的各处,扰乱了核心逻辑。

在这一点上要感谢基于事件驱动的网络库libevent。memcached就是采用这个来作为它的网络层,所以对于memcache来说,即使有成千上万的连接处理起来也不是什么难事。

libevent的事件驱动机制除了能提高网络处理的效率外,还抽象了各个操作系统上最高效的方式:比如Linux上的epoll, FreeBSD上的kqueue 以及Windows的IOCP。就不说跨平台了,单单使用好其中一种机制就不是一件容易事。除此之外,使用事件驱动的机制,还能让你将网络处理部分与业务逻辑相分离。 

好,今天我们就来看看memcached是如何利用libevent构建其网络层的(注意,memcache同时支持TCP和UDP,不过本文只会关注TCP部分)。

为了更好的理解libevent,我们先来看看linux上如何使用epoll实现基于事件的网络编程: 

#define MAX_EVENTS 10
main(){
         int sfd;
         int flags;
         int efd;
         int nfds;
         int i;
         struct epoll_event ev, events[MAX_EVENTS];
         struct addrinfo *ai;
         struct addrinfo hints = {.ai_flags = AI_PASSIVE,
                                .ai_family = AF_UNSPEC};
        hints.ai_socktype = SOCK_STREAM;
         char port_buf[NI_MAXSERV];
        snprintf(port_buf,  sizeof(port_buf),  " %d ", port);
        getaddrinfo(NULL, port_buf, &hints, &ai);
         // create a socket to linsten
        sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
        flags = fcntl(sfd, F_GETFL,  0);
         // set nob block
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
         // bind
        bind(sfd, ai->ai_addr, ai->ai_addrlen);
         // listen
        listen(sfd,  15);
         // create epoll
        efd = epoll_create( 10);
        ev.events = EPOLLIN;
        ev.data.fd = sfd;
         // setup epoll
        epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ev);
         while( true){
                 // wait, block until a new connection is comming
                nfds = epoll_wait(efd, events, MAX_EVENTS, - 1);
                 for(i =  0; i < nfds; ++i){
                   dispatch(events[i]);
                }
        }

我在上面的代码里加了少量的注释。代码的前半部分没有什么,和所有的网络编程一样,就是创建套接字,然后bind到地址,开始监听。注意,这里我们将这个文件描述符设置为NON_BLOCK的,这一点很关键。然后我们创建epoll,设置感兴趣的事件,然后在一个循环里wait事件的发生。 wait会阻塞,当一个事件发生时会继续运行,然后根据事件的类型作出不同的处理。

可能有人要问,我没有发现任何事件触发的意思啊,这跟.NET里我们熟悉的事件处理差别太大了:

btnOK.OnClick +=  delegate(sender, e){
    // ...

不过如果你熟悉一点Win32编程的话就不那么认为了。不熟悉的话肯定听说过消息循环吧。想想看,上面的代码是不是跟Win32里的消息循环非常类似。一个loop,然后接受事件,翻译事件,然后根据事件的类型分发给事件处理句柄。只不过在Win32里编写窗体程序时,事件的来源是鼠标点击按钮或者键盘操作,而这里的事件来源是网络:可能是一个新的连接到来,也可能是读缓冲区里数据已经准备好(要了解Win32消息循环的更多细节可以参看我这篇文章:点击这里)。至于为何基于事件的这种机制对于大并发的系统很有用不在本文的范围内,如果要阐述清楚这个可能需要介绍以下IO模型等问题,不过你也可以通过阅读我之前写的并行和异步了解一些概念。

不过要让应用开发人员都亲力亲为的处理消息循环来做事件驱动的编程太过于麻烦了,不管是后来的MFC,还是VB抑或是WinForm中,那个消息循环再也没出现过。取而代之的是现在好用的事件。网络编程也一样,要亲力亲为的处理这种细节,太麻烦了,而且太容易出错。最麻烦的还有各个平台提供的机制还不一样,如是类似libevent这样的类库就应运而生。 

现在我们来看看memcache如何使用libevent进行高效的网络编程:

//event_base 是libevent的核心数据结构

staticstruct event_base *main_base;
main_base = event_init();
//然后就是创建scoket等
fd = socket(...)
bind()
listen()
//调用conn_new方法
conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,
                                             transport, main_base);

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    c->sfd = sfd;
    c->state = init_state;
    //设置感兴趣的事件,事件句柄是event_handler
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    event_add(&c->event0);
}
void event_handler(const int fd, const short which, void *arg) {
    drive_machine(c);
}
static void drive_machine(conn *c) {
 while (!stop) {
        switch(c->state) {
        case conn_listening:
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
           dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);

}

为了代码更清晰,我删除了错误处理等代码。上面的代码从构建libevent的核心数据结构event_base开始,然后创建socket,给该socket设置感兴趣的事件,并且设置了事件处理句柄event_handler,然后我们根据conn数据结构的state字段判断现在是什么事件并作出相应的处理(conn数据结构是memcache里处理网络连接的核心数据结构之一)。第一步当然是一个新的连接的到来:连接到了memcache服务器,这是第一个事件。然后我们通过dispatch_conn_new函数分发这个新到来的连接。在连接到来之后我们就要进入下一步处理了:比如从缓冲区读取数据,在这里就是接收memcache客户端发过来的各种命令,或者向缓冲区写出数据,比如我们接到一个get命令,然后我们就从缓存里读取相应的key对应的值,然后将该值写到缓冲区(关于memcache协议(命令)处理的内容后面的文章会有介绍,本文我们只关注网络处理部分)。

不知道刚才你阅读过我那篇Win32消息循环的文章没,在窗体编程中,为了在做耗时操作时不让界面假死我们常用的做法就是创建一个有别于消息循环的主线程,然后在这个线程里处理这些耗时操作。这里也一样,当连接到来后,我们要读命令,解析命令,处理命令。为了不让这些操作阻塞了主线程,即监听连接到来的线程(想想如果阻塞了结果会是怎样?),memcache也是类似的处理方法:主线程接收新连接,然后创建一些线程(可配置的)来处理命令。好,我们还是来看代码吧:

thread_init(settings.num_threads, main_base);
void thread_init( int nthreads,  struct event_base *main_base) {
    threads = calloc(nthreads,  sizeof(LIBEVENT_THREAD));

    dispatcher_thread. base = main_base;
    dispatcher_thread.thread_id = pthread_self();
    //这里很巧妙
     for (i =  0; i < nthreads; i++) {

        //创建一个管道,管道对应两个描述符,一个用于写,一个用于读。 

        int fds[2];
        pipe(fds);

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        stats.reserved_fds += 5;
    }

    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
   //只有所有的线程都初始化完毕后,这里才会继续执行,否则阻塞
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();

    //在这里,我们在管道的读端注册感兴趣的事件 

    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    event_add(&me->notify_event, 0);
}
//事件处理函数,这里又调用了conn_new函数
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
        CQ_ITEM *item;

        //从队列里取出要处理的连接

        item = cq_pop(me->new_conn_queue);
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
      
}
//创建worker线程用来处理命令等
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    ret = pthread_create(&thread, &attr, func, arg);

 上面的代码会在memcache初始化的时候就会执行,会创建一堆线程等待事件处理。那么是用什么方式将事件传递给这些线程呢,这里实现有点巧妙。注意到这几行代码:

pipe(fds); 

threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];

这里创建了一个管道(pipe),管道对应两个描述符,一个用于写,一个读。再来看看前面一段代码里我们没有贴出的dispatch_conn_new函数:

void  dispatch_conn_new( int  sfd,  enum  conn_states init_state,  int  event_flags, int read_buffer_size,  enum network_transport transport) {

    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    //将连接放到一个队列里
    cq_push(thread->new_conn_queue, item); 

    //向管道的写端写一个字节的数字,人为的触发事件 

    write(thread->notify_send_fd, ""1);

有意思的代码就在最后一行,在接收到一个连接之后,从刚才创建的一些LIBEVENT_THREAD里,选一个。这个结构里就有刚才创建的那个管道对应的两个描述符,然后往管道的写端写如一个字节,因为上面的setup_thread里为管道的读端设置了感兴趣的事件,这个时候事件就触发了(人为的触发一个事件,并且将刚才的连接放到一个队列里(这里是每个线程一个队列),这样就将一个同步的事件转换为异步的了)。事件触发后,thread_libevent_process函数就会执行。 然后又进入到conn_new函数,进入到drive_machine函数,又根据conn的state进行事件处理。

PS:我不知道这种利用管道,然后人为的触发事件的机制是不是一种什么模式或惯用法。除了在memcache这里我见到了这种方式外,在java的NIO里也有类似的使用。在java NIO里创建一个Selector后,会创建一个管道(在Linux上,而在windows上会创建一对socket),我们可以通过向管道的写端写入一个字节来唤醒已经阻塞的selector。

在介绍完memcache的网络处理部分后,下一篇我们就可以看看memcache是如何从网络上读取内容,解析命令的。 


原文链接:http://www.cnblogs.com/yuyijq/archive/2012/01/02/2291025.html
加载中
返回顶部
顶部