kafka producer端网络中断丢失消息问题

luffing 发布于 2017/01/22 16:58
阅读 2K+
收藏 0

收藏!数据建模最全知识体系解读!>>>

最新想把队列工具切到kafka上面,由于希望消息都能被收到,所以生产者这端的部分配置如下:

        props.put("acks", "all");// 所有的follower全部ack后再发送
        props.put("retries", Integer.MAX_VALUE);// 数据发送失败,一直重试

Integer.MAX_VALUE只是测试时用

我希望模拟producer端网络异常的场景,比如producer端中断一分钟,然后网络恢复,producer能把网络故障期间发送的消息重新发送到consumer.

但是在研究源码的过程中,发现kafka在网络中断的时候可能会直接丢弃掉消息,代码如下.

类(org.apache.kafka.clients.producer.internals.Sender.run方法):

List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

下面是kafka对消息的expire判断逻辑:

        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
            expire = true;
        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
            expire = true;
        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
            expire = true;

实际上在网络中断时,expire基本上都会为true,kafka会将message封装在batch中,然后再创建个request请求,去发送.但是在这之前消息已经判断为expire了.

Sender的run方法的以下逻辑

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);

会休眠requestTimeoutMs  时间,导致再次run的时候,消息已经expire了

                        if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
                            expiredBatches.add(batch);
                            count++;
                            batchIterator.remove();
                            deallocate(batch);

 

有什么配置项能解决这个问题吗,还是kafka设计时没考虑过producer端的异常重发.

加载中
0
m
mingbao

你怎么没有考虑服务器重启的情况?kafka生产者正在发送消息的时候,kafka生产者被kill了,消息不也丢了吗

0
paulszw
paulszw

这个并不是kafka需要去处理的事情,对于kafka而言RecordBatch在处于可发送状态后的requestTimeout时间内并未被Sender线程取到从而触发网络I/O被发送,换句话说就是这笔RecordBatch此时就是请求超时了,那被丢弃就是应该了。

Kafka的重试是用在单次请求失败的情况下,此时会将RecordBatch重新入队(reenqueue)等待下一次被sender捕捉到,当然也有可能这次requestTimeout了从而彻底被废弃。

所以你的问题解决方案应该是调大超时时间或者在producer发消息的时候加上callback处理异常情况或者直接.get同步调用。

 

 

返回顶部
顶部