在python-flask中调用kafka发送消息,消费者收不到

340StarObserver 发布于 2016/07/13 21:02
阅读 3K+
收藏 0

收藏!数据建模最全知识体系解读!>>>

首先,在没有flask的时候,我写了kafka的程序,一个生产者,一个消费者。

生产者进程,它负责读取本地的一张图片,把图片的二进制数据以消息的形式发送给kafka,代码如下:

        from kafka import KafkaProducer

        # get the binary data of a picture
        f=open('/home/seven/Pictures/fff.png','rb')
        data=f.read()
        f.close()

        # create a producer
        producer=KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer=str.encode)

        # send th binary data to kafka
        producer.send('img_msg',key="Hello,Assassin424214141",value=data)

消费者进程,它负责接收消息,并用消息内容还原出一张图片存储在磁盘上,代码如下:

        from kafka import KafkaConsumer

        # create a consumer
        consumer = KafkaConsumer('img_msg',bootstrap_servers=['localhost:9092'])

        # receive messages
        for message in consumer:
            # print the message
            print ("hello %s:%d:%d: key=%s"%(message.topic, message.partition,message.offset, message.key))
            # use the message's data to create a picture
            img_data=message.value
            outfile=open("test.png","wb")
            outfile.write(img_data)
            outfile.close()

代码比较简陋,是个测试用的原型,并且是可以跑的。

然而,我把生产者的代码放到flask(python-flask是一个web框架)中后。flask程序接收客户端的请求(客户端会上传一张图片),确实是接收到图片了,而且消息发送给kafka的时候也没有报错。但是怪异的是kafka的消费者那边却始终没有动静...

然后我做了一个极端的测试,服务端(即flask)把接收到的客户端上传的图片先写到磁盘上,再从磁盘上读取这张图片(就和一开始的测试原型是一样的逻辑,而且验证了图片是成功写到磁盘上的,即图片接收这一环节是没有问题的),然而这种情况下kafka的消费者却还是接收不到消息,如之奈何?

我接着做测试,发现一个普遍现象,无论是普通的字符串消息还是二进制的消息,只要不在flask中发送给kafka,则kafka的消费者都能收到但是无论是什么类型的消息,只要把kafka生产者的代码放到flask中去,就会导致消费者那边啥也收不到。

有遇到同样情况的,请不吝赐教~

加载中
0
340StarObserver
340StarObserver

好吧,我发现了更加灵异的问题,我把kafka的生产者放到GET请求中,消费者那边是可以收到的

然而在POST请求中调用kafka的producer,消费者那边就收不到。

感觉自己被flask和kafka玩弄了一样...

又搞了一上午,问题解决了,详细是这样的:  

1.    要在kafka的生产者发送消息后,sleep一会(一般10毫秒就够了),但是这样还不行,准确来说。当flask处理POST请求,同时接收来自客户端的图片数据和非图片数据,kafka消费者就还是收不到消息。必须只能接收图片数据,此时才行得通。我也不知道为什么,感觉好神奇。

2.    好了,总结一下。目前的解决方案是,第一件事是要保证flask中不要同时接收图片和非图片数据第二件事是在kafka-producer发送消息后,sleep十几毫秒。两件事都要做,才能让kafka-consumer接收到消息。虽然sleep不是一个好办法,可以说又是迂行恶首,但目前也只能如此了。
0
340StarObserver
340StarObserver

彻底解决flask中向kafka写入消息,结果消费者收不到消息的问题

假设客户端一次性发送了 : 一个文件(键名为targetfile),两个数字(键名为x和y)

flask服务端的解决方案是这样的 :

获取图片的二进制内容 :

        flask.request.files['targetfile'].read()

获取两个数字参数 :

        post_data=dict(flask.request.form)

        x=post_data['x'][0]

        y=post_data['y'][0]

        // 此处不能用 flask.request.form['x']

        // 也不能够用 flask.request.form.get('x')

        // 也不能够用 flask.request.values.get('x')

        // 否则,kafka的消费者那端就会收不到消息

        // 很诡异,但是目前的这个方案能够解决问题

返回顶部
顶部