400行python教你写个高性能http服务器+web框架,性能秒胜tornado uwsgi webpy django

feimat 发布于 2014/05/09 14:00
阅读 7K+
收藏 39

echo hello  性能压测abtest 

tornado 4kqps 
nginx+tornado 9kqps
nginx+uwsgi 8kqps
 (注意:没说比nginx快,只是这几个web框架不行)

本server 3.2w qps 
没用任何python加速

不相信的可以自己压测下哦


什么都不说400行代码加使用例子,欢迎吐槽,欢迎加好友一起进步

server.py

#!/usr/bin/python
#-*- coding:utf-8 -*-
# Copyright (c) 2012, Baidu.com Inc.
#
# Author      : hemingzhe <512284622@qq.com>; xiaorixin <xiaorx@live.com>
# Date        : Dec 20, 2012
#

import socket, logging
import select, errno
import os
import sys
import traceback
import Queue
import threading
import time
import thread
import cgi
from cgi import parse_qs
import json
import imp
from sendfile import sendfile
from os.path import join, getsize
import md5
import gzip
from StringIO import StringIO
from BaseHTTPServer import BaseHTTPRequestHandler
import re

# Logger shared by the event loop and the worker threads.
logger = logging.getLogger("network-server")

# Imported handler modules keyed by module name, and the source mtime
# recorded when each one was (re)loaded.
action_dic = {}
action_time = {}

# Static files live under ./static and are requested as /static/<name>;
# gzip-compressed copies are cached under ./cache_static.
static_file_dir = "static"
static_dir = "/%s/" % static_file_dir
cache_static_dir = "cache_%s" % static_file_dir
if not os.path.exists(cache_static_dir):
    os.makedirs(cache_static_dir)

# Upper-cased file extensions that are served gzip-compressed (used as a set).
filedic = dict.fromkeys(("HTM", "HTML", "CSS", "JS", "TXT"))

def getTraceStackMsg():
    """Return the formatted traceback of the exception being handled.

    The result is the concatenation of the lines produced by
    ``traceback.format_tb`` for the current ``sys.exc_info()`` traceback;
    it is an empty string when there is no traceback.
    """
    current_tb = sys.exc_info()[2]
    return ''.join(traceback.format_tb(current_tb))

def md5sum(fobj):
    """Return the hexadecimal MD5 digest of everything readable from ``fobj``.

    ``fobj`` is any object with a ``read(size)`` method returning byte
    strings; it is consumed in 64 KiB chunks so large files are never held
    in memory at once. An empty stream yields the digest of the empty string.
    """
    # Function-scope import: hashlib supersedes the deprecated ``md5`` module
    # and this keeps the block independent of the file's import list.
    import hashlib

    digest = hashlib.md5()
    while True:
        chunk = fobj.read(65536)
        if not chunk:
            break
        digest.update(chunk)
    return digest.hexdigest()

class QuickHTTPRequest():
    """Parsed view of one raw HTTP request string.

    Attributes set by the constructor:
      command, path, http_version -- the three fields of the request line
      index   -- last non-empty segment of the path (query string removed)
      action  -- ``index`` without its extension (the handler module name)
      method  -- extension of ``index`` without the dot (the handler function)
      headers -- dict of header name -> value, names kept as received
      rfile   -- StringIO over the request body
      getdic  -- parsed query string (GET requests with a "?" only)
      form    -- cgi.FieldStorage for multipart POSTs, otherwise {}
      postdic -- parsed body for every other POST

    Raises ValueError when the request line does not consist of exactly
    three whitespace-separated fields.
    """

    def __init__(self, data):
        headend = data.find("\r\n\r\n")
        rfile = ""
        if headend > 0:
            rfile = data[headend+4:]
            headlist = data[0:headend].split("\r\n")
        else:
            headlist = data.split("\r\n")
        self.rfile = StringIO(rfile)
        first_line = headlist.pop(0)
        self.command, self.path, self.http_version = re.split(r'\s+', first_line)

        # Always define these so a path with no usable segment (e.g. "/")
        # yields empty names instead of missing attributes.
        self.index = ""
        self.action = ""
        self.method = ""
        segments = self.path.split('?')[0].split('/')
        for segment in reversed(segments):
            if segment == "":
                continue
            self.index = segment
            self.action, extension = os.path.splitext(segment)
            self.method = extension.replace('.', '')
            break

        self.headers = {}
        for item in headlist:
            if item.strip() == "":
                continue
            segindex = item.find(":")
            if segindex < 0:
                continue
            key = item[0:segindex].strip()
            value = item[segindex+1:].strip()
            self.headers[key] = value

        # Header names are case-insensitive, so find Content-Type regardless
        # of how the client capitalised it.
        content_type = ""
        for key in self.headers:
            if key.lower() == "content-type":
                content_type = self.headers[key]
                break

        c_low = self.command.lower()
        self.getdic = {}
        self.form = {}
        self.postdic = {}
        if c_low == "get" and "?" in self.path:
            self.getdic = parse_qs(self.path.split("?").pop())
        elif c_low == "post" and content_type.find("boundary") > 0:
            # multipart/form-data body: delegate parsing to cgi.FieldStorage.
            self.form = cgi.FieldStorage(fp=self.rfile,headers=None,
                    environ={'REQUEST_METHOD':self.command,'CONTENT_TYPE':content_type,})
            if self.form is None:
                self.form = {}
        elif c_low == "post":
            self.postdic = parse_qs(rfile)

