1
回答
kafka consumer无法获取topic消息。
【腾讯云】校园拼团福利,1核2G服务器10元/月!>>>   

Properties properties = new Properties();  
    properties.put("zookeeper.connect", “192.········”);//声明zk  
    properties.put("group.id", "group2xx");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据  
    properties.put("auto.offset.reset", "smallest");

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
    topicCountMap.put("zuo", 1); // 一次从主题中获取一个数据  
    Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
    KafkaStream<byte[], byte[]> stream = messageStreams.get("zuo").get(0);// 获取每次接收到的这个数据  
    ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
    System.out.println("----------");
    while(iterator.hasNext()){
        System.out.println("`#############");
        System.out.println("receive:" + new String(iterator.next().message()));
    }
    
        
}
    

<无标签>
举报
顶部