各位好!我弄了半天都没能配置 RedeliveryPolicy和 broker,demo是spring整合的activeMQ。
测试过程中activeMQ一直处于启动状态。
模拟的监听消息设置的AcKnowledgeMode=Session Transacted--事务型消息监听,有如下测试结果:
测试代码:
@Component public class QueueTransactionalAckConsumer implements SessionAwareMessageListener { @Override public void onMessage(Message message,Session session) { try { if (message instanceof TextMessage) { boolean jmsRedelivered = message.getJMSRedelivered(); int jmsDeliveryMode = message.getJMSDeliveryMode(); System.out.println("消息内容是:" + ((TextMessage) message).getText()); int acknowledgeMode = session.getAcknowledgeMode(); System.out.println("消息确认类型:"+acknowledgeMode); boolean transacted = session.getTransacted(); System.out.println("事务状态:" +transacted); // 异常测试 int i = 1/0; } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { e.printStackTrace(); } //模拟异常没有被catch,可注释此catch块 // catch (Exception e){ // e.printStackTrace(); // } } }
测试过程分析:
/** * 监听消息:事务型消息确认 * 1.消息处理前 不会告知 消息代理已成功接收消息, * 名称为此消息队列名称的信息 * queue接收到消息时队列中显示: * Number Of Pending Messages 等待消费的消息 +1 * Messages Enqueued 进入队列的消息 +1 * Messages Dequeued 出了队列的消息 保持不变 * 2.消息处理过程中及消息处理完毕: * 2.1 过程中无异常,待消息处理完毕:(与接收到消息时作对比) * Number Of Pending Messages 等待消费的消息 -1 * Messages Enqueued 进入队列的消息 保持不变 * Messages Dequeued 出了队列的消息 +1 * 2.2 过程中有异常,消息处理终止,告知消息处理失败 * 第一次接收消息处理出异常:此消息处于等待消费 * 接着再次接收消息6遍,过程中消息一直处于等待消费状态 * 重复6次之后(一共接收7遍),消费者不再监听到消息。 * 2.2.1 消息处理完毕消息代理的处理: * 此消息如果发送设置Persistent Delivery=false * 则离开此消息名称队列 * 此消息如果发送设置Persistent Delivery=true * 则离开此消息名称队列,但是消息内容被放入queue队列,名称为ActiveMQ.DLQ(deadLetterQueue死信队列) * 2.2.2 消息重复接收6次分析: * a.异常出现后没有进入catch块,而是抛出交给jms去执行内置的ack指令返回 * 跟踪后续处理。。尝试查看返回给broker的ack_type,但是没有看到这方面的处理 * 那么基于ack_type指令的定义:猜测执行如下 * 分析是在出现异常后发送ack_type = REDELIVERED_ACK_TYPE(需重发) * 当重复次数到达6此之后 ack_type = POSION_ACK_TYPE (抛弃) * b.异常出现后异常被捕捉处理 * session会进行commit,并且消息处理返回后,消息从此监听队列中移出,没有进入死信队列 * 疑问: * 没有catch异常时重复接收的处理,消息重发策略 * 进入死信队列的消息怎么处理 * 出现异常的消息能不能让broker对他进行保留 * 问题分析及解决:学习并自定义 redelivery Policy 重投策略 * 私信队列学习并定义,以及后续分析私信队列内容 */
我的配置如下:
<!--<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">--> <!--<property name="brokerURL" value="tcp://localhost:61616"/>--> <!--<property name="userName" value="admin"/>--> <!--<property name="password" value="admin"/>--> <!--<property name="useAsyncSend" value="true"/>--> <!--<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />--> <!--</bean>--> <!--<amq:redeliveryPolicy id="activeMQRedeliveryPolicy"--> <!--destination="#defaultDestination"--> <!--redeliveryDelay="100" maximumRedeliveries="2" />--> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 死信队列重定义,有默认的设置--> <amq:broker > <!--<amq:transportConnectors>--> <!--<amq:transportConnector uri="tcp://localhost:61616"/>--> <!--</amq:transportConnectors>--> <amq:destinationPolicy> <amq:policyMap> <amq:policyEntries> <!-- Set the following policy on all queues using the '>' wildcard 所有的队列使用这个通配符--> <amq:policyEntry queue=">"> <amq:deadLetterStrategy> <!-- The default Dead Letter Queue in ActiveMQ is called ActiveMQ.DLQ; Use the prefix 'DLQ.' for the destination name, and make the DLQ a queue rather than a topic --> <amq:individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> <!--设置 非持久化消息是否放入死信队列,默认false --> </amq:deadLetterStrategy> <amq:deadLetterStrategy> <!-- processNonPersistent 非持久化消息是否放入死信队列,默认false processExpired 过期消息是否放入死信队列,默认true --> <amq:sharedDeadLetterStrategy processNonPersistent="true" processExpired="true"/> </amq:deadLetterStrategy> </amq:policyEntry> </amq:policyEntries> </amq:policyMap> </amq:destinationPolicy> </amq:broker> <!-- SESSION_TRANSACTED : 当session使用事务时,就是使用此模式。 在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。 这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="transacted" concurrency="1"> <jms:listener destination="queue.ack.transacted" ref="queueTransactionalAckConsumer"/> </jms:listener-container>
其中注释的部分是想要配置重发策略的,设置重发间隔时间,重发次数等
有看官网说connectFactory获取策略重新设值,但是也不清楚怎么配置。
然后死信队列的配置(设置新的死信队列名称,哪些消息可以加入死信队列)也是加进去没起作用。
跪求我目前的配置该怎么去修改!!