0%

springboot集成mqtt

依赖

<!--mqtt相关依赖-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置

  • application.yml
spring:
  mqtt:
    #tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://192.168.3.27:1883
    username: root
    password: 123456
    clientId: client-test
    cleanSession: false
    qos: 2
    retained: false
    connectionTimeout: 60
    keepAliveInterval: 30
    subscribeTopics:
      - test/#
      - topic/+
      - top/+/iic
  • MqttConfig.java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @ClassName: MqttConfig
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2023/4/13 14:37
 * @Version: v1.0
 **/
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
@Configuration
public class MqttConfig {
    private String url;
    private String username;
    private String password;
    private String clientId;
    ///是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),
    // 客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
    //设置为true表示每次连接服务器都是以新的身份
    private boolean cleanSession = false;
    //QoS 0,最多交付一次。
    //QoS 1,至少交付一次。
    //QoS 2,只交付一次。
    private int qos = 2;
    //发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。
    // MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
    private boolean retained = true;
    // 连接超时
    private int connectionTimeout = 60;
    // 设置心跳时间 单位为秒
    private int keepAliveInterval = 30;
    // 默认订阅的主题
    private List<String> subscribeTopics;
}

连接/发布/订阅/重连

  • MqttService.java
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * @ClassName: MqttService
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2023/4/13 14:39
 * @Version: v1.0
 **/
@Slf4j
@Service
public class MqttService {
    @Autowired
    MqttConfig config;
    private MqttClient client;

    @PostConstruct
    public void init() {
        connect();
    }

    public void connect() {
        try {
            client = new MqttClient(config.getUrl(), config.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(config.isCleanSession());
            // 设置自动重连
            options.setAutomaticReconnect(true);
            options.setUserName(config.getUsername());
            options.setPassword(config.getPassword().toCharArray());
            options.setConnectionTimeout(config.getConnectionTimeout());
            options.setKeepAliveInterval(config.getKeepAliveInterval());
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
//            options.setWill("willTopic", (config.getClientId() + "与服务器断开连接").getBytes(), 0, false);
            client.setCallback(new MqttCallBack(config, client));
            client.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void send(String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(config.getQos());
        mqttMessage.setRetained(config.isRetained());
        mqttMessage.setPayload(message.getBytes());
        MqttTopic mqttTopic = client.getTopic(topic);
        MqttDeliveryToken token;
        try {
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
            log.info("消息ID: {}, topic: {}, message {} 已发送.", token.getMessageId(), topic, message);
        } catch (MqttException e) {
            log.info("topic: {} message {} 发送异常.", topic, message);
            e.printStackTrace();
        }
    }
}
  • MqttCallBack.java
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @ClassName: MqttCallBack
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2023/4/13 15:04
 * @Version: v1.0
 **/
@Slf4j
@Data
public class MqttCallBack implements MqttCallbackExtended {
    private MqttConfig config;
    private MqttClient client;

    public MqttCallBack(MqttConfig config, MqttClient client) {
        this.config = config;
        this.client = client;
    }

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("{} 断开连接.", config.getClientId());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        log.info("收到 {} 消息 {}.", topic, msg);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("消息ID: {} 发送成功.", token.getMessageId());
    }

    @Override
    public void connectComplete(boolean b, String url) {
        log.info("MQTT {} 连接 {} 成功.", config.getClientId(), url);
        try {
            if (client.isConnected()) {
                for (String topic : config.getSubscribeTopics()) {
                    client.subscribe(topic, config.getQos());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}