tomcat启动,运行kafka-consumer端代码报错

kdy1994 发布于 2016/09/03 21:16
阅读 879
收藏 0

启动tomcat,访问kafka-Consumer端代码(注意是从tomcat启动访问startJob()方法

public void startJob() throws Exception{
        Properties props1 = new Properties();
        props1.put("zookeeper.connect", "10.0.11.43:2181/kafka");
        props1.put("group.id", "solr-consumertest4");
        props1.put("rebalance.max.retries", "5");
        props1.put("rebalance.backoff.ms", "2000");
        props1.put("zookeeper.session.timeout.ms", "5000");
        props1.put("auto.offset.reset", "smallest");
        props1.put("zookeeper.connectiontimeout.ms", "100000");
        props1.put("zookeeper.session.timeout.ms", "40000");
        props1.put("zookeeper.sync.time.ms", "200");
        props1.put("auto.commit.interval.ms", "100");
        ConsumerConfig consumerConfig = new ConsumerConfig(props1);
        
        ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        
        Whitelist whitelist = new Whitelist(topic);
        
        List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
        
        if (CollectionUtils.isEmpty(partitions)) {
            System.out.println("empty!");
            TimeUnit.SECONDS.sleep(1);
        }
        
        //消费消息
         for (KafkaStream<byte[], byte[]> partition : partitions) {
             ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
             while (iterator.hasNext()) {
                 MessageAndMetadata<byte[], byte[]> next = iterator.next();
                 System.out.println("partiton:" + next.partition());
                 System.out.println("offset:" + next.offset());
                 System.out.println("message:" + new String(next.message(), "utf-8"));
             }
         }
    }

这行报错:List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);

Caused by: kafka.common.ConsumerRebalanceFailedException: solr-consumertest3_kongdeyu-1472724776895-ef4780df can't rebalance after 5 retries

网上查都是说有两种情况

1同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时负载消费多个partition数据.

2是将consumer端配置改为rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms

可我仔细检查代码,确认就只有启动了一个消费者来取数据,consumer端的配置也改成对应的关系了,可还是报错

死活就是报错

后来我改成从main函数进入startJob()方法

居然就可以取到kafka上的消息了,没有任何错误

真是奇怪不明白为什么,难道kafka不能再tomcat 中运行吗?

求大神解惑

小弟,不胜感激



加载中
0
k
kdy1994
最后找到问题了是jar包的原因
返回顶部
顶部