请教kafka 启用多台consumer 问题?

李仁娟 发布于 2013/05/06 16:04
阅读 5K+
收藏 0

@FrankHui 你好,想跟你请教个问题://多线程方式
    public static void consumer(){
        Properties props = new Properties();  
        props.put("zk.connect", "hadoop-2:2181");  
        props.put("zk.connectiontimeout.ms", "1000000");  
        props.put("groupid", "fans_group");  
          
        // Create the connection to the cluster  
        ConsumerConfig consumerConfig = new ConsumerConfig(props);  
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
          
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("fans", 1);
        
        // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
        Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);  
        List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");  
          
        // create list of 4 threads to consume from each of the partitions   
        ExecutorService executor = Executors.newFixedThreadPool(1);  
        long startTime = System.currentTimeMillis();
        // consume the messages in the threads  
        for(final KafkaStream<Message> stream: streams) {  
          executor.submit(new Runnable() {  
            public void run() {  
                 ConsumerIterator<Message> it = stream.iterator();
                  while (it.hasNext()){
                      log.debug(byteBufferToString(it.next().message().payload()));
                  }
              }
            
          });
          log.debug("use time="+(System.currentTimeMillis()-startTime));
        }  
    }

/***
     * ByteBuffer转换为String
     * @param buffer
     * @return
     */
    public static String byteBufferToString(ByteBuffer buffer) {
        CharBuffer charBuffer = null;
        try {
            Charset charset = Charset.forName("UTF-8");
            CharsetDecoder decoder = charset.newDecoder();
            charBuffer = decoder.decode(buffer);
            buffer.flip();
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

这是我的代码。单台没有问题,多台时ConsumerIterator<Message> it = stream.iterator();就这里等待了。难道启多台groupid要不一样?那如果我要取同一个topic中的数据怎么办?并且我没有分区push数据的情况下。

加载中
0
Gaischen
Gaischen
consumer消费的时候  如果已经消费过的数据则不再消费,这个信息都是保存在consumer端的,所以如果你对一批数据要多次消费是要用不同的groupid的  存的时候 根据不同的partition去存储 去的时候就可以针对同一个topic下的不同partition并行执行
0
d
develo
同问啊,天杀的High Level Consumer取不到数据啊,上帝。。。。。。
0
宋鑫001
宋鑫001
N个consumer,就设置partitons的个数为N个,这样kafka会帮你分配。
返回顶部
顶部