请问一下,spring怎么配置多个kafka主题?

请叫我main 发布于 2018/06/04 09:30
阅读 1K+
收藏 0

解读下一代网络:算力网络正从理想照进现实!>>>

我有一个项目,搭的是springmvc,然后使用了zookeeper和dubbo做的分布式,我在里面添加了kafka消息队列,单个主题跑,没有问题,现在我想配置多个主题,请问该如何配置?(网上好几种方法我都有试,但是有报错,也不知该如何处理,网上没有搜到,请求大佬们的援助)

一下是kafka相关的jar包:

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.0.4.RELEASE</version>
        </dependency>

然后我多个主题配置如下:

    <!-- 消费者容器配置信息 -->
    <bean id="containerProperties"
       class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg >
            <list>
                <value>${topic}</value>
                <value>${homework-topic}</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="messageListernerConsumerService" />
        <property name="ackMode" value="${ackMode}" />
    </bean> 

 这样配置报错:


org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'messageListenerContainer' defined in class path resource [spring-kafka-consumer.xml]: Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1578)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:775)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:861)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541)
    at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:444)
    at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:326)
    at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107)
    at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4853)
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5314)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:145)
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1408)
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1398)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:361)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:222)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:179)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1706)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1645)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574)
    ... 21 more

求助攻,求助攻,求助攻!!! 

加载中
0
请叫我main
请叫我main

问题已解决,是jar包的问题,低版本存在多个主题list转换成set,转成了null的bug,使用最新的版本,没有出现任何问题了

请叫我main
请叫我main
回复 @岁月无声11 : 你看看的你jar包版本, 可能是jar包的原因.
岁月无声11
您好,我最近也遇到和您相似的问题了,xml文件中配置了主题list,在消费监听的时候 @KafkaListener这个注解怎么配置呢?
0
请叫我main
请叫我main

不好意思,第一次提问题,没有用到代码块,导致看起来有点累,不好意思哈

0
LongKing-Xu
LongKing-Xu
这不是你xml配置错了吗?kafka你创建多主题了吗?如果你kafka创建多主题了,不同的方法调用不同的主题的生产者和消费者跟单个主题是一样的啊!
LongKing-Xu
LongKing-Xu
你单个的怎么配置的,你就再加一个!
请叫我main
请叫我main
那么,请问该如何配置呢?
0
R-Lu
R-Lu

使用KFK可能大部分情况下,不会使用多个topic,因为像RabbitMQ那种Channel,KFK用分区进去替代了,使用起来就像是一个Channel。其实这种思想也是对的,一段业务代码就应该有明显标记来标记它到底是干嘛的,而不是他又多种情况。

请叫我main
请叫我main
一般不用多个topic吗?假如说,我现在是多重情况呢,一种是消息推送类的,一种是数据处理类的,这样也使用同一个topic吗?
0
R-Lu
R-Lu

引用来自“Robinson_lu”的评论

使用KFK可能大部分情况下,不会使用多个topic,因为像RabbitMQ那种Channel,KFK用分区进去替代了,使用起来就像是一个Channel。其实这种思想也是对的,一段业务代码就应该有明显标记来标记它到底是干嘛的,而不是他又多种情况。

Topic是标记消费者的,生产者是使用Topic,并且它是群发模式大部分。

返回顶部
顶部