Kafka with ZooKeeper, using the latest kafka-client with auto-commit enabled: tested the failure case, but no data was lost?

As in the title, with the consumer code snippet below:


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.83.20:9092");
// "metadata.broker.list" is a legacy 0.8 property (and 2181 is the ZooKeeper
// port, not a broker); the new consumer ignores it, so it is omitted here.
props.put("group.id", "default1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("top1"));
final int minBatchSize = 1;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
System.out.println("entered......");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("entered......records");
        buffer.add(record);
        try {
            Thread.sleep(1000); // about 4-5 s after reaching this point, the consumer is killed
            System.out.println("entered......records--1000---");
            Thread.sleep(20000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("fetched from partition " + record.partition()
                + ", offset: " + record.offset() + ", message: " + record.value());
    }
//  consumer.commitAsync();
}
In other words: the producer sends a message, the consumer receives it, and the handler sleeps for 20 s. About 5 s after the message is received, the consumer process is killed. Auto-commit is enabled with a 1 s interval, yet testing shows no data is lost. Why is that?
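For comparison, a common way to make commit timing deterministic in this kind of test is to disable auto-commit and commit manually after processing. Below is a minimal, hedged sketch of just the changed consumer properties (the broker address and group id are carried over from the snippet above; the class name `ManualCommitProps` is only for illustration):

```java
import java.util.Properties;

public class ManualCommitProps {
    // Builds consumer properties with auto-commit disabled, so offsets are
    // only committed when the application explicitly calls commitSync().
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "default1");
        props.put("enable.auto.commit", "false"); // manual offset control
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With this configuration the poll loop would call `consumer.commitSync()` only after a record (or batch) has been fully processed; killing the consumer before that commit simply means the uncommitted records are redelivered on restart, i.e. at-least-once delivery rather than data loss.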


bolean
Posted 2 years ago · 1 reply / 1K+ views