0
回答
Python队列及在微信机器人中的应用
利用AWS快速构建适用于生产的无服务器应用程序,免费试用12个月>>>   

本文来源于i春秋学院,未经允许严禁转载。

最近打算更新微信机器人,发现机器人的作者将代码改进了很多,但去掉了sqlite数据库,需要自己根据需求设计数据库,跟作者沟通得到的建议是为了防止消息并发导致数据库死锁,建议另开一个进程读写数据库,将消息加入一个队列中,因为对Python了解有限,队列和多线程更不是我擅长的内容,于是最近疯狂Google、百度,探索着实现了此功能。写此文记录下基本概念和实现方法

 

0x00 Python队列
队列是线程中交换数据的形式。
创建一个队列对象

import Queue

q = Queue.Queue(maxsize = 10) #maxsize是队列长度,不限制长度可以不不赋值


将一个值放入队列

q.put(10)


将一个值从队列取出

q.get()


三种队列及构造函数

  • FIFO队列先进先出: class Queue.Queue(maxsize)
  • LIFO类似于堆,即先进后出: class Queue.LifoQueue(maxsize)
  • 优先级队列: class Queue.PriorityQueue(maxsize)
     

队列的常用方法

q.qsize() #返回队列的大小

q.empty() #如果队列为空,返回True,反之False

q.full() #如果队列满了,返回True,反之False

q.full #与 maxsize 大小对应

q.get([block[, timeout]]) #获取队列,block:是否阻塞等待,timeout等待时间

q.get_nowait() #相当q.get(False)

