mqtt一连接就读取到消息,但是我并没有发消息啊

浩好先生 发布于 2018/07/11 12:32
阅读 1K+
收藏 0

MQTT工具类

package wxh.utils;

import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import wxh.po.MyMqttCallback;

public class MqttUtil {
    
    public static final String endpoint = "ssl://wxh_test.mqtt.iot.gz.baidubce.com:1884";       //输入创建endpoint返回的SSL地址
    public static final String username = "wxh_test/wxh_thing"; //输入创建thing返回的username
    public static final String password = "DvLK3mnfiP8hF9SJxL7d75KyOHvSo0GdT2pUNQ+ABcQ=";//输入创建principal返回的password
    
    public static MqttClient Connect() throws MqttException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        // 创建SSL连接
        //TrustManagerFactory此类充当基于信任材料源的信任管理器的工厂。每个信任管理器管理特定类型的由安全套接字使用的信任材料
        //  X509支持的加密算法
        TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
        tmf.init((KeyStore)null);// 用证书授权源和相关的信任材料初始化此工厂
        TrustManager[] trustManagers = tmf.getTrustManagers();// 为每种信任材料返回一个信任管理器。
                 
        SSLContext ctx = SSLContext.getInstance("TLS");//SSLContext此类的实例表示安全套接字协议的实现,getInstance("TLS")生成实现指定安全套接字协议的 SSLContext 对象。
        ctx.init(null, trustManagers, null);// 初始化此上下文。
        // 配置MQTT连接
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setSocketFactory(ctx.getSocketFactory());
        // 创建MQTT连接   java-client为标识设备的ID,用户可自己定义,在同一个实例下,每个实体设备需要有一个唯一的ID
        MqttClient client = new MqttClient(endpoint, "java-client");
        client.connect(options);
        client.subscribe("#");
        read(client);
        return client;
    }
    
    
    // 发送消息
    public static void send(String content,MqttMessage message,MqttClient client,String topic) throws Exception {
        message.setPayload(content.getBytes("utf-8"));
        System.out.println("您发送的消息:"+content+client.getClientId());
        System.out.println(topic);
        client.publish(topic, message);
        System.out.println("send ok");
    }
        
    //读取消息
    public static void read(MqttClient client) throws MqttException {
        MyMqttCallback mqttCallback = new MyMqttCallback(client);
        client.setCallback(mqttCallback);
    }
    
    //取消订阅
    public static void unsubscribe(MqttClient client,String topic) throws MqttException {
        client.unsubscribe(topic);
    }
    
    //关闭连接
    public static void disconnect(MqttClient client) throws MqttException {
        client.disconnect();
    }
}

回调类

package wxh.po;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import net.sf.json.JSONObject;
import wxh.service.Service;
import wxh.service.ServiceImpl;
import wxh.utils.CommonUtil;
import wxh.utils.TokenThread;

public class MyMqttCallback implements MqttCallback{
    Service service = new ServiceImpl();
    MqttClient client = null;
    public MyMqttCallback(MqttClient client) {
        this.client = client;
    }
    
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        String keyword1 = new String(arg1.getPayload(),"UTF-8");
        System.out.println("读取消息"+keyword1);
        
    }
    

    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        
    }


    @Override
    public void connectionLost(Throwable arg0) {
        
    }
}
 

测试类

public class Test {

    public static void main(String[] args) throws Exception {
        MqttClient connect = MqttUtil.Connect();
        MqttMessage message = new MqttMessage();
        //MqttUtil.send("hello", message, connect, "wx");
    }
}

注释掉MqttUtil.send("hello", message, connect, "wx");时,一运行显示“读取消息1d”

解开注释时一运行显示:

您发送的消息:hellojava-client
wx
send ok
读取消息hello
读取消息1d
读取消息hello

 

恳请各位大神指导

加载中
0
OSC_pJEBxm
OSC_pJEBxm
这条消息应该设置retain为true
0
行者_无界
行者_无界

你看下这个https://blog.csdn.net/yangguosb/article/details/78668756

返回顶部
顶部