java使用rabbitmq在本地项目运行能接收到消息,但是部署到Linux就不能接收到消息

java软件开发疯子 发布于 2017/10/23 11:43
阅读 983
收藏 0

引用的Jar包是

<!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

 

rabbitmq集成spring代码如下:(就是在Linux不能正常收到消息,请各位大神指点)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
     http://www.springframework.org/schema/rabbit  
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
    
    <bean id="mqConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="host" value="${Profile.RabbitMQ.Server_Url}" />
        <property name="username" value="${Profile.RabbitMQ.User_name}" />
        <property name="password" value="${Profile.RabbitMQ.Password}" />
        <property name="virtualHost" value="${Profile.RabbitMQ.VirtualHost}" />
        <property name="port" value="${Profile.RabbitMQ.Port}" />
    </bean>

    <!-- 连接服务配置 -->
    <!-- <rabbit:connection-factory id="mqConnectionFactory" host="${Profile.RabbitMQ.Server_Url}"
        username="${Profile.RabbitMQ.User_name}" password="${Profile.RabbitMQ.Password}" port="5672" /> -->

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="mqConnectionFactory" />

    <!-- queue 队列声明,如果mq服务器中没,服务器会自动创建 -->
    <rabbit:queue name="${Profile.RabbitMQ.Queue_Name}" durable="true" auto-delete="false" exclusive="false" />
    
    <!-- 配置线程池 -->
    <bean id="taskExecutor"  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >  
        <!-- 线程池维护线程的最少数量 -->
        <property name ="corePoolSize" value ="5" />
        <!-- 线程池维护线程所允许的空闲时间 -->  
        <property name ="keepAliveSeconds" value ="30000" />
        <!-- 线程池维护线程的最大数量 -->
        <property name ="maxPoolSize" value ="1000" />
        <!-- 线程池所使用的缓冲队列 -->
        <property name ="queueCapacity" value ="200" />
    </bean>
    
    <!-- 消息监听器 -->
    <bean id="messageListener" class="bingo.uums.sync.SyncUUMSDataListener" />
    
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="mqConnectionFactory" task-executor="taskExecutor" acknowledge="manual">
        <rabbit:listener ref="messageListener" method="onMessage" queue-names="${Profile.RabbitMQ.Queue_Name}" />
    </rabbit:listener-container>

</beans>

 

实现类代码如下:

/**
 * This file created at 2017年5月16日.
 *
 * Copyright (c) 2002-2017 Bingosoft, Inc. All rights reserved.
 */
package bingo.uums.sync;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import bingo.common.core.utils.StringUtils;

import com.rabbitmq.client.Channel;

/**
 * 用于监听UUMS数据操作类
 */
public class SyncUUMSDataListener implements ChannelAwareMessageListener {

    private static Logger logger = LoggerFactory.getLogger(SyncUUMSDataListener.class);

    @Autowired
    private SecSystemPublishLogService publishLogService;

    /*
     * 实现ChannelAwareMessageListener类的监听方法
     * 
     * (non-Javadoc)
     * 
     * @see
     * org.springframework.amqp.rabbit.core.ChannelAwareMessageListener#onMessage
     * (org.springframework.amqp.core.Message, com.rabbitmq.client.Channel)
     */
    public void onMessage(Message message, Channel channel) throws Exception {

        String receiveMsg = new String(message.getBody(), "utf-8");

        if (StringUtils.isEmpty(receiveMsg)) {
            logger.error("the class SyncUUMSDataListener receive message receiveMsg is null Time is "
                    + new SimpleDateFormat("yyyy-MM-dd HH🇲🇲ss").format(new Date()));
            return;
        } else {
            logger.info("the receiveMsg is : " + receiveMsg);// 日志中记录每个刷新的数据
        }
        String str = receiveMsg.substring(0, 1)+ receiveMsg.substring(receiveMsg.length() - 1, receiveMsg.length());
        logger.info("str的值:" + str);

        if (str.equals("{}") || str == "{}") {
            // 将接收的信息转换为数组对象
            JSONArray arry = JSONArray.fromObject("[" + receiveMsg + "]");
            for (int i = 0; i < arry.size(); i++) {
                Map<String, Object> syncLogMap = new HashMap<String, Object>();
                JSONObject jsonObject = arry.getJSONObject(i);
                for (Iterator<?> iter = jsonObject.keys(); iter.hasNext();) {
                    String key = (String) iter.next();
                    String value = jsonObject.get(key).toString();
                    if ("null".equals(value)) {
                        value = "";
                    }
                    syncLogMap.put(key, value);
                }
                if (syncLogMap != null) {
                    int result = publishLogService.saveOrUpdatePushLog(
                            syncLogMap, receiveMsg);
                    if (result > 0) {
                        basicACK(message, channel);// 处理正常--ack
                    } else {
                        basicNACK(message, channel);// 处理异常--nack
                    }
                    logger.info(result > 0 ? "执行同步数据成功!" : "执行同步数据失败!");
                }
            }
        }
    }

    // 正常消费掉后通知mq服务器移除此条mq
    private void basicACK(Message message, Channel channel) {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),
                    false);
        } catch (IOException e) {
            logger.error("通知服务器移除mq时异常,异常信息:" + e);
        }
    }

    // 处理异常,mq重回队列
    private void basicNACK(Message message, Channel channel) {
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),
                    false, true);
        } catch (IOException e) {
            logger.error("mq重新进入服务器时出现异常,异常信息:" + e);
        }
    }

}
 

加载中
0
java软件开发疯子
java软件开发疯子

就是连接收消息的日志都没有执行。

0
java软件开发疯子
java软件开发疯子

因为网络管理员为原来的IP设置了一个映射地址,需要将连接的url IP设置为其映射地址即可。

返回顶部
顶部