UtilBox(ub)基础组件 -- epoll_server网络事件模型

长平狐 发布于 2013/01/05 18:17
阅读 209
收藏 0


        今天事情比较少,突然在网上看到了一篇关于网络同步异步IO的帖子,正好想起了前几天分析过的Redis的代码。Redis的代码很精练也很轻巧,基本没有第三方依赖的库(最新版本中加入了jemalloc,但已集成在了redis的src里,所以依旧可以直接make),并实现了一套轻量型的非阻塞半异步框架-aeEvent(很少有大型互联网后台应用采用全异步框架,一是逻辑和IO都是异步的这样即时性较低,二是编程难度较高)!

        完整地分析了redis的aeEvent,也对比了一下memcached用的libevent。感觉确实比libevent轻量些,两者在linux下都是用epoll实现(定时器的实现细节略有不同,libevent是用最小堆管理,aeEvent是链表,后期redis可能改进)。大体思想基本类似,redis采用单线程模型,据作者说是出于对线程锁问题的考虑。两者暂时没有做过效率对比,之后有机会可以跑些数据。

        看了redis之后,自己也做了一个微型的基于epoll的event_server模型,可以应用在自己的一些环境中,同样采用单线程infinite_loop的方式,通过epoll_ctl注册和删除需要关注的file descriptor (fd),然后通过epoll_wait来循环等待IO事件,触发记录在该fd上的write或者read回调函数(通过附加结构体实现)。

/*
 * =====================================================================================
 *
 *       Filename:  epoll_server.c
 *
 *    Description:  An example for Linux epoll
 *
 *        Version:  1.0
 *        Created:  03/28/2012 03:40:37 PM
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Michael LiuXin, 
 *   Organization:  
 *
 * =====================================================================================
 */
#include <sys/socket.h>   
#include <sys/epoll.h>   
#include <netinet/in.h>   
#include <arpa/inet.h>   
#include <fcntl.h>   
#include <unistd.h>   
#include <stdio.h>   
#include <errno.h>   
#include <stdlib.h>
#include <assert.h>
#include <netinet/tcp.h>

#include <string.h>
#include <strings.h>

/* Server tunables. */
enum {
	MAX_EVENTS = 500,           /* capacity of event_server.events and event_server.events_set */
	EPOLL_WAIT_TIMEOUT = 1000   /* timeout passed to epoll_wait, in milliseconds */
};

/* Event-type bit flags accepted by register_server_event / unregister_server_event. */
enum {
	EVENT_READABLE = 1,
	EVENT_WRITEABLE = 2
};

struct event_server;

// Callback signature for a ready fd: receives the owning server and the fd.
// The same type is used for both the read and the write slot of event_t.
typedef void (*event_handler)(struct event_server*,int);

// Per-fd bookkeeping entry; event_server.events_set holds one per fd, indexed by fd.
struct event_t
{
	unsigned char mask;	// epoll event bits (EPOLLIN/EPOLLOUT) currently registered for this fd; 0 = not registered
	event_handler read;  // invoked by run() when EPOLLIN is reported; NULL when not watching reads
	event_handler write; // invoked by run() when EPOLLOUT is reported; NULL when not watching writes
	void* data; // NOTE(review): not read or written anywhere in this file
};

/**
 * Event server: owns the epoll instance and the per-fd callback table.
 */
struct event_server
{
	int epfd;		// epoll instance fd returned by epoll_create
	int is_blocking;		// set by set_server_blocking/set_server_nonblocking; NOTE(review): not consulted elsewhere in this file
	struct epoll_event events[MAX_EVENTS];		// output buffer that epoll_wait fills with ready events
	struct event_t events_set[MAX_EVENTS];	// per-fd mask and callbacks, indexed directly by fd (so only fds below MAX_EVENTS fit); a hash map could replace it
	unsigned long loops;	// NOTE(review): looks intended as a loop counter, but no live code in this file updates it
};
// Create an event_server: a zero-initialized struct plus its own epoll instance.
// Returns the new server (owned by the caller), or NULL if either the
// allocation or epoll_create fails; nothing is leaked on failure.
struct event_server* create_server() 
{
	struct event_server *server = calloc(1, sizeof(*server));
	if (NULL == server) {
		printf("err=create_server\n");
		return NULL;
	}
	// the size argument is only a hint for the kernel (it just has to be > 0)
	server->epfd = epoll_create(1024);
	if (-1 == server->epfd) {
		printf("err=create_server\n");
		free(server);	// don't leak the struct when epoll is unavailable
		return NULL;
	}
	printf("ok=create_server\n");
	return server;
}

