Переглянути джерело

feat:【IoT 物联网】增加 redis + event-bus 的实现(增加 job 清理能力)

YunaiV 1 рік тому
батько
коміт
05ac902dc9

+ 6 - 0
yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java

@@ -53,6 +53,12 @@ public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedis
         this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
     }
 
+    protected AbstractRedisStreamMessageListener(String streamKey, String group) {
+        this.messageType = null;
+        this.streamKey = streamKey;
+        this.group = group;
+    }
+
     @Override
     public void onMessage(ObjectRecord<String, String> message) {
         // 消费消息

+ 26 - 5
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java

@@ -1,11 +1,12 @@
 package cn.iocoder.yudao.module.iot.mq.consumer.rule;
 
+import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
 import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
 import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService;
+import jakarta.annotation.PostConstruct;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.event.EventListener;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 // TODO @puhui999:后面重构哈
@@ -16,14 +17,34 @@ import org.springframework.stereotype.Component;
  */
 @Component
 @Slf4j
-public class IotRuleSceneMessageHandler {
+public class IotRuleSceneMessageHandler implements IotMessageSubscriber<IotDeviceMessage> {
 
     @Resource
     private IotRuleSceneService ruleSceneService;
 
-    @EventListener
-    @Async
+    @Resource
+    private IotMessageBus messageBus;
+
+    @PostConstruct
+    public void init() {
+        messageBus.register(this);
+    }
+
+    @Override
+    public String getTopic() {
+        return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
+    }
+
+    @Override
+    public String getGroup() {
+        return "iot_rule_consumer";
+    }
+
+    @Override
     public void onMessage(IotDeviceMessage message) {
+        if (true) {
+            return;
+        }
         log.info("[onMessage][消息内容({})]", message);
         ruleSceneService.executeRuleSceneByDevice(message);
     }

+ 52 - 4
yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java

@@ -1,5 +1,10 @@
 package cn.iocoder.yudao.module.iot.core.messagebus.config;
 
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
+import cn.iocoder.yudao.framework.mq.redis.core.job.RedisStreamMessageCleanupJob;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
 import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
 import cn.iocoder.yudao.module.iot.core.messagebus.core.local.IotLocalMessageBus;
 import cn.iocoder.yudao.module.iot.core.messagebus.core.redis.IotRedisMessageBus;
@@ -8,6 +13,7 @@ import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.redisson.api.RedissonClient;
 import org.springframework.boot.autoconfigure.AutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -18,6 +24,10 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.core.StringRedisTemplate;
 
+import java.util.List;
+
+import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
+
 /**
  * IoT 消息总线自动配置
  *
@@ -40,7 +50,7 @@ public class IotMessageBusAutoConfiguration {
     public static class IotLocalMessageBusConfiguration {
 
         @Bean
-        public IotMessageBus iotLocalMessageBus(ApplicationContext applicationContext) {
+        public IotLocalMessageBus iotLocalMessageBus(ApplicationContext applicationContext) {
             log.info("[iotLocalMessageBus][创建 IoT Local 消息总线]");
             return new IotLocalMessageBus(applicationContext);
         }
@@ -55,8 +65,8 @@ public class IotMessageBusAutoConfiguration {
     public static class IotRocketMQMessageBusConfiguration {
 
         @Bean
-        public IotMessageBus iotRocketMQMessageBus(RocketMQProperties rocketMQProperties,
-                                                   RocketMQTemplate rocketMQTemplate) {
+        public IotRocketMQMessageBus iotRocketMQMessageBus(RocketMQProperties rocketMQProperties,
+                                                           RocketMQTemplate rocketMQTemplate) {
             log.info("[iotRocketMQMessageBus][创建 IoT RocketMQ 消息总线]");
             return new IotRocketMQMessageBus(rocketMQProperties, rocketMQTemplate);
         }
@@ -65,17 +75,55 @@ public class IotMessageBusAutoConfiguration {
 
     // ==================== Redis 实现 ====================
 
+    /**
+     * 特殊:由于 YudaoRedisMQConsumerAutoConfiguration 关于 Redis stream 的消费是动态注册,所以这里只能拷贝相关的逻辑!!!
+     *
+     * @see cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration
+     */
     @Configuration
     @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "redis")
     @ConditionalOnClass(RedisTemplate.class)
     public static class IotRedisMessageBusConfiguration {
 
         @Bean
-        public IotMessageBus iotRedisMessageBus(StringRedisTemplate redisTemplate) {
+        public IotRedisMessageBus iotRedisMessageBus(StringRedisTemplate redisTemplate) {
             log.info("[iotRedisMessageBus][创建 IoT Redis 消息总线]");
             return new IotRedisMessageBus(redisTemplate);
         }
 
+        /**
+         * 创建 Redis Stream 重新消费的任务
+         */
+        @Bean
+        public RedisPendingMessageResendJob iotRedisPendingMessageResendJob(IotRedisMessageBus messageBus,
+                                                                            RedisMQTemplate redisTemplate,
+                                                                            RedissonClient redissonClient) {
+            List<AbstractRedisStreamMessageListener<?>> listeners = getListeners(messageBus);
+            return new RedisPendingMessageResendJob(listeners, redisTemplate, redissonClient);
+        }
+
+        /**
+         * 创建 Redis Stream 消息清理任务
+         */
+        @Bean
+        public RedisStreamMessageCleanupJob iotRedisStreamMessageCleanupJob(IotRedisMessageBus messageBus,
+                                                                            RedisMQTemplate redisTemplate,
+                                                                            RedissonClient redissonClient) {
+            List<AbstractRedisStreamMessageListener<?>> listeners = getListeners(messageBus);
+            return new RedisStreamMessageCleanupJob(listeners, redisTemplate, redissonClient);
+        }
+
+        private List<AbstractRedisStreamMessageListener<?>> getListeners(IotRedisMessageBus messageBus) {
+            return convertList(messageBus.getSubscribers(), subscriber ->
+                    new AbstractRedisStreamMessageListener<>(subscriber.getTopic(), subscriber.getGroup()) {
+
+                        @Override
+                        public void onMessage(AbstractRedisStreamMessage message) {
+                            throw new UnsupportedOperationException("不应该调用!!!");
+                        }
+                    });
+        }
+
     }
 
 }

+ 7 - 0
yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/redis/IotRedisMessageBus.java

@@ -6,12 +6,15 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
 import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.redis.connection.stream.*;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
 
 import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
 
 import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.buildConsumerName;
 import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.checkRedisVersion;
@@ -28,6 +31,9 @@ public class IotRedisMessageBus implements IotMessageBus {
 
     private final StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer;
 
+    @Getter
+    private final List<IotMessageSubscriber<?>> subscribers = new ArrayList<>();
+
     public IotRedisMessageBus(RedisTemplate<String, ?> redisTemplate) {
         this.redisTemplate = redisTemplate;
         checkRedisVersion(redisTemplate);
@@ -87,6 +93,7 @@ public class IotRedisMessageBus implements IotMessageBus {
             // ack 消息消费完成
             redisTemplate.opsForStream().acknowledge(subscriber.getGroup(), message);
         });
+        this.subscribers.add(subscriber);
     }
 
 }