Messages forwarded from Flume to Kafka are not received by the consumer

wanghu1983, posted on 2017/05/10 12:41

I'm building a Flume + Kafka pipeline: messages received by Flume are forwarded to Kafka, and a backend program watches the message queue and pulls out the messages Flume sends over. The current problems are:

1. Messages produced by a standalone Producer are received by the Consumer program just fine, but messages produced by Flume are not (a sketch of the kind of producer used is shown after this list);

2. Messages produced by Flume can be received with Kafka's kafka-console-consumer.sh command-line tool.
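
The post doesn't include the producer that does work; for reference, a minimal sketch of a standalone producer using the same 0.8-era client as the consumer below might look like this (the class name, message text, and the assumption that the old kafka.javaapi.producer API is in use are mine, not from the original post):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.80.129:9092"); // same broker as brokerList in the Flume sink
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // send one test message to the same "test" topic the Flume sink writes to
        producer.send(new KeyedMessage<String, String>("test", "hello from the standalone producer"));
        producer.close();
    }
}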

 

Flume-to-Kafka integration config:

    

agent.sources.s1.type = netcat
agent.sources.s1.bind = 192.168.80.129
agent.sources.s1.port = 44444
agent.sources.s1.channels=c1

#config channels
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

#config sinks
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.channel=c1
agent.sinks.k1.brokerList=192.168.80.129:9092
agent.sinks.k1.topic=test
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.producer.type=sync
agent.sinks.k1.custom.encoding=UTF-8
agent.sinks.k1.custom.topic.name=test
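
With this config, the netcat source listens on 192.168.80.129:44444 and turns each newline-terminated line it receives into a Flume event, which the Kafka sink then writes to the test topic. For reference, a minimal sketch of pushing a test line into that source from Java (host and port come from the config above; the class name and message text are just illustrative):

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;

public class NetcatSourceFeeder {

    public static void main(String[] args) throws Exception {
        // host/port of the Flume netcat source from the config above
        try (Socket socket = new Socket("192.168.80.129", 44444);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8")) {
            // the netcat source treats each newline-terminated line as one event
            out.write("hello flume\n");
            out.flush();
        }
    }
}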

 

Relevant consumer code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumerTest {

    private ConsumerConnector consumer;

    public static void main(String[] args) throws InterruptedException {
        String zkInfo = "192.168.80.129:2181"; // ZooKeeper address
        String topic = "test";                 // topic name, same as the topic configured in Flume above
        String lock = "";                      // lock value passed to the worker threads (definition omitted in the original post)
        KafkaConsumerTest consumer = new KafkaConsumerTest();
        consumer.setConsumer(zkInfo);
        consumer.consume(lock, topic);
    }

    /**
     * Creates the high-level consumer connector from the ZooKeeper address.
     *
     * @param zkInfo ZooKeeper connect string
     */
    public void setConsumer(String zkInfo) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkInfo);
        props.put("group.id", "test-consumer-group");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Opens one stream on the topic and hands it to a worker thread that prints the messages.
     *
     * @param lock  lock value passed through to the worker threads
     * @param topic topic to consume
     * @throws InterruptedException
     */
    public void consume(String lock, String topic) throws InterruptedException {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        List<KafkaStream<String, String>> streams = consumerMap.get(topic);
        System.out.println(streams.size());
        ThreadPoolManager tpm = ThreadPoolManager.newInstance();

        for (final KafkaStream<String, String> stream : streams) {
            tpm.dbShortSchedule(new ConsumerThread(stream, lock), 0);
        }
        System.out.println("finish");
    }

}

The thread just prints the messages it receives, so I won't paste that code.
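
For completeness, such a thread usually just iterates its KafkaStream and prints each message; a minimal sketch of what a ConsumerThread like that might look like with the old high-level consumer API (the real implementation in the project may differ, and the lock parameter is only carried through here):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

public class ConsumerThread implements Runnable {

    private final KafkaStream<String, String> stream;
    private final String lock;

    public ConsumerThread(KafkaStream<String, String> stream, String lock) {
        this.stream = stream;
        this.lock = lock;
    }

    @Override
    public void run() {
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) { // blocks until the next message arrives
            MessageAndMetadata<String, String> mm = it.next();
            System.out.println("received: " + mm.message());
        }
    }
}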

The command used to consume from the command line is the one below (it is run on 192.168.80.129 itself, which is why localhost is used; the Java code runs in Eclipse on my local machine, which is why the IP address is used):

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
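
Note that --from-beginning has no direct counterpart in the Java code above: with the old high-level consumer, reading from the start of the topic is controlled by auto.offset.reset, and it only takes effect when the consumer group has no committed offsets yet. A rough sketch of the corresponding properties (the fresh group id is illustrative, not from the original post):

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class FromBeginningConfig {

    // Builds a consumer config that starts from the earliest available offset when the
    // group has no committed offsets -- roughly what --from-beginning does on the CLI.
    public static ConsumerConfig build(String zkInfo) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkInfo);
        props.put("group.id", "test-consumer-group-fresh"); // hypothetical fresh group id; reusing an old group keeps its offsets
        props.put("auto.offset.reset", "smallest");         // "smallest" = earliest offset in the old consumer
        return new ConsumerConfig(props);
    }
}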

 

I can't figure out where the problem is. Any pointers would be much appreciated!
