springboot集成mqtt

1,在spring boot中集成mqqt

(1)安装MQTT相关配置依赖

<!-- MQTT相关配置 -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.5.5</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.5.5</version>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.5</version>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>
(2)在yaml文件中加入mqtt连接配置
mqtt:
  host: tcp://emqx.zscbdic.cn:1883
  clientConfigs:
      clientId: mqttx_56888b1d
      options:
        userName: fsm_mes
        password: fsm_mes
        # 这里表示会话不过期
        cleanSession: false
        # 配置一个默认的主题,加载时不会用到,只能在需要时手动提取
        defaultTopic: devops
        timeout: 1000
        KeepAliveInterval: 10
        #断线重连方式,自动重新连接与会话不过期配合使用会导致
        #断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我
        automaticReconnect: true
        connectionTimeout: 3000
        # 最大链接数
        maxInflight: 100
(3)创建MQTTConfigBuilder类,用来加载yaml中的配置
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;


@Configuration
@ConfigurationProperties(MQTTConfigBuilder.PREFIX)
@Data
public class MQTTConfigBuilder {
    //配置的名称
    public static final String PREFIX = "publish.mqtt";
    /**
     * 服务端地址
     */
    private String host;

    /**
     * 客户端id
     */
    private String clientId;
    /**
     * 配置链接项
     */
    private MqttConnectOptions options;

}

(4)创建一个MQTTClientUtils类,包括创建客户端、连接到MQTT代理、订阅主题、发布消息以及取消订阅

@Slf4j
@Configuration
public class MQTTClientUtils {

    @Autowired
    private MQTTConfigBuilder mqttConfig;

    private Map<String, MqttClient> mqttClientMap = new HashMap<>();

    @PostConstruct
    public void initMqttClients() throws MqttException {
        // 遍历客户端配置数组
        for (MQTTConfigBuilder.MqttClientConfig clientConfig : mqttConfig.getClientConfigs()) {
            // 创建MqttClient实例
            MqttClient client = new MqttClient(mqttConfig.getHost(), clientConfig.getClientId(), (MqttClientPersistence) clientConfig.getOptions());
            // 连接到MQTT代理
            client.connect();
            // 存储MqttClient实例
            mqttClientMap.put(clientConfig.getClientId(), client);
            List<String> topics = clientConfig.getTopics(); 
            for (String topicName : topics) {
                subscribe(client, topicName, clientConfig.getDefaultQos(), new MessageCallbackListener());
            }
        }
    }

    public void subscribe(MqttClient client, String topicName, int qos, IMqttMessageListener messageListener) {
        log.info("Subscribing clientId:{} to topic:{} with QoS:{}", client.getClientId(), topicName, qos);
        try {
            client.subscribe(topicName, qos, messageListener);
        } catch (MqttException e) {
            log.error("Error subscribing clientId:{} to topic:{}", client.getClientId(), topicName, e);
        }
    }

 

    // 消息发送方法,使用Map中的MqttClient实例
    public boolean publish(String clientId, String topicName, String message) {
        MqttClient client = mqttClientMap.get(clientId);
        if (client == null) {
            log.error("No MQTT client found for clientId: {}", clientId);
            return false;
        }
      
    }
    // 取消订阅主题的方法,使用MqttClient实例
    public void cleanTopic(String clientId, String topicName) {
        MqttClient client = mqttClientMap.get(clientId);
        if (client != null) {
            try {
                client.unsubscribe(topicName);
            } catch (MqttException e) {
                log.error("Error unsubscribing clientId:{} from topic:{}", clientId, topicName, e);
            }
        } else {
            log.error("No MQTT client found for clientId: {}", clientId);
        }
    }

    public MQTTClientUtils createDevOpsMQTTClient() {
        this.createMQTTClient();
        return this;
    }

    private MQTTClientUtils connect() {
        mqttClients = new MqttClient[mqttConfig.getClientConfigs().length];
        return null;
    }

    private MqttClient createMQTTClient() {
        try{
            this.mqttClient = new MqttClient( mqttConfig.getHost(), mqttConfig.getClientId());
            log.info("MQTTClient创建成功!");
            return this.mqttClient;
        }catch (MqttException exception){
            exception.printStackTrace();
            log.error("MQTTClient创建失败!");
            return null;
        }
    }

    /**
     * 消息发送
     * @param topicName
     * @param message
     * @return
     */
    public boolean publish(String topicName, String message) {
        log.info("订阅主题名:{}, message:{}", topicName, message);
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        try {
            this.mqttClient.publish(topicName, mqttMessage);
            return true;
        }catch (MqttException exception){
            exception.printStackTrace();
            return false;
        }
    }

