Browse Source

feat(iot):【协议改造】emqx 初步改造(50%):修复 Review Agent 反馈的 bug(增强稳定性)

YunaiV 4 months ago
parent
commit
85c1b05bca

+ 44 - 27
yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java

@@ -176,7 +176,8 @@ public class IotEmqxProtocol implements IotProtocol {
         // 2.4 关闭 Vertx
         if (vertx != null) {
             try {
-                vertx.close().result();
+                vertx.close().toCompletionStage().toCompletableFuture()
+                        .get(10, TimeUnit.SECONDS);
                 log.info("[stop][IoT EMQX 协议 {} Vertx 已关闭]", getId());
             } catch (Exception e) {
                 log.error("[stop][IoT EMQX 协议 {} Vertx 关闭失败]", getId(), e);
@@ -195,7 +196,7 @@ public class IotEmqxProtocol implements IotProtocol {
     private void startHttpServer() {
         // 1. 创建路由
         Router router = Router.router(vertx);
-        router.route().handler(BodyHandler.create());
+        router.route().handler(BodyHandler.create().setBodyLimit(1024 * 1024)); // 限制 body 大小为 1MB,防止大包攻击
 
         // 2. 创建处理器
         IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId);
@@ -218,12 +219,13 @@ public class IotEmqxProtocol implements IotProtocol {
             httpServer = vertx.createHttpServer(options)
                     .requestHandler(router)
                     .listen()
-                    .result();
+                    .toCompletionStage().toCompletableFuture()
+                    .get(10, TimeUnit.SECONDS);
             log.info("[startHttpServer][IoT EMQX 协议 {} HTTP Hook 服务启动成功, port: {}, ssl: {}]",
                     getId(), properties.getPort(), httpConfig != null && Boolean.TRUE.equals(httpConfig.getSslEnabled()));
         } catch (Exception e) {
             log.error("[startHttpServer][IoT EMQX 协议 {} HTTP Hook 服务启动失败, port: {}]", getId(), properties.getPort(), e);
-            throw e;
+            throw new RuntimeException("HTTP Hook 服务启动失败", e);
         }
     }
 
@@ -232,7 +234,8 @@ public class IotEmqxProtocol implements IotProtocol {
             return;
         }
         try {
-            httpServer.close().result();
+            httpServer.close().toCompletionStage().toCompletableFuture()
+                    .get(5, TimeUnit.SECONDS);
             log.info("[stopHttpServer][IoT EMQX 协议 {} HTTP Hook 服务已停止]", getId());
         } catch (Exception e) {
             log.error("[stopHttpServer][IoT EMQX 协议 {} HTTP Hook 服务停止失败]", getId(), e);
@@ -258,19 +261,21 @@ public class IotEmqxProtocol implements IotProtocol {
 
     private void stopMqttClient() {
         MqttClient client = this.mqttClient;
-        if (client == null || !client.isConnected()) {
+        this.mqttClient = null;  // 先清理引用
+        if (client == null) {
             return;
         }
-        this.mqttClient = null;
 
-        // 1. 批量取消订阅
-        List<String> topicList = emqxConfig.getMqttTopics();
-        if (CollUtil.isNotEmpty(topicList)) {
-            try {
-                client.unsubscribe(topicList).toCompletionStage().toCompletableFuture()
-                        .get(5, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.warn("[stopMqttClient][IoT EMQX 协议 {} 取消订阅异常]", getId(), e);
+        // 1. 批量取消订阅(仅在连接时)
+        if (client.isConnected()) {
+            List<String> topicList = emqxConfig.getMqttTopics();
+            if (CollUtil.isNotEmpty(topicList)) {
+                try {
+                    client.unsubscribe(topicList).toCompletionStage().toCompletableFuture()
+                            .get(5, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    log.warn("[stopMqttClient][IoT EMQX 协议 {} 取消订阅异常]", getId(), e);
+                }
             }
         }
 
@@ -296,15 +301,15 @@ public class IotEmqxProtocol implements IotProtocol {
                 .setClientId(emqxConfig.getMqttClientId())
                 .setUsername(emqxConfig.getMqttUsername())
                 .setPassword(emqxConfig.getMqttPassword())
-                .setSsl(emqxConfig.getMqttSsl())
-                .setCleanSession(emqxConfig.getCleanSession())
+                .setSsl(Boolean.TRUE.equals(emqxConfig.getMqttSsl()))
+                .setCleanSession(Boolean.TRUE.equals(emqxConfig.getCleanSession()))
                 .setKeepAliveInterval(emqxConfig.getKeepAliveIntervalSeconds())
                 .setMaxInflightQueue(emqxConfig.getMaxInflightQueue());
         options.setConnectTimeout(emqxConfig.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒
-        options.setTrustAll(emqxConfig.getTrustAll());
+        options.setTrustAll(Boolean.TRUE.equals(emqxConfig.getTrustAll()));
         // 1.2 配置遗嘱消息
         IotEmqxConfig.Will will = emqxConfig.getWill();
-        if (will.isEnabled()) {
+        if (will != null && will.isEnabled()) {
             Assert.notBlank(will.getTopic(), "遗嘱消息主题(emqx.will.topic)不能为空");
             Assert.notNull(will.getPayload(), "遗嘱消息内容(emqx.will.payload)不能为空");
             options.setWillFlag(true)
@@ -313,9 +318,11 @@ public class IotEmqxProtocol implements IotProtocol {
                     .setWillQoS(will.getQos())
                     .setWillRetain(will.isRetain());
         }
-        // 1.3 配置高级 SSL/TLS(仅在启用 SSL 且不信任所有证书时生效)
-        if (Boolean.TRUE.equals(emqxConfig.getMqttSsl()) && !Boolean.TRUE.equals(emqxConfig.getTrustAll())) {
-            IotEmqxConfig.Ssl sslOptions = emqxConfig.getSslOptions();
+        // 1.3 配置高级 SSL/TLS(仅在启用 SSL 且不信任所有证书时生效,且需要 sslOptions 非空)
+        IotEmqxConfig.Ssl sslOptions = emqxConfig.getSslOptions();
+        if (Boolean.TRUE.equals(emqxConfig.getMqttSsl())
+                && Boolean.FALSE.equals(emqxConfig.getTrustAll())
+                && sslOptions != null) {
             if (StrUtil.isNotBlank(sslOptions.getTrustStorePath())) {
                 options.setTrustStoreOptions(new JksOptions()
                         .setPath(sslOptions.getTrustStorePath())
@@ -365,10 +372,11 @@ public class IotEmqxProtocol implements IotProtocol {
      */
     private void closeMqttClient() {
         MqttClient oldClient = this.mqttClient;
-        if (oldClient == null || !oldClient.isConnected()) {
+        this.mqttClient = null;  // 先清理引用
+        if (oldClient == null) {
             return;
         }
-        this.mqttClient = null;
+        // 尽力释放(无论是否连接都尝试 disconnect)
         try {
             oldClient.disconnect().toCompletionStage().toCompletableFuture()
                     .get(5, TimeUnit.SECONDS);
@@ -391,7 +399,11 @@ public class IotEmqxProtocol implements IotProtocol {
                 return;
             }
             log.info("[startMqttClientReconnectChecker][IoT EMQX 协议 {} 检测到断开,尝试重连]", getId());
-            tryReconnectMqttClient();
+            // 用 executeBlocking 避免阻塞 event-loop(tryReconnectMqttClient 内部有同步等待)
+            vertx.executeBlocking(() -> {
+                tryReconnectMqttClient();
+                return null;
+            });
         });
     }
 
@@ -449,7 +461,11 @@ public class IotEmqxProtocol implements IotProtocol {
                 return;
             }
             log.warn("[setupMqttClientHandlers][IoT EMQX 协议 {} 连接断开,立即尝试重连]", getId());
-            vertx.runOnContext(v -> tryReconnectMqttClient());
+            // 用 executeBlocking 避免阻塞 event-loop(tryReconnectMqttClient 内部有同步等待)
+            vertx.executeBlocking(() -> {
+                tryReconnectMqttClient();
+                return null;
+            });
         });
 
         // 2. 异常处理
@@ -497,7 +513,8 @@ public class IotEmqxProtocol implements IotProtocol {
             return;
         }
         MqttQoS qos = MqttQoS.valueOf(emqxConfig.getMqttQos());
-        mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false);
+        mqttClient.publish(topic, Buffer.buffer(payload), qos, false, false)
+                .onFailure(e -> log.error("[publishMessage][IoT EMQX 协议 {} 发布失败, topic: {}]", getId(), topic, e));
     }
 
 }