apache kafka 遇到 Attempt to heart beat failed since the group is rebalancing, try to re-join group. 该如何解决?

Neon_Wang 发布于 2017/08/16 15:21
阅读 16K+
收藏 0

领取 1000元人民币SageMaker大礼包!>>>

spirng-kafka的多consumer问题困扰了我好久,今天项目再次出现
Attempt to heart beat failed since the group is rebalancing, try to re-join group.
这个问题,导致消息接收不了了,查询了很多资料,也看了很多相关文章,
但是并没有找到什么解决方法,也许是我搜索方式错了?
只好上这来提问题,希望有人能帮助我解决。

先说下情况:

kafka版本为 9.0.1

由于项目属于分布式的微服务架构,有时候需要消息能到达每个同服务实例,因此需要实现kafka的广播模式,基于kafka同一Topic下不同Group都会收到消息,所以在一开始在kafka属性配置中使用了实例IP作为groupID:

private Map<String, Object> consumerProps() {
        return new CustomHashMap()
                .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                .put(ConsumerConfig.GROUP_ID_CONFIG, "receiveMessage"+ IpUtil.getLocalhostAddress().replace(".", ""))
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100")
                .put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000")
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

使用@KafkaListener注解方式进行消息接收,其中 receiveKafkaListenerContainerFactory是自定义bean

@KafkaListener(containerFactory = "receiveKafkaListenerContainerFactory", topics = KafkaTopicName.DEVICE_MESSAGE_TOPIC)
    public void onMessageListener(MessageTemplate message){
        log.info("===> receive [{}]", message.getMessage());
        parseMessageAdapter.adapter(message.getType(), message);
    }

服务启动后,发现kafka只接收到了第一条消息,并且这接收到这条消息后就爆出开头所说的错误:

INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[deviceMessageTopic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.a.k.c.c.i.AbstractCoordinator.handle:623 - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsRevoked:244 - partitions revoked:[]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] o.s.k.l.KafkaMessageListenerContainer.onPartitionsAssigned:249 - partitions assigned:[deviceMessageTopic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-kafka-consumer-1] m.h.b.p.listener.receiver.Receiver.onMessageListener:24 - ===> receive [test1234567890test1234567890]

看错误信息很容易理解是 kafka 发送心跳时发现正在给分区重新分配consumer导致发送失败,尝试重新加入group。

由于知道kafka执行rebalance情况有以下几种:
1:有新的consumer加入
2:旧的consumer挂了
3:coordinator挂了,集群选举出新的coordinator
4:topic的partition新加
5:consumer调用unsubscrible(),取消topic的订阅

再结合单服务时启动这个情况,因此条件条件1和2是有可能符合的,但是仔细检查配置,并没有发现有不妥的地方。
于是又去Kafka中查看了Topic信息,只有一个分区,也没有异常。

如果哪位朋友看出了错误所在,恳请在此留下你的解决方式,如果对于该问题还想了解更多的细节,也可能提出来,我会及时回复,万分感谢!

加载中
0
s
someorz

你是使用spring-kafka来编写消费者吗?

Neon_Wang
Neon_Wang
@someorz 回复@someorz : 换版本太麻烦,还要把项目中的kafka版本一起更改。 不过问题解决了,是kafka集群配置的有问题,还有consumer中的要加入request_timeout_ms
s
someorz
回复 @Neon_Wang : 呃,那我没招了,要不升下kafka版本试试呗
Neon_Wang
Neon_Wang
@someorz 回复@someorz : 关闭自动提交无用
s
someorz
回复 @Neon_Wang : 可以把自动提交关闭,试试
Neon_Wang
Neon_Wang
@someorz 回复@someorz : 是的 其他配置都是默认配置
下一页
0
呼啦_小呆
呼啦_小呆

consumer消费能力不够,每次心跳 都是在poll recoed的时候发送的,处理时间长的时候导致任务consumer挂掉,重新rebalance。0.9的bug

Neon_Wang
Neon_Wang
@呼啦_小呆 回复@呼啦_小呆 : 并不是消费能力不够的问题导致的,已解决,还是非常感谢
呼啦_小呆
呼啦_小呆
回复 @Neon_Wang : spring-kafka 只是用了一个队列去做consumer的缓冲,你能确定你的队列容量是否够用,队列超时间时间和心跳超时时间是否合理,理论上队列满了是会pause consumer的,0.9kakfa+spring-kafka还有个问题 你在提交offset的时机也是poll的时候,提交超时会导致重复消费数据
Neon_Wang
Neon_Wang
我在接收到消息后调用了线程异步处理,并不会存在处理不及时的情况
0
范德萨第三方
范德萨第三方

你好,现在我也遇到类似问题,请问是kafka集群配置的有问题,具体是哪个配置的问题呢?

范德萨第三方
范德萨第三方
回复 @Neon_Wang : 另外还发现,触发rebalancing这个问题的时间点跟metadata.max.age.ms有关系,但只是第一次到达这个时间才会触发rebalancing,后面就不出现了
范德萨第三方
范德萨第三方
回复 @Neon_Wang : request.timeout.ms增加到70s,session.timeout.ms增加到60s,heartbeat.interval.ms加大到20s,依然会报这个问题
范德萨第三方
范德萨第三方
回复 @Neon_Wang : 现在的情况是,刚起来3到5分钟会出现上面的rebalancing问题,半个小时后基本就不会出现了,但是只要出现一次,就会导致kafka message重复消息,所以还是比较严重的
范德萨第三方
范德萨第三方
回复 @Neon_Wang : 现在采用的是手动提交offset,每次等消息收到一定数量,比如1000条,再提交一次offset,整个onMessage内都try catch了确定没有抛出任何异常
Neon_Wang
Neon_Wang
确认下你的业务代码是否抛出了异常,异常也会导致这种情况发生。如果业务处理流程很长,建议使用异步
下一页
0
物空飞雨
物空飞雨

一般我们是在一个线程(用户线程)里面执行kafka consumer 的while true循环逻辑的,其实这里有2个线程:一个是用户线程,另一个是心跳线程。心跳线程,我想就是根据heartbeat.interval.ms参数配置的值周期性向coordinator发送心跳包以证明consumer还活着。
如果消息处理逻辑过重,也即用户线程需要执行很长的时间处理消息,然后再提交offset。咋一看,有一个后台心跳线程在不断地发送心跳啊,那为什么group coordinator怎么还老是将consumer移出group,然后导致不断地rebalance呢?
我想,问题应该是 max.poll.interval.ms这个参数引起的吧,因为在ERROR日志中,老是提示:消息处理逻辑花了太长的时间,要么减少max.poll.records值,要么增大session.timeout.ms的值。尽管有后台heartbeat 线程,但是如果consumer的消息处理逻辑时长超过了max.poll.interval.ms ,那么此consumer提交offset就会失败:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

此外,在用户线程中,一般会做一些失败的重试处理。比如通过线程池的 ThreadPoolExecutor#afterExecute()方法捕获到异常,再次提交Runnable任务重新订阅kafka topic。本来消费处理需要很长的时间,如果某个consumer处理超时:消息处理逻辑的时长大于max.poll.interval.ms (或者消息处理过程中发生了异常),被coordinator移出了consumer组,这时由于失败的重试处理,自动从线程池中拿出一个新线程作为消费者去订阅topic,那么意味着有新消费者加入group,就会引发 rebalance,而可悲的是:新的消费者还是来不及处理完所有消息,又被移出group。如此循环,就发生了不停地 rebalance 的现象。

返回顶部
顶部