mqtt-emqx:paho.mqttv5的简单例子

作者 : admin 本文共3356个字,预计阅读时间需要9分钟 发布时间: 2024-06-10 共2人阅读

# 安装emqx

请参考【http://blog.csdn.net/chenhz2284/article/details/139551293?spm=1001.2014.3001.5502】

# 下面是示例代码

【pom.xml】

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.49</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.11</version>
</dependency>

【MyMqttCallback.java】

package com.chz.myMqttV5.demo1;

@Slf4j
public class MyMqttCallback implements MqttCallback
{
    private String clientId;

    public MyMqttCallback(String clientId)
    {
        this.clientId = clientId;
    }

    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("{}::MyMqttCallback, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
    }

    public void disconnected(MqttDisconnectResponse disconnectResponse) {
        log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
    }

    public void deliveryComplete(IMqttToken token) {
        log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
    }

    public void mqttErrorOccurred(MqttException exception) {
        log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
    }

    public void authPacketArrived(int reasonCode, MqttProperties properties) {
        log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
    }
}

【MyDemo1MqttV5Client.java】

package com.chz.myMqttV5.demo1;

@Slf4j
public class MyDemo1MqttV5Client {

    private static String clientId = MyDemo1MqttV5Client.class.getSimpleName();

    public static void main(String[] args) throws InterruptedException {
        String broker = "tcp://192.168.44.228:1883";
        int subQos = 1;
        int pubQos = 1;
        String msg;

        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectionOptions options = new MqttConnectionOptions();
            client.setCallback(new MyMqttCallback(clientId));
            client.connect(options);
            client.subscribe("device/#", subQos);

            for(int i=0; i<100; i++){
                msg = "I am "+clientId+":" + i;
                MqttMessage message = new MqttMessage(msg.getBytes());
                message.setQos(pubQos);
                client.publish("device/1", message);
                Thread.sleep(3000L);
            }

            client.disconnect();
            client.close();

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

【MyDemo1MqttV5Sender.java】

package com.chz.myMqttV5.demo1;

@Slf4j
public class MyDemo1MqttV5Sender {

    public static void main(String[] args) throws InterruptedException {
        String broker = "tcp://192.168.44.228:1883";
        String clientId = "MyDemo1MqttV5Sender";
        int subQos = 1;
        int pubQos = 1;
        String msg;

        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectionOptions options = new MqttConnectionOptions();
            client.setCallback(new MyMqttCallback(clientId));
            client.connect(options);
            client.subscribe("device/#", subQos);

            for(int i=0; i<100; i++){
                msg = "I am "+clientId+":" + i;
                MqttMessage message = new MqttMessage(msg.getBytes());
                message.setQos(pubQos);
                client.publish("device/2", message);
                Thread.sleep(3000L);
            }

            client.disconnect();
            client.close();

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

运行【MyDemo1MqttV5Client、MyDemo1MqttV5Sender】,查看输出日志:
【MyDemo1MqttV5Client】的日志:
mqtt-emqx:paho.mqttv5的简单例子插图
【MyDemo1MqttV5Sender】的日志
mqtt-emqx:paho.mqttv5的简单例子插图(1)

本站无任何商业行为
个人在线分享 » mqtt-emqx:paho.mqttv5的简单例子
E-->