0
回答
使用BlockingQueue实现简明的JMS连接异常恢复
利用AWS快速构建适用于生产的无服务器应用程序,免费试用12个月>>>   

JMS连接有可能出现异常,对于同步处理(MessageProducer发送或者MessageConsumer调用receive方法)来说, 因为是主动调用的,因此可以通过简单的定时重发来重新创建连接。如果是异步接收,即通过实现MessageListener,就比较麻烦,因为是别人回调 你的代码。

这里当然需要使用ExceptionListener,来监听jms连接异常。并在异常发生后做一些事情,比如试图恢复连接,如果不能恢复,间隔一段时间再试图连接。

以前做法,是ExceptionListener监听到异常后,在onException方法中设置一个标志对象,另外有一个线程来定时监听这个标 志对象,发现异常后做连接的恢复,并重建和注册MessageListener和ExceptionListener到连接。方法比较混乱和复杂。

这里用java concurrent api的BlockingQueue和定时任务(见通过java concurrent实现定时任务)来简化这部分功能的实现。

BlockingQueue是Queue基础上,增加take和put方法,用来支持生产者消费者问题。take是消费者的动作,从Queue中获 取队首的元素,如果队列为空,则自动线程阻塞,等待队列中有元素后再唤醒线程获得队首元素;put是生产者的动作,如果队列满(如果设置队列大小的话), 则线程阻塞,直到有消费者获取队首元素后(队列不满了)唤醒线程加入元素到队列尾部。

在这里,BlockingQueue用来盛放JMSException,即,当ExceptionListener监听到连接异常后,在onException方法中将该JMSException加入队列。

创建队列:

private BlockingQueue<JMSException> exceptionsQueue = 
    new LinkedBlockingQueue<JMSException>();

在ExceptionListener里的实现:

public void onException(JMSException exception) { 
    exceptionsQueue.add(exception); 
}

要有一个ScheduledExecutorService:

private ScheduledExecutorService scheduledExecutorService;

创建检查是否有异常并且做恢复的定时任务:

this.scheduledExecutorService = Executors.newScheduledThreadPool(1); 
this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { 
    @Override 
    public void run() { 
        logger.debug("check jms connection error."); 
        try { 
            exceptionsQueue.take(); 
        } catch (InterruptedException e) { 
            logger.error("exception queue take error: " 
                    + e.getMessage()); 
        } 
        exceptionsQueue.clear(); 
        try { 
            _init(); 
        } catch (JMSException e) { 
            exceptionsQueue.add(e); 
            return; 
        } 
    } 
}, 1000, 1000 * 10, TimeUnit.MILLISECONDS);

完整的代码:

package stat.transfer.receive; 

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

import javax.jms.Connection; 
import javax.jms.ExceptionListener; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 

import org.apache.log4j.Logger; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 

import util.jms.JmsConfigBean; 

public class MessageReceiveService { 

    private static final Logger logger = Logger 
            .getLogger(MessageReceiveService.class); 

    @Autowired 
    @Qualifier("jmsConfigBean") 
    protected JmsConfigBean jmsConfigBean; 

    private MessageReceiveWorker worker; 

    private ScheduledExecutorService scheduledExecutorService; 

    private BlockingQueue<JMSException> exceptionsQueue; 

    public MessageReceiveService() { 
        logger.debug("create message receive service …"); 
        this.exceptionsQueue = new LinkedBlockingQueue<JMSException>(); 
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1); 
        this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { 
            @Override 
            public void run() { 
                logger.debug("check jms connection error."); 
                try { 
                    exceptionsQueue.take(); 
                } catch (InterruptedException e) { 
                    logger.error("exception queue take error: " 
                            + e.getMessage()); 
                } 
                exceptionsQueue.clear(); 
                try { 
                    _init(); 
                } catch (JMSException e) { 
                    exceptionsQueue.add(e); 
                    return; 
                } 
            } 
        }, 1000, 1000 * 10, TimeUnit.MILLISECONDS); 
        logger.debug("create thread to monitoring jms connection error."); 
    } 

    public void _init() throws JMSException { 
        logger.debug("create new worker …"); 
        this.worker = new MessageReceiveWorker(); 
        logger.debug("create new worker ok."); 
    } 

    public void init() { 
        try { 
            _init(); 
        } catch (JMSException e) { 
            logger.error("create new worker error: " + e.getMessage()); 
            this.exceptionsQueue.add(e); 
        } 
    } 

    public void close() { 
        this.scheduledExecutorService.shutdownNow(); 
        try { 
            this.worker.close(); 
        } catch (JMSException e) { 
            logger.error("close message receive service error: " 
                    + e.getMessage()); 
        } 
    } 

    private void doOnMessage(Session session) throws JMSException { 
        // TODO 
    } 

    class MessageReceiveWorker implements MessageListener, ExceptionListener { 

        private Connection connection; 

        private Session session; 

        private MessageConsumer consumer; 

        public MessageReceiveWorker() throws JMSException { 
            connection = jmsConfigBean.getConnectionFactory() 
                    .createConnection(); 
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 
            consumer = session.createConsumer(session.createQueue(jmsConfigBean 
                    .getQueueName())); 
            consumer.setMessageListener(this); 
            connection.setExceptionListener(this); 
            connection.start(); 
        } 

        public void close() throws JMSException { 
            this.connection.stop(); 
            this.connection.close(); 
        } 

        @Override 
        public void onMessage(Message message) { 
            logger.debug("receive message"); 
            try { 
                doOnMessage(session); 
                session.commit(); 
            } catch (JMSException e) { 
                logger.error("on message error:" + e.getMessage(), e); 
                throw new RuntimeException(e); 
            } 
            logger.debug("receive message ok."); 
        } 

        @Override 
        public void onException(JMSException exception) { 
            exceptionsQueue.add(exception); 
        } 

    } 
}

这里使用到了spring做一些参数比如jms配置信息的IOC配置。就不贴出来了。

原文转自:http://marshal.easymorse.com/archives/3137

举报
鉴客
发帖于6年前 0回/1K+阅
顶部