|
|
@@ -68,7 +68,13 @@ public class IotMqttUpstreamHandler {
|
|
|
|
|
|
log.info("[handle][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username);
|
|
|
|
|
|
- // 2. 设置异常和关闭处理器
|
|
|
+ // 2. 设置心跳处理器(监听客户端的 PINGREQ 消息)
|
|
|
+ endpoint.pingHandler(v -> {
|
|
|
+ log.debug("[handle][收到客户端心跳,客户端 ID: {}]", clientId);
|
|
|
+ // Vert.x 会自动发送 PINGRESP 响应,无需手动处理
|
|
|
+ });
|
|
|
+
|
|
|
+ // 3. 设置异常和关闭处理器
|
|
|
endpoint.exceptionHandler(ex -> {
|
|
|
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, connectionManager.getEndpointAddress(endpoint));
|
|
|
cleanupConnection(endpoint);
|
|
|
@@ -77,7 +83,7 @@ public class IotMqttUpstreamHandler {
|
|
|
cleanupConnection(endpoint);
|
|
|
});
|
|
|
|
|
|
- // 3. 设置消息处理器
|
|
|
+ // 4. 设置消息处理器
|
|
|
endpoint.publishHandler(message -> {
|
|
|
try {
|
|
|
processMessage(clientId, message.topicName(), message.payload().getBytes());
|
|
|
@@ -100,7 +106,7 @@ public class IotMqttUpstreamHandler {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- // 4. 设置订阅处理器
|
|
|
+ // 5. 设置订阅处理器
|
|
|
endpoint.subscribeHandler(subscribe -> {
|
|
|
// 提取主题名称列表用于日志显示
|
|
|
List<String> topicNames = subscribe.topicSubscriptions().stream()
|
|
|
@@ -115,22 +121,22 @@ public class IotMqttUpstreamHandler {
|
|
|
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels);
|
|
|
});
|
|
|
|
|
|
- // 5. 设置取消订阅处理器
|
|
|
+ // 6. 设置取消订阅处理器
|
|
|
endpoint.unsubscribeHandler(unsubscribe -> {
|
|
|
log.debug("[handle][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics());
|
|
|
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
|
|
|
});
|
|
|
|
|
|
- // 6. 设置 QoS 2消息的 PUBREL 处理器
|
|
|
+ // 7. 设置 QoS 2消息的 PUBREL 处理器
|
|
|
endpoint.publishReleaseHandler(endpoint::publishComplete);
|
|
|
|
|
|
- // 7. 设置断开连接处理器
|
|
|
+ // 8. 设置断开连接处理器
|
|
|
endpoint.disconnectHandler(v -> {
|
|
|
log.debug("[handle][设备断开连接,客户端 ID: {}]", clientId);
|
|
|
cleanupConnection(endpoint);
|
|
|
});
|
|
|
|
|
|
- // 8. 接受连接
|
|
|
+ // 9. 接受连接
|
|
|
endpoint.accept(false);
|
|
|
}
|
|
|
|