def sendfilejob(request, data, epoll_fd, fd):
    """Serve one static-file request; runs in its own thread.

    request  -- parsed request; ``path`` and ``headers`` are read
    data     -- per-connection state dict; ``connections`` holds the socket,
                and ``writedata`` / ``sendfile`` / ``readdata`` are updated
    epoll_fd -- epoll object of the event loop
    fd       -- file descriptor of the connection inside ``epoll_fd``

    Files whose extension is listed in ``filedic`` are served from a gzip
    copy kept in the cache directory. When the client's If-Modified-Since
    value equals the file's mtime string, a 304 response is queued instead.
    Any failure queues the plain text "file not found". Finally the
    connection is switched to EPOLLOUT so the event loop flushes
    ``writedata`` (or closes the connection when it is empty).
    """
    try:
        base_filename = request.path[request.path.find(static_dir)+1:]
        cache_filename = "./cache_"+base_filename
        filename = "./"+base_filename
        # Refuse anything that resolves outside the static directory
        # (e.g. "/static/../server.py"); the path comes from the client.
        static_root = os.path.abspath(static_file_dir)
        if not os.path.abspath(filename).startswith(static_root + os.sep):
            raise Exception("file not found")
        if not os.path.exists(filename):
            raise Exception("file not found")
        ext = os.path.splitext(filename)[1].replace('.', '')
        iszip = False
        etag = request.headers.get("If-Modified-Since", None)
        # The file's mtime doubles as its validator (sent as Last-Modified).
        filemd5 = str(os.path.getmtime(filename))
        if ext.upper() in filedic:
            # Rebuild the gzip copy when it is missing or older than the
            # source, whether or not the client sent a validator.
            stale = (not os.path.exists(cache_filename) or
                     os.path.getmtime(cache_filename) < os.path.getmtime(filename))
            if stale:
                cache_dir = os.path.split(cache_filename)[0]
                try:
                    if not os.path.exists(cache_dir):
                        os.makedirs(cache_dir)
                    src = open(filename, "rb")
                    try:
                        f_out = gzip.open(cache_filename, 'wb')
                        try:
                            f_out.write(src.read())
                        finally:
                            f_out.close()
                    finally:
                        src.close()
                except Exception as e:
                    print(str(e))
            filename = cache_filename
            iszip = True

        sock = data["connections"]
        if etag == filemd5:
            data["writedata"] = "HTTP/1.1 304 Not Modified\r\nLast-Modified: %s\r\n\r\n" % filemd5
        else:
            # Flag the transfer so the idle-timeout sweep leaves this
            # connection alone while the file is being sent.
            data["sendfile"] = True
            sock.setblocking(1)
            try:
                filesize = os.path.getsize(filename)
                f = open(filename, "rb")
                try:
                    if iszip:
                        headstr = "HTTP/1.0 200 OK\r\nContent-Length: %s\r\nLast-Modified: %s\r\nContent-Encoding: gzip\r\n\r\n" % (filesize,filemd5)
                    else:
                        headstr = "HTTP/1.0 200 OK\r\nContent-Length: %s\r\nLast-Modified: %s\r\n\r\n" % (filesize,filemd5)
                    # sendall: a single send() may write only part of the header.
                    sock.sendall(headstr)
                    offset = 0
                    while True:
                        sent = sendfile(sock.fileno(), f.fileno(), offset, 65536)
                        if sent == 0:
                            break  # EOF
                        offset += sent
                finally:
                    f.close()
            finally:
                # The event loop relies on EAGAIN from non-blocking sockets;
                # never hand a blocking socket back to it.
                sock.setblocking(0)
            data["writedata"] = ""
            data["sendfile"] = False
    except Exception as e:
        data["sendfile"] = False
        data["writedata"] = "file not found"
    try:
        data["readdata"] = ""
        epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)
    except Exception as e:
        print(str(e)+getTraceStackMsg())

