4
回答
请教有关RabbitMQ消息确认的问题!
利用AWS快速构建适用于生产的无服务器应用程序,免费试用12个月>>>   

@摩云飞 你好,想跟你请教个问题:

下面是RabbitMQ的消息确认机制:

“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。”

我现在遇到的问题是这样的:我这边有几条线程去消息队列里取数据,但是会有异常数据导致线程挂掉,就是上边的“客户端在发送ack之前意外死掉了”,RabbitMQ会将消息投递到下一个consumer客户端,这样一条异常数据会把我的所有线程挂掉,我现在想实现这样的功能:如果有异常数据导致进程挂掉,那么我不让RabbitMQ将这条消息投递到下一个consumer客户端,而是放到另一个地方或者另外处理,请问该如何实现呢?

<无标签>
举报
庆沉
发帖于4年前 4回/8K+阅
共有4个答案 最后回答: 4年前

这里要说明事情有以下几点:
1.RabbitMQ作为消息队列中间件,就设计成进行保证消息被可靠传递,所以才会有上述“RabbitMQ会将消息投递到下一个consumer客户端”的行为。这个默认行为要想更改,要么去改 RabbitMQ 服务端代码,要么改你的 client 的使用方式。显然后者更可行。
2.“一条异常数据会把我的所有线程挂掉”这个本身就说明你的客户端程序是存在问题的,如果非要找到一种变通的方式(看你上面的描述,似乎不要求消息的可靠传递),可以采用设置 autoAck=1 的方法。这样设置后,一旦消息从 server 端被 deliver 出来后立即会被移除,此时即使你的某个 consumer 线程因为某种原因崩溃掉了,当前消息也不会再次被发送到其他的 consumer 线程上。

--- 共有 1 条评论 ---
庆沉非常感谢您的回复,的确我的这个客户端程序是存在一些问题的,但是现在不能马上把问题解决,所以一方面解决客户端的问题,另一方面也想看下是什么样的数据会导致问题。如果异常数据导致线程挂掉了,我如何实现把这个异常数据保存到自己指定的地方或指定的处理函数吗? 4年前 回复
你的意思是希望在有异常消息会导致你的client线程崩溃时,服务器通过某种方式帮你保存这类消息?最基本的一点,服务器是不会关心你消息内容的,也就是说服务器不会知道当前消息是否会导致你的 consumer 崩溃,服务器只会知道是否有消费者成功处理了该消息,so...服务器不会在你崩溃的时候帮你保存消息,可选的解决办法:在你的 client 在崩溃时通过异常捕获将当前消息保存下来(不一定100%成功);或者在 consumer 刚获取到消息时将其内容持久化(性能问题要考虑)。我也就想到这么多了~~

如果是一个队列只希望一个消费者进行处理,那么定义队列的时候可以指定时独占模式

如果是一个队列由多个消费者,但是只希望消息由其中的一个消费者优先进行处理,当这个消费者挂掉的时候,再由其他消费者进行处理的话,可以给消费者设置不同的优先级

在发送basic.consume信令的时候带上"x-priority" 设置优先级

--- 共有 1 条评论 ---
庆沉非常感谢您!我这边是多个线程读取消息队列来处理,之所以开辟多个线程是为了尽快处理分析这些消息,所以设置优先级恐怕行不通。 4年前 回复
有一个比较“蠢"的方法,你可以对获得的消息求出MD5值,或者就直接存储,然后在程序获取第一条消息时比较它们,如果是相同的你可以直接把它回应掉或者用其他的处理方式。但是这种方法也有一个弊端:客户端处理消息一半时,手动关闭了程序导致再次启动程序时消息丢失,这个时候就需要加上正常退出的标志来判断了。
顶部