消费者不能 消费存量消息

wowxhycoming 发布于 2017/05/14 22:12
阅读 463
收藏 0

生产者(producer)先于消费者(consumer)启动,并发送了n条消息到broker上。

然后一个消费者启动,订阅相同的topic,但tags不匹配。不出意外的消费者没有收到任何消息。

再启动一个消费者,订阅相同topic,tags也匹配。为什么仍然无法收到消息?

加载中
0
斯武丶风晴
斯武丶风晴

代码贴出来看看

wowxhycoming
wowxhycoming
官网Hello World代码。 多设置了几个Tag,然后第一个 consumer 没有*座位 sub 。 而是选了其中几个。
0
W
William-T

在kafka中,一个partition中的消息只会被group中的一个consumer消费,可能因为你起了两个consumer,但是两个consumer都属于一个group,同时你只有一个partition

wowxhycoming
wowxhycoming
嗯~~~那rocketmq 怎么认定多个 consumer 是不是同一个? 另外,如何决定 consumer 所在的 partition?
0
斯武丶风晴
斯武丶风晴

引用来自“斯武丶风晴”的评论

代码贴出来看看

不是很清楚你的helloworld是哪个,以下提供被动消费的demo:consumer.subscribe(topic, "*"); 改为你想订阅的tag即可

package io.flysium.sven.demo.mq;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

import io.flysium.framework.util.KryoUtils;

/**
 * 被动消费
 * 
 * @author SvenAugustus
 */
public class PushConsumerDemo1 {

	private static Logger logger = LoggerFactory
			.getLogger(PushConsumerDemo1.class);

	public static void main(String[] args) throws Exception {
		new PushConsumerDemo1();

		while (true) {// 阻止主线程挂掉
			TimeUnit.SECONDS.sleep(2);
		}
	}

	private static PushConsumerDemo1 inst;

	public synchronized static PushConsumerDemo1 getInst() {
		if (inst == null) {
			synchronized (PushConsumerDemo1.class) {
				if (inst == null) {
					inst = new PushConsumerDemo1();
				}
			}
		}
		return inst;
	}

	private String namesrvAddr = "127.0.0.1:9876";
	private String topic = "topic1";
	private String consumerGroup = "ConsumerGroup1";
	private String consumerInstanceName = "consumerInstance2";
	private long pullInterval = 0L;// 被动消费模式下,pull拉取消息的频率,单位分钟

	private DefaultMQPushConsumer consumer = null;

	private PushConsumerDemo1() {
		consumer = new DefaultMQPushConsumer(consumerGroup);
		consumer.setNamesrvAddr(namesrvAddr);
		consumer.setInstanceName(consumerInstanceName);
		try {
			consumer.setMessageModel(MessageModel.CLUSTERING); // 设置为集群消费(区别于广播消费)
			consumer.setConsumeFromWhere(
					ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
			consumer.setPullInterval(pullInterval);

			consumer.subscribe(topic, "*");
			consumer.registerMessageListener(new MessageListenerConcurrently() {

				@Override
				public ConsumeConcurrentlyStatus consumeMessage(
						List<MessageExt> messageExtList,
						ConsumeConcurrentlyContext context) {
					//logger.info("消费消息个数:" + messageExtList.size());

					for (MessageExt msg : messageExtList) {
						Object body = KryoUtils.fromBytes(msg.getBody());
						if (body != null) {
							try {
								//System.out.println("消费消息:" + String.valueOf(body));
								logger.info("消费消息:" + String.valueOf(body));
							} catch (Exception e) {
								logger.error("消费消息失败:" + body + ",异常:"
										+ e.getMessage(), e);
								return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 消费失败,稍后重试
							}
						}
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});

			consumer.start();

		} catch (MQClientException e) {
			logger.error(e.getMessage(), e);
		} finally {
			Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
				public void run() {
					try {
						consumer.shutdown();
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
				}
			}));
		}
	}

}

 

返回顶部
顶部