class Worker(object):
    """Turns one buffered request into a queued response."""

    def __init__(self):
        pass

    def process(self, data, epoll_fd, fd):
        """Handle the request stored in ``data["readdata"]``.

        data     -- per-connection state dict; ``writedata``, ``readdata``
                    and ``keepalive`` are updated
        epoll_fd -- epoll object of the event loop
        fd       -- file descriptor of the connection inside ``epoll_fd``

        Static-file requests are handed to ``sendfilejob`` on a new thread
        and ``None`` is returned immediately. Otherwise the URL
        ``/<action>.<method>`` is dispatched to function ``method`` of the
        local module ``./<action>.py``, which is imported on first use and
        reloaded when its file is newer than the loaded copy. The handler is
        called as ``method(request, headers)`` and may add response headers.
        The response is stored in ``data["writedata"]`` and the connection
        is switched to EPOLLOUT.
        """
        res = ""
        add_head = ""
        request = None
        try:
            request = QuickHTTPRequest(data["readdata"])
        except Exception:
            res = "http format error"
        if request is not None:
            try:
                headers = {}
                headers["Content-Type"] = "text/html;charset=utf-8"
                if request.path == "/favicon.ico":
                    request.path = "/"+static_file_dir+request.path
                if static_dir in request.path or "favicon.ico" in request.path:
                    thread.start_new_thread(sendfilejob, (request,data,epoll_fd,fd))
                    return None
                # NOTE(security): the module name comes straight from the URL.
                # Stat the local source file first so a request cannot make
                # the server import a module that has no ./<action>.py file.
                mtime = os.path.getmtime("./%s.py" % request.action)
                action = action_dic.get(request.action, None)
                if action is None:
                    action = __import__(request.action)
                    action_time[request.action] = mtime
                    action_dic[request.action] = action
                elif mtime > action_time[request.action]:
                    # Source changed on disk: pick up the edit without a restart.
                    action = reload(sys.modules[request.action])
                    action_time[request.action] = mtime
                    action_dic[request.action] = action

                method = getattr(action, request.method)
                res = method(request, headers)
                # Honour "Connection: close" set by the handler; the flag
                # lives on the connection, so it must also be reset to False.
                data["keepalive"] = headers.get("Connection","") != "close"
                headers["Content-Length"] = len(res)
                add_head = "".join(["%s: %s\r\n" % (key, headers[key]) for key in headers])
            except Exception:
                res = "page no found"
                add_head = ""
        if not add_head:
            # Error bodies need a length too, otherwise a keep-alive client
            # cannot tell where the response ends.
            add_head = "Content-Length: %s\r\n" % len(res)
        try:
            data["writedata"] = "HTTP/1.1 200 OK\r\n%s\r\n%s" % (add_head, res)
            data["readdata"] = ""
            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)
        except Exception as e:
            print(str(e)+getTraceStackMsg())

def InitLog():
    """Attach a file handler and a stream handler to the module logger.

    The logger and the "network-server.log" file handler accept DEBUG and
    above; the stream handler only emits ERROR and above. Both handlers
    share one timestamped format.
    """
    layout = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    file_handler = logging.FileHandler("network-server.log")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(layout)

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.ERROR)
    stream_handler.setFormatter(layout)

    logger.setLevel(logging.DEBUG)
    for handler in (file_handler, stream_handler):
        logger.addHandler(handler)

