apache kafka 遇到 Attempt to heart beat failed since the group is rebalancing, try to re-join group. 该如何解决?

Neon_Wang 发布于 2017/08/16 15:21
阅读 5K+
收藏 0

spirng-kafka的多consumer问题困扰了我好久,今天项目再次出现
Attempt to heart beat failed since the group is rebalancing, try to re-join group.
这个问题,导致消息接收不了了,查询了很多资料,也看了很多相关文章,
但是并没有找到什么解决方法,也许是我搜索方式错了?
只好上这来提问题,希望有人能帮助我解决。

先说下情况:

kafka版本为 9.0.1

由于项目属于分布式的微服务架构,有时候需要消息能到达每个同服务实例,因此需要实现kafka的广播模式,基于kafka同一Topic下不同Group都会收到消息,所以在一开始在kafka属性配置中使用了实例IP作为groupID:

private Map<String, Object> consumerProps() {
        return new CustomHashMap()
                .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, "receiveMessage"+ IpUtil.getLocalhostAddress().replace(".", ""))
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100")
                .put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000")
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

使用@KafkaListener注解方式进行消息接收,其中 receiveKafkaListenerContainerFactory是自定义bean

@KafkaListener(containerFactory = "receiveKafkaListenerContainerFactory", topics = KafkaTopicName.DEVICE_MESSAGE_TOPIC)
    public void onMessageListener(MessageTemplate message){
        log.info("===> receive [{}]", message.getMessage());
        parseMessageAdapter.adapter(message.getType(), message);
    }

服务启动后,发现kafka只接收到了第一条消息,并且这接收到这条消息后就爆出开头所说的错误:

INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[deviceMessageTopic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[deviceMessageTopic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890]

看错误信息很容易理解是 kafka 发送心跳时发现正在给分区重新分配consumer导致发送失败,尝试重新加入group。

由于知道kafka执行rebalance情况有以下几种:
1:有新的consumer加入
2:旧的consumer挂了
3:coordinator挂了,集群选举出新的coordinator
4:topic的partition新加
5:consumer调用unsubscrible(),取消topic的订阅

再结合单服务时启动这个情况,因此条件条件1和2是有可能符合的,但是仔细检查配置,并没有发现有不妥的地方。
于是又去Kafka中查看了Topic信息,只有一个分区,也没有异常。

如果哪位朋友看出了错误所在,恳请在此留下你的解决方式,如果对于该问题还想了解更多的细节,也可能提出来,我会及时回复,万分感谢!

加载中
0
s
someorz

你是使用spring-kafka来编写消费者吗?

Neon_Wang
Neon_Wang
@someorz 回复@someorz : 换版本太麻烦,还要把项目中的kafka版本一起更改。 不过问题解决了,是kafka集群配置的有问题,还有consumer中的要加入request_timeout_ms
s
someorz
回复 @Neon_Wang : 呃,那我没招了,要不升下kafka版本试试呗
Neon_Wang
Neon_Wang
@someorz 回复@someorz : 关闭自动提交无用
s
someorz
回复 @Neon_Wang : 可以把自动提交关闭,试试
Neon_Wang
Neon_Wang
@someorz 回复@someorz : 是的 其他配置都是默认配置
下一页
0
呼啦_小呆
呼啦_小呆

consumer消费能力不够,每次心跳 都是在poll recoed的时候发送的,处理时间长的时候导致任务consumer挂掉,重新rebalance。0.9的bug

Neon_Wang
Neon_Wang
@呼啦_小呆 回复@呼啦_小呆 : 并不是消费能力不够的问题导致的,已解决,还是非常感谢
呼啦_小呆
呼啦_小呆
回复 @Neon_Wang : spring-kafka 只是用了一个队列去做consumer的缓冲,你能确定你的队列容量是否够用,队列超时间时间和心跳超时时间是否合理,理论上队列满了是会pause consumer的,0.9kakfa+spring-kafka还有个问题 你在提交offset的时机也是poll的时候,提交超时会导致重复消费数据
Neon_Wang
Neon_Wang
我在接收到消息后调用了线程异步处理,并不会存在处理不及时的情况
0
范德萨第三方
范德萨第三方

你好,现在我也遇到类似问题,请问是kafka集群配置的有问题,具体是哪个配置的问题呢?

范德萨第三方
范德萨第三方
回复 @Neon_Wang : 另外还发现,触发rebalancing这个问题的时间点跟metadata.max.age.ms有关系,但只是第一次到达这个时间才会触发rebalancing,后面就不出现了
范德萨第三方
范德萨第三方
回复 @Neon_Wang : request.timeout.ms增加到70s,session.timeout.ms增加到60s,heartbeat.interval.ms加大到20s,依然会报这个问题
范德萨第三方
范德萨第三方
回复 @Neon_Wang : 现在的情况是,刚起来3到5分钟会出现上面的rebalancing问题,半个小时后基本就不会出现了,但是只要出现一次,就会导致kafka message重复消息,所以还是比较严重的
范德萨第三方
范德萨第三方
回复 @Neon_Wang : 现在采用的是手动提交offset,每次等消息收到一定数量,比如1000条,再提交一次offset,整个onMessage内都try catch了确定没有抛出任何异常
Neon_Wang
Neon_Wang
确认下你的业务代码是否抛出了异常,异常也会导致这种情况发生。如果业务处理流程很长,建议使用异步
下一页
返回顶部
顶部