|
|
@@ -3,13 +3,15 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
|
|
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
|
|
|
-import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO;
|
|
|
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
|
|
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
|
|
|
-import jakarta.annotation.Resource;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
+
|
|
|
/**
|
|
|
* WebSocket 的 {@link IotDataRuleAction} 实现类
|
|
|
* <p>
|
|
|
@@ -24,8 +26,17 @@ import org.springframework.stereotype.Component;
|
|
|
public class IotWebSocketDataRuleAction extends
|
|
|
IotDataRuleCacheableAction<IotDataSinkWebSocketConfig, IotWebSocketClient> {
|
|
|
|
|
|
- @Resource
|
|
|
- private IotWebSocketLockRedisDAO webSocketLockRedisDAO;
|
|
|
+ /**
|
|
|
+ * 锁等待超时时间(毫秒)
|
|
|
+ */
|
|
|
+ private static final long LOCK_WAIT_TIME_MS = 5000;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 重连锁,key 为 WebSocket 服务器地址
|
|
|
+ * <p>
|
|
|
+ * WebSocket 连接是与特定服务器实例绑定的,使用单机锁即可保证重连的线程安全
|
|
|
+ */
|
|
|
+ private final ConcurrentHashMap<String, ReentrantLock> reconnectLocks = new ConcurrentHashMap<>();
|
|
|
|
|
|
@Override
|
|
|
public Integer getType() {
|
|
|
@@ -87,23 +98,32 @@ public class IotWebSocketDataRuleAction extends
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 使用分布式锁进行重连
|
|
|
+ * 使用锁进行重连,保证同一服务器地址的重连操作线程安全
|
|
|
*
|
|
|
* @param webSocketClient WebSocket 客户端
|
|
|
* @param config 配置信息
|
|
|
*/
|
|
|
private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception {
|
|
|
- webSocketLockRedisDAO.lock(config.getServerUrl(), () -> {
|
|
|
+ ReentrantLock lock = reconnectLocks.computeIfAbsent(config.getServerUrl(), k -> new ReentrantLock());
|
|
|
+ boolean acquired = false;
|
|
|
+ try {
|
|
|
+ acquired = lock.tryLock(LOCK_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
|
|
|
+ if (!acquired) {
|
|
|
+ throw new RuntimeException("获取 WebSocket 重连锁超时,服务器: " + config.getServerUrl());
|
|
|
+ }
|
|
|
// 双重检查:获取锁后再次检查连接状态,避免重复连接
|
|
|
if (!webSocketClient.isConnected()) {
|
|
|
log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
|
|
|
- try {
|
|
|
- webSocketClient.connect();
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e);
|
|
|
- }
|
|
|
+ webSocketClient.connect();
|
|
|
}
|
|
|
- });
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new RuntimeException("获取 WebSocket 重连锁被中断,服务器: " + config.getServerUrl(), e);
|
|
|
+ } finally {
|
|
|
+ if (acquired && lock.isHeldByCurrentThread()) {
|
|
|
+ lock.unlock();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
}
|