关于spring jms监听ActiveMQ线程问题

小沫 发布于 2016/05/05 17:13
阅读 960
收藏 0

今天遇到一个奇怪的问题,场景如下:

spring 版本 3.0  activemq 版本 5.9  windows7 64位主机

在一个指定的队列中写入100条信息,应用会自动发现Q中消息,然后通过多线程将消息内容打印到控制台。如果此时将应用强行停止,再重新启动后,应用中的监听类就变成单线程读取Q中消息。

代码配置如下:

1.spring 配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/tx
       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
       http://www.springframework.org/schema/jee
       http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd">


    <description>JMS应用配置</description>

    <!-- ActiveMQ connection factory; the broker URL comes from the ${jms.broker_url} placeholder -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker_url}"/>
    </bean>

    <!-- Spring caching connection factory wrapping the ActiveMQ factory above -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>

    <!-- Spring JmsTemplate, backed by the caching connection factory -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
    </bean>

</beans>



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/tx
       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
       http://www.springframework.org/schema/jee
       http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd">


    <description>JMS应用配置</description>

    <!--
        Work-item queue definition.

        The "consumer.prefetchSize=1" destination option limits how many messages the broker
        dispatches to each consumer ahead of acknowledgement. With ActiveMQ's default queue
        prefetch (1000 according to the ActiveMQ documentation), the first consumer that
        subscribes after a restart is handed the whole backlog already waiting in the queue,
        so the remaining concurrent consumers receive nothing and processing looks
        single-threaded. A prefetch of 1 lets the broker spread the backlog over all consumers.
    -->
    <bean id="workItemQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="workItem?consumer.prefetchSize=1"/>
    </bean>

    <!-- Container that receives messages from the queue asynchronously -->
    <bean id="workItemqueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="workItemQueue"/>
        <property name="messageListener" ref="workItemMessageListener"/>

        <!-- Start with 5 consumers; the container may scale up to 10 -->
        <property name="concurrentConsumers" value="5"/>
        <property name="maxConcurrentConsumers" value="10"/>
    </bean>


    <!-- Listener class that handles the asynchronously received messages -->
    <bean id="workItemMessageListener" class="a.jms.AsynSubTaskMsgListener"/>
</beans>



异步接收消息处理类如下:

package a.jms;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by 
 */
public class AsynSubTaskMsgListener implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(AsynSubTaskMsgListener.class);

    public void onMessage(Message message) {

        try {
            MapMessage mapMessage = (MapMessage) message;
            String msgBody = mapMessage.getString("msgBody");

            logger.info("从消息队列中获取到的工作项消息内容 :{}" ,msgBody);
        } catch (Exception ex) {
            logger.error(ex.getMessage());
        }
    }
}



部署到 Tomcat 后,起初程序运行正常,多线程获取Q内容;如果强行停止 Tomcat,再重新启动后,应用就变成单线程获取Q内容?

加载中
返回顶部
顶部