    /**
     * 消息发送 : retained 默认为 false
     * @param topicName
     * @param message
     * @param qos
     * @return
     */
    public boolean publish(String topicName, int qos, String message) {
        log.info("主题名:{}, qos:{}, message:{}", topicName, qos, message);
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        try {
            this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos, false);
            return true;
        }catch (MqttException exception){
            exception.printStackTrace();
            return false;
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topicName
     * @param qos
     */
    public void subscribe(String topicName, int qos) {
        log.info("订阅主题名:{}, qos:{}", topicName, qos);
        try {
            this.mqttClient.subscribe(topicName, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topicName
     * @param qos
     */
    public void subscribe(String topicName, int qos, IMqttMessageListener messageListener) {
        log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());
        try {
            this.mqttClient.subscribe(topicName, qos, messageListener);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消订阅主题
     * @param topicName 主题名称
     */
    public void cleanTopic(String topicName) {
        log.info("取消订阅主题名:{}", topicName);
        try {
            this.mqttClient.unsubscribe(topicName);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    //这里是初始化方法
    @PostConstruct
    public void initMqttClient()throws MqttException {
        MQTTClientUtils mqttClientUtils = this.createDevOpsMQTTClient().connect();
        mqttClientUtils.subscribe("testtopic/#", 0, new MessageCallbackListener());
      
    }


}
(5)消息监听处理
@Component
public class MessageCallbackListener implements IMqttMessageListener {

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String messageBody = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("收到消息:"+topic+", 消息内容是:"+ messageBody);
    }
}

参考连接:Springboot整合mqtt最新教程,完整教程,最佳实践_springboot mqtt-CSDN博客

2,连接多个主题

(1)MQTT配置类加载和存储MQTT客户端的配置信息,使用@ConfigurationProperties来绑定应用程序的配置文件中的MQTT相关设置,并提供这些设置的Java对象表示。应用程序的其他部分就可以通过注入这个配置类来访问MQTT配置信息,不用直接处理配置文件。

@Configuration
@ConfigurationProperties(MQTTConfigBuilder.PREFIX)
@Data
public class MQTTConfigBuilder {
    //配置的名称

    public static final String PREFIX = "mqtt";

    private String host;

    @Data
    public static class MqttClientConfig {
        private String clientId;
        private List<String> topics; // 客户端订阅的主题列表
        private MqttConnectOptions options;
    }

    private List<MqttClientConfig> clientConfigs;


}

(2)MQTT客户端工具类,在Spring应用程序启动时自动配置和连接到MQTT代理服务器,并订阅配置中指定的主题,准备接收和处理消息。

@Slf4j
@Configuration
public class MQTTClientUtils {

    @Autowired
    private MQTTConfigBuilder mqttConfigBuilder;

    private final Map<String, MqttClient> mqttClientMap = new HashMap<>();

    @PostConstruct
    public void initMqttClients() throws MqttException {
        List<MQTTConfigBuilder.MqttClientConfig> clientConfigs = mqttConfigBuilder.getClientConfigs();
        for (MQTTConfigBuilder.MqttClientConfig clientConfig : clientConfigs) {
            // 创建MqttClient实例
            MqttClient client = new MqttClient(mqttConfigBuilder.getHost(), clientConfig.getClientId());
            mqttClientMap.put(clientConfig.getClientId(), client);

            // 设置连接选项
            MqttConnectOptions options = clientConfig.getOptions();

            // 连接到MQTT代理
            client.connect(options);
            log.info("MQTTClient with clientId {} connected successfully.", clientConfig.getClientId());

            // 为每个客户端订阅配置的主题
            for (String topic : clientConfig.getTopics()) {
                client.subscribe(topic, new MessageCallbackListener());
            }
        }
    }

}

(3)配置文件,application.yml文件中配置MQTT连接和主题,MQTT代理服务器的连接信息和两个不同的客户端配置

mqtt:
  host: tcp://emqx.zscbdic.cn:1883
  clientConfigs:
    - clientId: mqttx_56888b1d
      options:
        userName: fsm_mes
        password: fsm_mes
        cleanSession: false
        defaultTopic: devops
        timeout: 1000
        KeepAliveInterval: 10
        automaticReconnect: true
        connectionTimeout: 3000
        # 最大链接数
        maxInflight: 100
      topics: [ "nodered/#" ]
    - clientId: mqttx_e249be06
      options:
        userName: fsm_mes
        password: fsm_mes
        cleanSession: false
        defaultTopic: devops
        timeout: 1000
        KeepAliveInterval: 10
        automaticReconnect: true
        connectionTimeout: 3000
        # 最大链接数
        maxInflight: 100
      topics: [ "testinte" ]


springboot集成mqtt
http://localhost:8090//archives/springbootji-cheng-mqtt
作者
chenll
发布于
2024年08月10日
许可协议