求kafka大牛指导鄙人在学习中遇到的一些问题!!!

johnsen 发布于 2017/05/25 17:20
阅读 432
收藏 0

Linux服务器(172.16.126.78)安装环境如下:

kafka Version:kafka_2.11-0.10.1.0

java version "1.8.0_131"

分别正常启动Zookeeper、Kafka的服务,然后在我本地电脑的eclipse上面也按照网上的示例写了Producer生产者和Consumer消费者的代码,最后在main方法中同时Run as Java Application进行消息生产和消费的测试,但是控制台输出了错误信息,如下:

[调试信息]WARN  kafka.producer.BrokerPartitionInfo  - Error while fetching metadata [{TopicMetadata for topic topic_test -> 
No partition metadata for topic topic_test due to org.apache.kafka.common.errors.LeaderNotAvailableException}] for topic [topic_test]: class org.apache.kafka.common.errors.LeaderNotAvailableException

[调试信息]WARN  kafka.producer.async.DefaultEventHandler  - Failed to send producer request with correlation id 11 to broker 0 with data for partitions [topic_test,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:44)
    at com.cjzq.kafka.sender.CjweixinProducer.run(CjweixinProducer.java:32)

生产者代码:

消费者代码:

Test类:

加载中
返回顶部
顶部