class MyThread(threading.Thread):
    """Daemon worker thread that runs jobs taken from a shared queue.

    Each job is an ``(args, kwargs)`` pair where ``args`` is
    ``(param, epoll_fd, fd)``; it is passed to this thread's own ``Worker``.
    """
    ind = 0

    def __init__(self, threadCondition, shareObject, **kwargs):
        threading.Thread.__init__(self, kwargs=kwargs)
        self.threadCondition = threadCondition
        self.shareObject = shareObject
        self.setDaemon(True)
        self.worker = Worker()

    def processer(self, args, kwargs):
        """Run one job; failures are printed and never propagate."""
        try:
            param = args[0]
            epoll_fd = args[1]
            fd = args[2]
            self.worker.process(param, epoll_fd, fd)
        except Exception:
            # Not a bare except: SystemExit / KeyboardInterrupt must still be
            # able to stop the thread.
            print("job error:" + getTraceStackMsg())

    def run(self):
        """Take jobs from the queue forever, surviving per-job errors."""
        while True:
            try:
                args, kwargs = self.shareObject.get()
                self.processer(args, kwargs)
            except Queue.Empty:
                continue
            except Exception:
                print("thread error:" + getTraceStackMsg())

class ThreadPool:
    """Fixed-size group of ``MyThread`` workers fed from one shared queue."""

    def __init__( self, num_of_threads=10):
        self.threadCondition=threading.Condition()
        self.shareObject=Queue.Queue()
        self.threads = []
        self.__createThreadPool( num_of_threads )

    def __createThreadPool( self, num_of_threads ):
        """Create (but do not start) ``num_of_threads`` worker threads."""
        self.threads.extend(
            MyThread( self.threadCondition, self.shareObject)
            for _ in range( num_of_threads ))

    def start(self):
        """Start every worker thread created by the constructor."""
        for worker_thread in self.threads:
            worker_thread.start()

    def add_job( self, *args, **kwargs ):
        """Queue one job as an ``(args, kwargs)`` pair for the workers."""
        job = (args,kwargs)
        self.shareObject.put( job )

