关于rocketmq的MessageSelector.bySql过滤问题

a304349815 发布于 2018/11/08 15:44
阅读 1K+
收藏 0

【Gopher China万字分享】华为云的Go语言云原生实战经验!>>>

刚开始学习rocketMQ,遇到个问题。版本4.3.1

测试过程中,客户端使用MessageSelector.bySql过滤。一个生产者发送2个消息。每个消息sql过滤条件不同。

启动2个客户端,通过MessageSelector.bySql过滤。但是只有一个能接受到消息,而且接受到的消息跟过滤条件不对应。

从控制台查看能看到2个消息,通过控制台发送这2条消息,客户能正确的收到消息,过滤条件能对应上。

请大神们帮忙看下,第一次接触rocketmq

生成端代码:

public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        producer.setNamesrvAddr("IP:9876");
        producer.start();
        Message msg = new Message("TopicTest",
                ("Hello RocketMQ1 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.putUserProperty("sellerId", "1");
        SendResult sendResult = producer.send(msg);
        System.out.printf("发送成功%s%n", sendResult);
        
        
        Message msg1 = new Message("TopicTest",
                ("Hello RocketMQ2 ").getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg1.putUserProperty("sellerId", "2");
        SendResult sendResult1 = producer.send(msg1);
        System.out.printf("发送成功%s%n", sendResult1);
    }

 

消费端代码:我是启动main函数后,在同一个main函数上修改参数,再启动一个客户端

public static void main(String[] args) throws Exception {
        c("c1","1");

        // c("c2","2");
    }

    public  static void c(String name,String value) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup("producer1");
        consumer.setNamesrvAddr("47.104.182.21:9876");
        consumer.subscribe("TopicTest", MessageSelector.bySql("sellerId="+value));
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                try {
                    MessageExt ext = msgs.get(0);
                    System.out.println(name+"收到消息:"+new String(ext.getBody(),"utf-8"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("启动成功:"+name);
    }

加载中
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部