spring整合activemq配置RedeliveryPolicy和broker不成功

fgod 发布于 2016/12/12 21:54
阅读 2K+
收藏 0

各位好!我弄了半天都没能配置 RedeliveryPolicy和 broker,demo是spring整合的activeMQ。

测试过程中activeMQ一直处于启动状态。

模拟的监听消息设置的AcKnowledgeMode=Session Transacted--事务型消息监听,有如下测试结果:

   测试代码:

@Component
public class QueueTransactionalAckConsumer implements SessionAwareMessageListener {

    @Override
    public void onMessage(Message message,Session session) {
        try {
            if (message instanceof TextMessage) {
                boolean jmsRedelivered = message.getJMSRedelivered();
                int jmsDeliveryMode = message.getJMSDeliveryMode();
                System.out.println("消息内容是:" + ((TextMessage) message).getText());
                int acknowledgeMode = session.getAcknowledgeMode();
                System.out.println("消息确认类型:"+acknowledgeMode);
                boolean transacted = session.getTransacted();
                System.out.println("事务状态:" +transacted);
//              异常测试
                int i = 1/0;
            } else {
                System.out.println("Consumer:->Received: " + message);
            }
        }
        catch (JMSException e) {
            e.printStackTrace();
        }
//模拟异常没有被catch,可注释此catch块
//        catch (Exception e){
//            e.printStackTrace();
//        }
    }
}



测试过程分析:
/**
     * 监听消息:事务型消息确认
     *  1.消息处理前 不会告知  消息代理已成功接收消息,
     *         名称为此消息队列名称的信息
     *         queue接收到消息时队列中显示:
     *              Number Of Pending Messages  等待消费的消息  +1
     *              Messages Enqueued           进入队列的消息  +1
     *              Messages Dequeued           出了队列的消息  保持不变
     *  2.消息处理过程中及消息处理完毕:
     *       2.1 过程中无异常,待消息处理完毕:(与接收到消息时作对比)
     *              Number Of Pending Messages  等待消费的消息  -1
     *              Messages Enqueued           进入队列的消息  保持不变
     *              Messages Dequeued           出了队列的消息  +1
     *       2.2 过程中有异常,消息处理终止,告知消息处理失败
     *           第一次接收消息处理出异常:此消息处于等待消费
     *           接着再次接收消息6遍,过程中消息一直处于等待消费状态
     *           重复6次之后(一共接收7遍),消费者不再监听到消息。
     * 2.2.1 消息处理完毕消息代理的处理:
     *                   此消息如果发送设置Persistent Delivery=false
     *              则离开此消息名称队列
     *                      此消息如果发送设置Persistent Delivery=true
     *              则离开此消息名称队列,但是消息内容被放入queue队列,名称为ActiveMQ.DLQ(deadLetterQueue死信队列)
     *               2.2.2 消息重复接收6次分析:
     *                   a.异常出现后没有进入catch块,而是抛出交给jms去执行内置的ack指令返回
     *                      跟踪后续处理。。尝试查看返回给broker的ack_type,但是没有看到这方面的处理
     *                     那么基于ack_type指令的定义:猜测执行如下
     *                       分析是在出现异常后发送ack_type = REDELIVERED_ACK_TYPE(需重发)
     *                       当重复次数到达6此之后 ack_type = POSION_ACK_TYPE (抛弃)
     *                   b.异常出现后异常被捕捉处理
     *                       session会进行commit,并且消息处理返回后,消息从此监听队列中移出,没有进入死信队列
     *              疑问:
     *                   没有catch异常时重复接收的处理,消息重发策略
     *                   进入死信队列的消息怎么处理
     *                   出现异常的消息能不能让broker对他进行保留
     *              问题分析及解决:学习并自定义 redelivery Policy 重投策略
     *                      私信队列学习并定义,以及后续分析私信队列内容
     */

我的配置如下:

     

<!--<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">-->
        <!--<property name="brokerURL" value="tcp://localhost:61616"/>-->
        <!--<property name="userName" value="admin"/>-->
        <!--<property name="password" value="admin"/>-->
        <!--<property name="useAsyncSend" value="true"/>-->
        <!--<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />-->
    <!--</bean>-->
    <!--<amq:redeliveryPolicy id="activeMQRedeliveryPolicy"-->
            <!--destination="#defaultDestination"-->
            <!--redeliveryDelay="100" maximumRedeliveries="2" />-->

       <!-- ActiveMQ 连接工厂 -->
       <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
       <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://localhost:61616"
                           userName="admin" password="admin"
                           />

       <!-- Spring Caching连接工厂 -->
       <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
       <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
              <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
              <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
              <!-- 同上,同理 -->
              <!-- <constructor-arg ref="amqConnectionFactory" /> -->
              <!-- Session缓存数量 -->
              <property name="sessionCacheSize" value="100" />
       </bean>

     <!-- 死信队列重定义,有默认的设置-->
    <amq:broker >
        <!--<amq:transportConnectors>-->
            <!--<amq:transportConnector uri="tcp://localhost:61616"/>-->
        <!--</amq:transportConnectors>-->
        <amq:destinationPolicy>
            <amq:policyMap>
                <amq:policyEntries>
                    <!-- Set the following policy on all queues using the '>' wildcard 所有的队列使用这个通配符-->
                    <amq:policyEntry queue=">">
                        <amq:deadLetterStrategy>
                            <!--
                            The default Dead Letter Queue in ActiveMQ is called ActiveMQ.DLQ;
                            Use the prefix 'DLQ.' for the destination name, and make the DLQ a queue rather than a topic
                            -->
                            <amq:individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
                            <!--设置 非持久化消息是否放入死信队列,默认false -->
                        </amq:deadLetterStrategy>
                        <amq:deadLetterStrategy>
                            <!-- processNonPersistent 非持久化消息是否放入死信队列,默认false
                            processExpired  过期消息是否放入死信队列,默认true
                            -->
                            <amq:sharedDeadLetterStrategy processNonPersistent="true" processExpired="true"/>
                        </amq:deadLetterStrategy>
                    </amq:policyEntry>
                </amq:policyEntries>
            </amq:policyMap>
        </amq:destinationPolicy>
    </amq:broker>

         <!-- SESSION_TRANSACTED : 当session使用事务时,就是使用此模式。
          在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。
          这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合 -->
    <jms:listener-container destination-type="queue" container-type="default"
                            connection-factory="connectionFactory" acknowledge="transacted"
                            concurrency="1">
        <jms:listener destination="queue.ack.transacted" ref="queueTransactionalAckConsumer"/>
    </jms:listener-container>



其中注释的部分是想要配置重发策略的,设置重发间隔时间,重发次数等

有看官网说connectFactory获取策略重新设值,但是也不清楚怎么配置。

然后死信队列的配置(设置新的死信队列名称,哪些消息可以加入死信队列)也是加进去没起作用。

跪求我目前的配置该怎么去修改!!

 


加载中
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部