// 反注册一个fd的event
int unregister_server_event(struct event_server* server, int fd, int type)
{
	struct epoll_event ev;
	ev.data.fd = fd;		
	ev.events = server->events_set[fd].mask;

	if (type & EVENT_WRITEABLE)
		ev.events &= ~EPOLLOUT ;
	if (type & EVENT_READABLE)
		ev.events &= ~EPOLLIN;		

	// if there is no event then delelte , otherwise modify
	int op = ev.events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL ;
	// to listen the fd
	if (-1 == epoll_ctl(server->epfd,op,fd,&ev)) {
		printf("err=epoll_ctl_del\n");
		return -1;
	} else {
		// record the read/write callback-function 
		// use it at epoll_wait call
		server->events_set[fd].read = (type&EVENT_READABLE)?NULL:server->events_set[fd].read;
		server->events_set[fd].write = (type&EVENT_WRITEABLE)?NULL:server->events_set[fd].write;
		server->events_set[fd].mask = ev.events;
		//printf("ok=epoll_ctl_%s\n",op==EPOLL_CTL_MOD?"mod":"del");
		return 0;
	}
}

// Watch fd for the events in `type` (EVENT_READABLE and/or EVENT_WRITEABLE)
// and install `fun` as the callback for each requested event. Events already
// registered for fd are kept: the first registration uses EPOLL_CTL_ADD and
// later ones use EPOLL_CTL_MOD. No EPOLLET flag is requested here, so
// notifications are level-triggered.
// Returns 0 on success, or -1 if fd does not fit in events_set or epoll_ctl
// fails (the stored mask and callbacks are then left unchanged).
int register_server_event(struct event_server* server, int fd, int type, event_handler fun)
{
	assert(type);
	assert(fun);

	// events_set is indexed directly by fd: reject anything outside the table
	// (fd 0 is a valid descriptor and is accepted)
	const int capacity = (int)(sizeof(server->events_set) / sizeof(server->events_set[0]));
	if (fd < 0 || fd >= capacity) {
		printf("err=fd_out_of_range<%d>\n", fd);
		return -1;
	}
	struct event_t *entry = &server->events_set[fd];

	struct epoll_event ev = {0};
	ev.data.fd = fd;
	ev.events = entry->mask;	// keep whatever is already being watched
	if (type & EVENT_WRITEABLE)
		ev.events |= EPOLLOUT;
	if (type & EVENT_READABLE)
		ev.events |= EPOLLIN;

	const int op = entry->mask ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
	if (-1 == epoll_ctl(server->epfd,op,fd,&ev)) {
		printf("err=epoll_ctl_%s\n", op == EPOLL_CTL_MOD ? "mod" : "add");
		return -1;
	}

	// record the callbacks; run() looks them up by fd after epoll_wait
	if (type & EVENT_READABLE)
		entry->read = fun;
	if (type & EVENT_WRITEABLE)
		entry->write = fun;
	entry->mask = ev.events;
	return 0;
}

// Record that the server should operate in non-blocking mode (is_blocking = 0).
// Typical network loops use non-blocking I/O so a read never stalls the loop.
// Only the flag is stored; no socket or epoll state is touched here.
void set_server_nonblocking(struct event_server* server) 
{
	(*server).is_blocking = 0;
}

// Record that the server should operate in blocking mode (is_blocking = 1).
// Only the flag is stored; no socket or epoll state is touched here.
void set_server_blocking(struct event_server* server) 
{
	(*server).is_blocking = 1;
}

// infinite loop 事件循环,线程在此循环
void run(struct event_server* server) 
{
	// do an infinite loop for epoll_wait
	while(1) {
		//printf("before_epoll_wait\n");
		int n = epoll_wait(server->epfd,server->events,MAX_EVENTS,EPOLL_WAIT_TIMEOUT);
		//printf("after_epoll_wait\n");
		
		if (0 == n) {
			//printf("Waiting<%lu>...\n",server->loops++);
			continue;
		}

		//printf("for_each<%d>\n",n);
		for (int i=0; i!=n; i++) {
			assert(server->events[i].data.fd);
			assert(server->events_set[server->events[i].data.fd].mask);
			if ((server->events[i].events & EPOLLIN) && server->events_set[server->events[i].data.fd].read) {
				server->events_set[server->events[i].data.fd].read(server,server->events[i].data.fd);
				continue;
			} 
			if ((server->events[i].events & EPOLLOUT) && server->events_set[server->events[i].data.fd].write) {
				server->events_set[server->events[i].data.fd].write(server,server->events[i].data.fd);
				continue;
			}
		}
	}
}


// Write callback: send the fixed reply "received" to clientfd, then stop
// watching clientfd for writability.
// A partial write resumes from where the previous one stopped, and EINTR is
// retried. On EAGAIN or any other write error, the unsent remainder is dropped,
// because this handler keeps no per-fd state to finish it later.
void tcp_write(struct event_server* server, int clientfd)
{
	static const char reply[] = "received";
	const size_t total = sizeof(reply) - 1;	// exclude the terminating NUL
	size_t sent = 0;

	while (sent < total) {
		ssize_t n = write(clientfd, reply + sent, total - sent);
		if (n < 0) {
			if (EINTR == errno)
				continue;
			break;	// EAGAIN (buffer full) or a broken connection
		}
		sent += (size_t)n;
	}
	unregister_server_event(server,clientfd,EVENT_WRITEABLE);
}

