springboot集成mqtt
1,在spring boot中集成mqqt
(1)安装MQTT相关配置依赖
<!-- MQTT相关配置 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.5</version>
</dependency>
(2)在yaml文件中加入mqtt连接配置
mqtt:
host: tcp://emqx.zscbdic.cn:1883
clientConfigs:
clientId: mqttx_56888b1d
options:
userName: fsm_mes
password: fsm_mes
# 这里表示会话不过期
cleanSession: false
# 配置一个默认的主题,加载时不会用到,只能在需要时手动提取
defaultTopic: devops
timeout: 1000
KeepAliveInterval: 10
#断线重连方式,自动重新连接与会话不过期配合使用会导致
#断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我
automaticReconnect: true
connectionTimeout: 3000
# 最大链接数
maxInflight: 100
(3)创建MQTTConfigBuilder类,用来加载yaml中的配置
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(MQTTConfigBuilder.PREFIX)
@Data
public class MQTTConfigBuilder {
//配置的名称
public static final String PREFIX = "publish.mqtt";
/**
* 服务端地址
*/
private String host;
/**
* 客户端id
*/
private String clientId;
/**
* 配置链接项
*/
private MqttConnectOptions options;
}
(4)创建一个MQTTClientUtils类,包括创建客户端、连接到MQTT代理、订阅主题、发布消息以及取消订阅
@Slf4j
@Configuration
public class MQTTClientUtils {
@Autowired
private MQTTConfigBuilder mqttConfig;
private Map<String, MqttClient> mqttClientMap = new HashMap<>();
@PostConstruct
public void initMqttClients() throws MqttException {
// 遍历客户端配置数组
for (MQTTConfigBuilder.MqttClientConfig clientConfig : mqttConfig.getClientConfigs()) {
// 创建MqttClient实例
MqttClient client = new MqttClient(mqttConfig.getHost(), clientConfig.getClientId(), (MqttClientPersistence) clientConfig.getOptions());
// 连接到MQTT代理
client.connect();
// 存储MqttClient实例
mqttClientMap.put(clientConfig.getClientId(), client);
List<String> topics = clientConfig.getTopics();
for (String topicName : topics) {
subscribe(client, topicName, clientConfig.getDefaultQos(), new MessageCallbackListener());
}
}
}
public void subscribe(MqttClient client, String topicName, int qos, IMqttMessageListener messageListener) {
log.info("Subscribing clientId:{} to topic:{} with QoS:{}", client.getClientId(), topicName, qos);
try {
client.subscribe(topicName, qos, messageListener);
} catch (MqttException e) {
log.error("Error subscribing clientId:{} to topic:{}", client.getClientId(), topicName, e);
}
}
// 消息发送方法,使用Map中的MqttClient实例
public boolean publish(String clientId, String topicName, String message) {
MqttClient client = mqttClientMap.get(clientId);
if (client == null) {
log.error("No MQTT client found for clientId: {}", clientId);
return false;
}
}
// 取消订阅主题的方法,使用MqttClient实例
public void cleanTopic(String clientId, String topicName) {
MqttClient client = mqttClientMap.get(clientId);
if (client != null) {
try {
client.unsubscribe(topicName);
} catch (MqttException e) {
log.error("Error unsubscribing clientId:{} from topic:{}", clientId, topicName, e);
}
} else {
log.error("No MQTT client found for clientId: {}", clientId);
}
}
public MQTTClientUtils createDevOpsMQTTClient() {
this.createMQTTClient();
return this;
}
private MQTTClientUtils connect() {
mqttClients = new MqttClient[mqttConfig.getClientConfigs().length];
return null;
}
private MqttClient createMQTTClient() {
try{
this.mqttClient = new MqttClient( mqttConfig.getHost(), mqttConfig.getClientId());
log.info("MQTTClient创建成功!");
return this.mqttClient;
}catch (MqttException exception){
exception.printStackTrace();
log.error("MQTTClient创建失败!");
return null;
}
}
/**
* 消息发送
* @param topicName
* @param message
* @return
*/
public boolean publish(String topicName, String message) {
log.info("订阅主题名:{}, message:{}", topicName, message);
MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
try {
this.mqttClient.publish(topicName, mqttMessage);
return true;
}catch (MqttException exception){
exception.printStackTrace();
return false;
}
}
/**
* 消息发送 : retained 默认为 false
* @param topicName
* @param message
* @param qos
* @return
*/
public boolean publish(String topicName, int qos, String message) {
log.info("主题名:{}, qos:{}, message:{}", topicName, qos, message);
MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
try {
this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos, false);
return true;
}catch (MqttException exception){
exception.printStackTrace();
return false;
}
}
/**
* 订阅某个主题
*
* @param topicName
* @param qos
*/
public void subscribe(String topicName, int qos) {
log.info("订阅主题名:{}, qos:{}", topicName, qos);
try {
this.mqttClient.subscribe(topicName, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题
*
* @param topicName
* @param qos
*/
public void subscribe(String topicName, int qos, IMqttMessageListener messageListener) {
log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());
try {
this.mqttClient.subscribe(topicName, qos, messageListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 取消订阅主题
* @param topicName 主题名称
*/
public void cleanTopic(String topicName) {
log.info("取消订阅主题名:{}", topicName);
try {
this.mqttClient.unsubscribe(topicName);
} catch (MqttException e) {
e.printStackTrace();
}
}
//这里是初始化方法
@PostConstruct
public void initMqttClient()throws MqttException {
MQTTClientUtils mqttClientUtils = this.createDevOpsMQTTClient().connect();
mqttClientUtils.subscribe("testtopic/#", 0, new MessageCallbackListener());
}
}
(5)消息监听处理
@Component
public class MessageCallbackListener implements IMqttMessageListener {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageBody = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("收到消息:"+topic+", 消息内容是:"+ messageBody);
}
}
参考连接:Springboot整合mqtt最新教程,完整教程,最佳实践_springboot mqtt-CSDN博客
2,连接多个主题
(1)MQTT配置类加载和存储MQTT客户端的配置信息,使用@ConfigurationProperties
来绑定应用程序的配置文件中的MQTT相关设置,并提供这些设置的Java对象表示。应用程序的其他部分就可以通过注入这个配置类来访问MQTT配置信息,不用直接处理配置文件。
@Configuration
@ConfigurationProperties(MQTTConfigBuilder.PREFIX)
@Data
public class MQTTConfigBuilder {
//配置的名称
public static final String PREFIX = "mqtt";
private String host;
@Data
public static class MqttClientConfig {
private String clientId;
private List<String> topics; // 客户端订阅的主题列表
private MqttConnectOptions options;
}
private List<MqttClientConfig> clientConfigs;
}
(2)MQTT客户端工具类,在Spring应用程序启动时自动配置和连接到MQTT代理服务器,并订阅配置中指定的主题,准备接收和处理消息。
@Slf4j
@Configuration
public class MQTTClientUtils {
@Autowired
private MQTTConfigBuilder mqttConfigBuilder;
private final Map<String, MqttClient> mqttClientMap = new HashMap<>();
@PostConstruct
public void initMqttClients() throws MqttException {
List<MQTTConfigBuilder.MqttClientConfig> clientConfigs = mqttConfigBuilder.getClientConfigs();
for (MQTTConfigBuilder.MqttClientConfig clientConfig : clientConfigs) {
// 创建MqttClient实例
MqttClient client = new MqttClient(mqttConfigBuilder.getHost(), clientConfig.getClientId());
mqttClientMap.put(clientConfig.getClientId(), client);
// 设置连接选项
MqttConnectOptions options = clientConfig.getOptions();
// 连接到MQTT代理
client.connect(options);
log.info("MQTTClient with clientId {} connected successfully.", clientConfig.getClientId());
// 为每个客户端订阅配置的主题
for (String topic : clientConfig.getTopics()) {
client.subscribe(topic, new MessageCallbackListener());
}
}
}
}
(3)配置文件,在application.yml
文件中配置MQTT连接和主题,MQTT代理服务器的连接信息和两个不同的客户端配置
mqtt:
host: tcp://emqx.zscbdic.cn:1883
clientConfigs:
- clientId: mqttx_56888b1d
options:
userName: fsm_mes
password: fsm_mes
cleanSession: false
defaultTopic: devops
timeout: 1000
KeepAliveInterval: 10
automaticReconnect: true
connectionTimeout: 3000
# 最大链接数
maxInflight: 100
topics: [ "nodered/#" ]
- clientId: mqttx_e249be06
options:
userName: fsm_mes
password: fsm_mes
cleanSession: false
defaultTopic: devops
timeout: 1000
KeepAliveInterval: 10
automaticReconnect: true
connectionTimeout: 3000
# 最大链接数
maxInflight: 100
topics: [ "testinte" ]
springboot集成mqtt
http://localhost:8090//archives/springbootji-cheng-mqtt