def run_main(listen_fd):
    """Run the edge-triggered epoll event loop of one server process.

    listen_fd -- the bound, listening, non-blocking server socket.

    The loop accepts connections, buffers incoming bytes per connection,
    queues a job on a 16-thread pool once a complete request has arrived,
    writes out the response the job stored in the connection's state, and
    closes connections idle for more than 10 seconds. Per-connection state
    lives in ``params[fd]`` with the keys ``addr``, ``connections`` (the
    socket), ``time``, ``readdata``, ``writedata``, ``writelen`` and
    optionally ``keepalive`` / ``sendfile``. The function never returns
    normally; it re-raises if the epoll object cannot be set up.
    """
    try:
        epoll_fd = select.epoll()
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP)
    except select.error as msg:
        logger.error(msg)
        # Without a working epoll object the loop below cannot run.
        raise

    tp = ThreadPool(16)
    tp.start()

    params = {}
    def clearfd(fd):
        """Unregister, close and forget the connection behind ``fd``."""
        epoll_fd.unregister(fd)
        params[fd]["connections"].close()
        del params[fd]

    last_min_time = -1
    while True:
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            cur_time = time.time()
            if fd == listen_fd.fileno():
                # Edge-triggered: drain the accept queue until it would block.
                while True:
                    try:
                        conn, addr = listen_fd.accept()
                        conn.setblocking(0)
                        epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP)
                        conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                        params[conn.fileno()] = {"addr":addr,"writelen":0, "connections":conn, "time":cur_time}
                    except socket.error as msg:
                        break
            elif fd not in params:
                # The connection was already closed earlier in this batch
                # (for example by the timeout sweep); ignore the stale event.
                continue
            elif select.EPOLLIN & events:
                param = params[fd]
                param["time"] = cur_time
                datas = param.get("readdata","")
                cur_sock = param["connections"]
                while True:
                    try:
                        data = cur_sock.recv(102400)
                        if not data:
                            # Peer closed the connection.
                            clearfd(fd)
                            break
                        else:
                            datas += data
                    except socket.error as msg:
                        if msg.errno == errno.EAGAIN:
                            # Socket drained: decide whether the buffered
                            # bytes form one complete request.
                            param["readdata"] = datas
                            contentlen = -1
                            headlen = -1
                            len_e = -1
                            len_s = datas.find("Content-Length:")
                            if len_s > 0:
                                len_e = datas.find("\r\n", len_s)
                            if len_s > 0 and len_e > 0 and len_e > len_s+15:
                                len_str = datas[len_s+15:len_e].strip()
                                if len_str.isdigit():
                                    contentlen = int(len_str)
                            headend = datas.find("\r\n\r\n")
                            if headend > 0:
                                headlen = headend + 4
                            data_len = len(datas)
                            if (contentlen > 0 and headlen > 0 and (contentlen + headlen) == data_len) or \
                                    (contentlen == -1 and headlen == data_len):
                                tp.add_job(param,epoll_fd,fd)
                            break
                        else:
                            clearfd(fd)
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events or select.EPOLLERR & events:
                clearfd(fd)
                logger.error("sock: %s error" % fd)
            elif select.EPOLLOUT & events:
                param = params[fd]
                param["time"] = cur_time
                sendLen = param.get("writelen",0)
                writedata = param.get("writedata", "")
                total_write_len = len(writedata)
                cur_sock = param["connections"]
                if writedata == "":
                    # Nothing left to send (e.g. after a sendfile transfer).
                    clearfd(fd)
                    continue
                while True:
                    try:
                        sendLen += cur_sock.send(writedata[sendLen:])
                        if sendLen == total_write_len:
                            if param.get("keepalive", True):
                                param["readdata"] = ""
                                param["writedata"] = ""
                                param["writelen"] = 0
                                epoll_fd.modify(fd, select.EPOLLET | select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
                            else:
                                clearfd(fd)
                            break
                    except socket.error as msg:
                        if msg.errno == errno.EAGAIN:
                            # Kernel buffer full: resume on the next EPOLLOUT.
                            param["writelen"] = sendLen
                        else:
                            # Broken connection: drop it instead of retrying
                            # the failing send forever.
                            clearfd(fd)
                            logger.error(msg)
                        break
            else:
                continue
            #check time out
            if cur_time - last_min_time > 10:
                last_min_time = cur_time
                # Copy the items: clearfd() removes entries while we iterate.
                for (key_fd,value) in list(params.items()):
                    fd_time = value.get("time", 0)
                    del_time = cur_time - fd_time
                    if del_time > 10:
                        # Connections in the middle of a sendfile transfer
                        # are exempt from the idle timeout.
                        sending = value.get("sendfile", False)
                        if not sending:
                            clearfd(key_fd)
                    elif fd_time < last_min_time:
                        last_min_time = fd_time

if __name__ == "__main__":
    # Entry point: python server.py <port>
    # Creates the listening socket once, then forks ``child_num`` children;
    # the parent and every child each run their own event loop on it.
    InitLog()
    if len(sys.argv) < 2:
        logger.error("usage: python %s <port>" % sys.argv[0])
        sys.exit(1)
    port = int(sys.argv[1])
    try:
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as msg:
        logger.error("create socket failed")
        sys.exit(1)
    try:
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as msg:
        # Not fatal: the server can still run without SO_REUSEADDR.
        logger.error("setsocketopt SO_REUSEADDR failed")
    try:
        listen_fd.bind(('', port))
    except socket.error as msg:
        # Fatal: listening on an unbound socket would not use the requested port.
        logger.error("bind failed")
        sys.exit(1)
    try:
        listen_fd.listen(1024)
        listen_fd.setblocking(0)
    except socket.error as msg:
        logger.error(msg)
        sys.exit(1)

    child_num = 8
    for _ in range(child_num):
        if os.fork() == 0:
            run_main(listen_fd)
            # Should run_main ever return, the child must not fall back
            # into this loop and fork children of its own.
            os._exit(0)
    run_main(listen_fd)




用户文档:


1、启动:

   指定监听端口即可启动

   python server.py 8992


2、快速编写cgi,支持运行时修改,无需重启server


   在server.py同一目录下

   随便建一个python 文件

   例如:

   example.py

   定义一个tt函数:

   则请求该函数的url为 http://ip:port/example.tt

   修改后保存,即可访问,无需重启


   example.py

   def tt(request,response_head):
       """Example handler: respond with "ccb" followed by the request path."""

       # Debugging aids: uncomment to dump the parsed request data.
       #print request.form

       #print request.getdic

       #print request.postdic

       return "ccb"+request.path


   函数必须带两个参数

   request:表示请求的数据 默认带以下属性

      headers: 头部 (字典)

      form:  multipart/form表单 (cgi.FieldStorage 对象;非 multipart 请求时为空字典)

      getdic: url参数 (字典)

      postdic: httpbody参数 (字典)

      rfile: 原始http content内容  (StringIO 对象,可用 rfile.read() 读取为字符串)

      action: python文件名 (这里为example)

      method: 函数方法    (这里为tt)

      command:  (get or post)

      path: url (字符串)

      http_version: http版本号 (http 1.1)

   response_head: 表示response内容的头部

      例如如果要返回用gzip压缩

      则增加头部

      response_head["Content-Encoding"] = "gzip"


3、下载文件

   默认静态文件放在static文件夹下

   例如把a.jpg放到static文件夹下

   访问的url为 http://ip:port/static/a.jpg

   支持客户端缓存功能(基于 Last-Modified / If-Modified-Since 头,作用类似 etag)

   支持range 支持断点续传

   (server 使用sendfile进行文件发送,不占内存且快速)


4、支持网页模板编写

   创建一个模板 template.html

   <HTML>

       <HEAD><TITLE>$title</TITLE></HEAD>

       <BODY>

           $contents

       </BODY>

   </HTML>


   则对应的函数:

   def template(request,response_head):
       """Example handler: render template.html with a title and contents."""

       # Cheetah template loaded from the file next to this module.
       t = Template(file="template.html")

       # Values for the $title and $contents placeholders of the template.
       t.title  = "my title"

       t.contents  = "my contents"

       # The rendered template text is the response body.
       return str(t)

   模板实现使用了python最快速Cheetah开源模板,

   性能约为webpy django thinkphp等模板的10倍以上:

   http://my.oschina.net/whp/blog/112296


example.py


import os

import imp

import sys

import time

import gzip

from StringIO import StringIO

from Cheetah.Template import Template


def tt(request,response_head):
    """Example handler: respond with "ccb" followed by the request path.

    ``request.form``, ``request.getdic`` and ``request.postdic`` hold the
    parsed request data and can be printed here while debugging.
    ``response_head`` is left untouched.
    """
    body = "ccb"
    body += request.path
    return body


def getdata(request,response_head):
    """Example handler: return the content of a.txt marked as gzip-encoded.

    Sets ``Content-Encoding: gzip`` in ``response_head`` and returns the
    file content unchanged.
    """
    # ``with`` closes the file even when read() raises.
    with open("a.txt") as f:
        content = f.read()

    # NOTE(review): the content is returned as read, so this header is only
    # correct if a.txt already holds gzip-compressed data -- confirm.
    response_head["Content-Encoding"] = "gzip"

    return content


def template(request,response_head):
    """Example handler: render template.html and return it gzip-compressed.

    Fills the template's ``title`` and ``contents`` values, compresses the
    rendered text and sets ``Content-Encoding: gzip`` in ``response_head``
    so the header matches the body that is returned.
    """
    t = Template(file="template.html")
    t.title  = "my title"
    t.contents  = "my contents"

    # The server sends the returned body as-is, so the handler must do the
    # compression it announces in the header.
    buf = StringIO()
    gz = gzip.GzipFile(fileobj=buf, mode="wb")
    try:
        gz.write(str(t))
    finally:
        gz.close()

    response_head["Content-Encoding"] = "gzip"

    return buf.getvalue()



安装 如果需要使用web模板功能这里需要安装Cheetah

附件中有安装脚本


加载中
0
荆棘谷-部落-我要么
荆棘谷-部落-我要么

很牛的样子。。。不过,还是最爱 erlang

0
mallon
mallon

好歹排个版吧

0
litescript
litescript

python -m SimpleHTTPServer



0
MeiKai
MeiKai

我花了1500行

MeiKai
MeiKai
回复 @feimat : 异步socketserver+框架的代码没有保留,不过现在还有uwsgi的框架代码,不过都不带模板引擎的
feimat
feimat
可以分享下你的代码吗
0
WeirdBIrd
WeirdBIrd

放git  上吧 

0
Zoker
Zoker

放git@osc  上吧    这看着有点蛋疼。

0
月影又无痕
月影又无痕

当作学习倒是可以,但是如果用于实际使用,恐怕就难以担当重任了。

0
0
猎户座
猎户座

来个输入校验。。输出校验试试

feimat
feimat
好建议
返回顶部
顶部