q.put(item[, block[, timeout]) #非阻塞写入队列,block:是否阻塞等待,timeout等待时间

q.put_nowait(item) #相当q.put(item, False)

q.task_done() #在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号

q.join() #等到队列为空,再执行别的操作


几个例子

一个线程往队列里写入随机数,另一个线程从队列里取数字(阻塞等待)

#!/usr/bin/env python

#coding:utf8

import random,threading,time

from Queue import Queue

#Producer thread

class Producer(threading.Thread):

    def __init__(self, t_name, queue):

        threading.Thread.__init__(self,name=t_name)

        self.data=queue

    def run(self):

        for i in range(10):  #随机产生10个数字 ,可以修改为任意大小

            randomnum=random.randint(1,99)

            print "%s: %s 生成了一个数字 %d 并把它扔进了队列!" % (time.ctime(), self.getName(), randomnum)

            self.data.put(randomnum)  #将数据依次存入队列

            time.sleep(1)

        print "%s: %s finished!" %(time.ctime(), self.getName())



#Consumer thread

class Consumer_all(threading.Thread):

    def __init__(self, t_name, queue):

        threading.Thread.__init__(self, name=t_name)

        self.data = queue



    def run(self):

        while 1:

            try:

                # print self.data

                val_even = self.data.get() 

                print "%s: %s 从队列里取出了 %d !" % (time.ctime(), self.getName(), val_even)

                time.sleep(1)

            except  Exception, e: 

                print '取数据失败'

                continue



#Main thread

def main():

    queue = Queue()

    producer = Producer('Pro.', queue)

    consumer_all = Consumer_all('Con_all.', queue)

    producer.start()

    consumer_all.start()



if __name__ == '__main__':

    main()

运行结果

Wed Aug  3 00:04:18 2016: Pro. 生成了一个数字 74 并把它扔进了队列!

Wed Aug  3 00:04:18 2016: Con_all. 从队列里取出了 74 !

Wed Aug  3 00:04:19 2016: Pro. 生成了一个数字 77 并把它扔进了队列!

Wed Aug  3 00:04:19 2016: Con_all. 从队列里取出了 77 !

Wed Aug  3 00:04:20 2016: Pro. 生成了一个数字 63 并把它扔进了队列!

Wed Aug  3 00:04:20 2016: Con_all. 从队列里取出了 63 !

Wed Aug  3 00:04:21 2016: Pro. 生成了一个数字 96 并把它扔进了队列!

Wed Aug  3 00:04:21 2016: Con_all. 从队列里取出了 96 !

Wed Aug  3 00:04:22 2016: Pro. 生成了一个数字 82 并把它扔进了队列!

Wed Aug  3 00:04:22 2016: Con_all. 从队列里取出了 82 !

Wed Aug  3 00:04:23 2016: Pro. 生成了一个数字 19 并把它扔进了队列!

Wed Aug  3 00:04:23 2016: Con_all. 从队列里取出了 19 !

Wed Aug  3 00:04:24 2016: Pro. 生成了一个数字 56 并把它扔进了队列!

Wed Aug  3 00:04:24 2016: Con_all. 从队列里取出了 56 !

Wed Aug  3 00:04:25 2016: Pro. 生成了一个数字 57 并把它扔进了队列!

Wed Aug  3 00:04:25 2016: Con_all. 从队列里取出了 57 !

Wed Aug  3 00:04:26 2016: Pro. 生成了一个数字 42 并把它扔进了队列!

Wed Aug  3 00:04:26 2016: Con_all. 从队列里取出了 42 !

Wed Aug  3 00:04:27 2016: Pro. 生成了一个数字 7 并把它扔进了队列!

Wed Aug  3 00:04:27 2016: Con_all. 从队列里取出了 7 !

Wed Aug  3 00:04:28 2016: Pro. finished!

如果将入队的时间间隔修改,出队的程序将阻塞运行,将“ time.sleep(1)”改为“ time.sleep(10)”,再次运行

Wed Aug  3 00:10:46 2016: Pro. 生成了一个数字 45 并把它扔进了队列!

Wed Aug  3 00:10:46 2016: Con_all. 从队列里取出了 45 !

Wed Aug  3 00:10:56 2016: Pro. 生成了一个数字 75 并把它扔进了队列!

Wed Aug  3 00:10:56 2016: Con_all. 从队列里取出了 75 !

Wed Aug  3 00:11:06 2016: Pro. 生成了一个数字 26 并把它扔进了队列!

Wed Aug  3 00:11:06 2016: Con_all. 从队列里取出了 26 !

Wed Aug  3 00:11:16 2016: Pro. 生成了一个数字 67 并把它扔进了队列!

Wed Aug  3 00:11:16 2016: Con_all. 从队列里取出了 67 !

Wed Aug  3 00:11:26 2016: Pro. 生成了一个数字 60 并把它扔进了队列!

Wed Aug  3 00:11:26 2016: Con_all. 从队列里取出了 60 !

Wed Aug  3 00:11:36 2016: Pro. 生成了一个数字 83 并把它扔进了队列!

Wed Aug  3 00:11:36 2016: Con_all. 从队列里取出了 83 !

Wed Aug  3 00:11:46 2016: Pro. 生成了一个数字 33 并把它扔进了队列!

Wed Aug  3 00:11:46 2016: Con_all. 从队列里取出了 33 !

Wed Aug  3 00:11:56 2016: Pro. 生成了一个数字 65 并把它扔进了队列!

Wed Aug  3 00:11:56 2016: Con_all. 从队列里取出了 65 !

Wed Aug  3 00:12:06 2016: Pro. 生成了一个数字 44 并把它扔进了队列!

Wed Aug  3 00:12:06 2016: Con_all. 从队列里取出了 44 !

Wed Aug  3 00:12:16 2016: Pro. 生成了一个数字 28 并把它扔进了队列!

Wed Aug  3 00:12:16 2016: Con_all. 从队列里取出了 28 !

Wed Aug  3 00:12:26 2016: Pro. finished!

会发现当队列没有值的时候程序会阻塞等待队列有值才继续运行。
稍微修改程序,一个线程往队列里写入随机数,另一个线程从队列里取数字(非阻塞等待)

#!/usr/bin/env python

#coding:utf8

import random,threading,time

from Queue import Queue

#Producer thread

class Producer(threading.Thread):

    def __init__(self, t_name, queue):

        threading.Thread.__init__(self,name=t_name)

        self.data=queue

    def run(self):

        for i in range(10):  #随机产生10个数字 ,可以修改为任意大小

            randomnum=random.randint(1,99)

            print "%s: %s 生成了一个数字 %d 并把它扔进了队列!" % (time.ctime(), self.getName(), randomnum)

            self.data.put(randomnum)  #将数据依次存入队列

            time.sleep(10)

        print "%s: %s finished!" %(time.ctime(), self.getName())



#Consumer thread

class Consumer_all(threading.Thread):

    def __init__(self, t_name, queue):

        threading.Thread.__init__(self, name=t_name)

        self.data = queue



    def run(self):

        while 1:

            try:

                # print self.data

                val_even = self.data.get(1,5)  # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒

                print "%s: %s 从队列里取出了 %d !" % (time.ctime(), self.getName(), val_even)

                time.sleep(1)

            except  Exception, e:  # 等待输入,超过5秒  就报异常

                print '取数据失败'

                continue



#Main thread

def main():

    queue = Queue()

    producer = Producer('Pro.', queue)

    consumer_all = Consumer_all('Con_all.', queue)

    producer.start()

    consumer_all.start()



if __name__ == '__main__':

    main()
Wed Aug  3 00:20:58 2016: Pro. 生成了一个数字 57 并把它扔进了队列!

Wed Aug  3 00:20:58 2016: Con_all. 从队列里取出了 57 !

取数据失败

Wed Aug  3 00:21:08 2016: Pro. 生成了一个数字 83 并把它扔进了队列!

Wed Aug  3 00:21:08 2016: Con_all. 从队列里取出了 83 !

取数据失败

Wed Aug  3 00:21:18 2016: Pro. 生成了一个数字 85 并把它扔进了队列!

Wed Aug  3 00:21:18 2016: Con_all. 从队列里取出了 85 !

取数据失败

Wed Aug  3 00:21:28 2016: Pro. 生成了一个数字 64 并把它扔进了队列!

Wed Aug  3 00:21:28 2016: Con_all. 从队列里取出了 64 !

取数据失败

Wed Aug  3 00:21:38 2016: Pro. 生成了一个数字 23 并把它扔进了队列!

Wed Aug  3 00:21:38 2016: Con_all. 从队列里取出了 23 !

取数据失败

Wed Aug  3 00:21:48 2016: Pro. 生成了一个数字 98 并把它扔进了队列!

Wed Aug  3 00:21:48 2016: Con_all. 从队列里取出了 98 !

取数据失败

Wed Aug  3 00:21:58 2016: Pro. 生成了一个数字 12 并把它扔进了队列!

Wed Aug  3 00:21:58 2016: Con_all. 从队列里取出了 12 !

取数据失败

Wed Aug  3 00:22:08 2016: Pro. 生成了一个数字 27 并把它扔进了队列!

Wed Aug  3 00:22:08 2016: Con_all. 从队列里取出了 27 !

取数据失败

Wed Aug  3 00:22:18 2016: Pro. 生成了一个数字 60 并把它扔进了队列!

Wed Aug  3 00:22:18 2016: Con_all. 从队列里取出了 60 !

取数据失败

Wed Aug  3 00:22:28 2016: Pro. 生成了一个数字 69 并把它扔进了队列!

Wed Aug  3 00:22:28 2016: Con_all. 从队列里取出了 69 !

取数据失败

Wed Aug  3 00:22:38 2016: Pro. finished!

取数据失败

取数据失败

取数据失败

可以看出,取数据的线程在超过设定的等待时间后会抛出异常并继续往下执行。

 

0x01 使用队列将微信机器人消息存入MongoDB

使用上面的例子,稍作修改,将接收到的消息扔进队列,开启另一个线程取数据,取到后将消息格式化并存入数据库

#!/usr/bin/env python

#coding:utf8

import threading,time,pymongo

from pymongo import MongoClient

from Queue import Queue

#消息入队

class MsgInQueue():

    def __init__(self, queue):

        threading.Thread.__init__(self)

        self.data=queue

    def putmsgqueue(self,msg):

        self.data.put(msg)

        print 'put msg in queue success:'+msg['Content']



###消息出队并存入数据库

class MsgOutQueue2db(threading.Thread):

    def __init__(self, queue):

        threading.Thread.__init__(self)

        self.data = queue

        #建立MongoDB连接

        self.conn = MongoClient()

        #数据库

        self.db = self.conn.wechatRobot

        #数据表

        self.messages = self.db.messages

    def run(self):

        while 1:

            try:

                # print self.data

                #从队列里取消息

                msg = self.data.get(1, 5)  # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒

                print "%s: %s get %s from queue !" % (time.ctime(), self.getName(), msg['Content'].encode('utf-8'))

                try:

                    #格式化消息数据

                    m = dict(groupname=msg['FromUserName'].encode('utf-8'),

                             time=msg['CreateTime'],

                             username=msg['ActualUserName'],

                             usernickname=msg['ActualNickName'].encode('utf-8'),

                             message=msg['Content'].encode('utf-8'),

                             messagetype=msg['MsgType']

                             )

                    print m

                    #存入数据库

                    self.db.messages.insert(m)

                    time.sleep(1)

                except  Exception, e:

                    print e

                    continue

            except  Exception, e:

                continue

主线程

def complex_reply():

    queue = Queue()

    outqueue = MsgOutQueue2db(queue)#实例化出队入库类

    outqueue.start()#开启线程



    @itchat.msg_register('Text', isGroupChat = True)

    def text_reply(msg):

        # print itchat.__client.storageClass.groupDict

        print itchat.__client.storageClass.chatroomList

        print msg

        inqueue=MsgInQueue(queue)#实例化入队类

        inqueue.putmsgqueue(msg)#消息入队

        if msg['isAt']:

            print msg

            itchat.send(u'@%s\u2005I received: %s'%(msg['ActualNickName'], msg['Content']), msg['FromUserName'])

消息存入数据库:

 

通过多线程和MongoDB的结合,有效防止消息过多导致数据库死锁的问题,也更加模块化,可以根据真实需求更换其他数据库。后面我将结合MongoDB插入、更新数据快的特点写一下我如何设计群聊统计功能,在Python方面和MongoDB方面我都是小白,如有更好的建议请多指教,我们共同学习!有关Python多线程的课程请参考《python安全编程入门》

<无标签>
举报
i春秋学院
发帖于8个月前 0回/135阅
顶部