// Read callback: drain and discard everything currently readable on clientfd.
// It returns once read reports EAGAIN/EWOULDBLOCK. This assumes clientfd is
// non-blocking; otherwise the final read blocks the whole loop.
// On EOF (peer closed) or a hard read error, clientfd is unregistered from the
// server and closed.
void tcp_read(struct event_server* server, int clientfd)
{
	char buf[1024];

	for (;;) {
		ssize_t n = read(clientfd, buf, sizeof(buf));
		if (n > 0) {
			// Data is discarded. To answer the client, schedule a write here,
			// e.g. register_server_event(server, clientfd, EVENT_WRITEABLE, tcp_write).
			continue;
		}
		if (n < 0) {
			if (EINTR == errno)
				continue;
			if (EAGAIN == errno || EWOULDBLOCK == errno)
				break;	// fully drained: wait for the next EPOLLIN
		}

		// n == 0: orderly shutdown by the peer; n < 0: hard error (e.g. reset).
		// Either way the connection is finished; retrying read would spin forever.
		const int peer_closed = (0 == n);
		unregister_server_event(server,clientfd,EVENT_READABLE|EVENT_WRITEABLE);
		close(clientfd);
		if (peer_closed)
			printf("ok=client_quit\n");
		else
			printf("err=read_client\n");
		break;
	}
}

// Read callback for the listening socket: accept one pending connection, make
// it non-blocking with TCP_NODELAY, and register it for reads with tcp_read.
// The accepted fd is closed again if it cannot be made non-blocking or
// registered. tcp_read's drain loop requires a non-blocking fd, and an
// unregistered fd would otherwise leak.
void tcp_accept(struct event_server* server, int server_socket)
{
	int cfd = accept(server_socket,NULL,NULL);
	if (-1 == cfd) {	// accept signals failure with -1 (0 is a valid fd)
		printf("err=accept_socket\n");
		return;
	}

	int flag = fcntl(cfd,F_GETFL,0);
	if (-1 == flag || -1 == fcntl(cfd,F_SETFL,flag | O_NONBLOCK)) {
		printf("err=set_nonblocking\n");
		close(cfd);
		return;
	}

	// no delay (disable Nagle's algorithm); failure here is not fatal
	int nodelay = 1;
	if (-1 == setsockopt(cfd,IPPROTO_TCP,TCP_NODELAY,&nodelay,sizeof(nodelay)))
		printf("err=set_tcp_no_delay\n");

	// add the client fd to the epoll loop
	if (-1 == register_server_event(server,cfd,EVENT_READABLE,tcp_read))
		close(cfd);
}

// drive function
#define ut_main main
int ut_main() 
{
	// setup a socket
	int server_socket = socket(AF_INET,SOCK_STREAM,0);
	if (-1 == server_socket) {
		printf("err=create_socket\n");
		return -1;
	}
	else
		printf("ok=create_socket\n");
	struct sockaddr_in server_addr;
	bzero(&server_addr,sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = htons(INADDR_ANY);
	server_addr.sin_port = htons(9898);

	int flag=1,len=sizeof(flag);
	// we can reuse the port
	setsockopt(server_socket,SOL_SOCKET,SO_REUSEADDR,&flag,len);
	if (-1 == setsockopt(server_socket,IPPROTO_TCP,TCP_NODELAY,&flag,sizeof(flag)))
		printf("err=set_tcp_no_delay\n");

	// bind ip/port
	if (-1 == bind(server_socket,(struct sockaddr*)&server_addr,sizeof(server_addr))) {
		printf("err=bind_socket\n");return -1; }
	else
		printf("ok=bind_socket\n");
	if (-1 == listen(server_socket,1024)) {
		printf("err=listen_socket\n");return -1; }
	else	
		printf("ok=listen_socket\n");

	// create a epoll server handle
	struct event_server* server = create_server();
	set_server_nonblocking(server);
	// firstly listen the server's socket with ACCEPT
	register_server_event(server,server_socket,EVENT_READABLE,tcp_accept);

	// do event loop 
	run(server);

	return 0;
}





        有些细节没特别关注,例如Linger、readv/writev之类的问题暂时不care,后续继续完善。单线程下压力QPS可以打到5万(blocksize很小只是一个字符串,会对数据包的利用率和并发造成一定影响)。还没试过多线程(可以一个线程包一个server,或者由server来托管线程)。只是简单做了一个epoll多路复用的引子,基本“画出”了网络框架的影子,其实不管是redis、libevent还是apache、nginx都是以此为基点进行扩展,在上面做线程、并发控制、进程池(apache的prefork)等。




原文链接:http://blog.csdn.net/gugemichael/article/details/7506099
加载中
返回顶部
顶部