依赖
<!--
  Maven dependencies for the MQTT example.
  No <version> elements are declared here; presumably versions are managed by the
  Spring Boot parent POM / BOM of the enclosing project (confirm against the full pom.xml).
-->

<!-- Spring Integration stream module. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<!-- Spring Boot starter for Spring Integration. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<!--
  Spring Integration MQTT module. The Java code in this example imports
  org.eclipse.paho.client.mqttv3 without declaring Paho explicitly, so the Paho client
  is presumably pulled in transitively by this artifact (verify with mvn dependency:tree).
-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置
# MQTT client settings, bound to MqttConfig via
# @ConfigurationProperties(prefix = "spring.mqtt").
# Nesting matters: every key below must sit under spring -> mqtt for the prefix to match.
spring:
  mqtt:
    # Broker address and credentials (example values; replace for a real deployment).
    url: tcp://192.168.3.27:1883
    username: root
    password: 123456
    clientId: client-test
    cleanSession: false
    # QoS used both for publishing and for the subscriptions listed below.
    qos: 2
    # Retained flag applied to every published message.
    retained: false
    connectionTimeout: 60
    keepAliveInterval: 30
    # Topic filters subscribed to each time a connection completes.
    # "+" is the MQTT single-level wildcard.
    subscribeTopics:
      - test/
      - topic/+
      - top/+/iic
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
 * Typed holder for the {@code spring.mqtt.*} configuration properties.
 *
 * <p>Lombok's {@code @Data} generates the getters and setters; the setters are what the
 * property binder uses to populate the fields. Field initializers act as defaults for
 * properties that are absent from the configuration.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfig {

    /** Broker URI, e.g. {@code tcp://host:1883}. No default. */
    private String url;

    /** User name presented to the broker. No default. */
    private String username;

    /** Password presented to the broker. No default. */
    private String password;

    /** MQTT client identifier. No default. */
    private String clientId;

    /** Clean-session flag for the connection; defaults to {@code false}. */
    private boolean cleanSession;

    /** Quality-of-service level; defaults to {@code 2}. */
    private int qos = 2;

    /** Retained flag for published messages; defaults to {@code true}. */
    private boolean retained = true;

    /** Connection timeout; defaults to {@code 60} (presumably seconds, as in Paho's options). */
    private int connectionTimeout = 60;

    /** Keep-alive interval; defaults to {@code 30} (presumably seconds, as in Paho's options). */
    private int keepAliveInterval = 30;

    /** Topic filters to subscribe to; {@code null} when none are configured. */
    private List<String> subscribeTopics;
}
连接/发布/订阅/重连
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Slf4j
@Service
public class MqttService {
@Autowired
MqttConfig config;
private MqttClient client;
@PostConstruct
public void init() {
connect();
}
public void connect() {
try {
client = new MqttClient(config.getUrl(), config.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(config.isCleanSession());
options.setAutomaticReconnect(true);
options.setUserName(config.getUsername());
options.setPassword(config.getPassword().toCharArray());
options.setConnectionTimeout(config.getConnectionTimeout());
options.setKeepAliveInterval(config.getKeepAliveInterval());
client.setCallback(new MqttCallBack(config, client));
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
public void send(String topic, String message) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(config.getQos());
mqttMessage.setRetained(config.isRetained());
mqttMessage.setPayload(message.getBytes());
MqttTopic mqttTopic = client.getTopic(topic);
MqttDeliveryToken token;
try {
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
log.info("消息ID: {}, topic: {}, message {} 已发送.", token.getMessageId(), topic, message);
} catch (MqttException e) {
log.info("topic: {} message {} 发送异常.", topic, message);
e.printStackTrace();
}
}
}
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@Slf4j
@Data
public class MqttCallBack implements MqttCallbackExtended {
private MqttConfig config;
private MqttClient client;
public MqttCallBack(MqttConfig config, MqttClient client) {
this.config = config;
this.client = client;
}
@Override
public void connectionLost(Throwable throwable) {
log.error("{} 断开连接.", config.getClientId());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
log.info("收到 {} 消息 {}.", topic, msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("消息ID: {} 发送成功.", token.getMessageId());
}
@Override
public void connectComplete(boolean b, String url) {
log.info("MQTT {} 连接 {} 成功.", config.getClientId(), url);
try {
if (client.isConnected()) {
for (String topic : config.getSubscribeTopics()) {
client.subscribe(topic, config.getQos());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}