xucaiqin 1 سال پیش
والد
کامیت
01576e4279

+ 48 - 0
iot-common/iot-common-core/src/main/java/com/middle/platform/common/core/constant/RabbitConstant.java

@@ -0,0 +1,48 @@
+package com.middle.platform.common.core.constant;
+
+/**
+ * @author xucaiqin
+ * @date 2024-04-01 19:50:29
+ */
+public class RabbitConstant {
+    public static final String MESSAGE_TTL = "x-message-ttl";
+    public static final String AUTO_DELETE = "x-auto-delete";
+
+    // Sync
+    public static String TOPIC_EXCHANGE_SYNC = "iot.e.sync";
+    public static final String ROUTING_SYNC_UP_PREFIX = "iot.r.sync.up.";
+    public static String QUEUE_SYNC_UP = "iot.q.sync.up";
+    public static final String ROUTING_SYNC_DOWN_PREFIX = "iot.r.sync.down.";
+    public static String QUEUE_SYNC_DOWN_PREFIX = "iot.q.sync.down.";
+
+    // Event
+    public static String TOPIC_EXCHANGE_EVENT = "iot.e.event";
+    public static final String ROUTING_DRIVER_EVENT_PREFIX = "iot.r.event.driver.";
+    public static String QUEUE_DRIVER_EVENT = "iot.q.event.driver";
+    public static final String ROUTING_DEVICE_EVENT_PREFIX = "iot.r.event.device.";
+    public static String QUEUE_DEVICE_EVENT = "iot.q.event.device";
+
+    // Metadata
+    public static String TOPIC_EXCHANGE_METADATA = "iot.e.metadata";
+    public static final String ROUTING_DRIVER_METADATA_PREFIX = "iot.r.metadata.driver.";
+    public static String QUEUE_DRIVER_METADATA_PREFIX = "iot.q.metadata.driver.";
+
+    // Command
+    public static String TOPIC_EXCHANGE_COMMAND = "iot.e.command";
+    public static final String ROUTING_DRIVER_COMMAND_PREFIX = "iot.r.command.driver.";
+    public static String QUEUE_DRIVER_COMMAND_PREFIX = "iot.q.command.driver.";
+    public static final String ROUTING_DEVICE_COMMAND_PREFIX = "iot.r.command.device.";
+    public static String QUEUE_DEVICE_COMMAND_PREFIX = "iot.q.command.device.";
+
+    // Data
+    public static String TOPIC_EXCHANGE_DATA = "iot.e.data";
+    public static final String ROUTING_POINT_DATA_PREFIX = "iot.r.data.point.";
+    public static final String RAW_QUEUE = "iot.q.data.raw";
+    public static final String DEAL_QUEUE = "iot.q.data.deal";
+    public static final String DB_QUEUE = "iot.q.data.db";
+
+    // Mqtt
+    public static String TOPIC_EXCHANGE_MQTT = "iot.e.mqtt";
+    public static final String ROUTING_MQTT_PREFIX = "iot.r.mqtt.";
+    public static String QUEUE_MQTT = "iot.q.mqtt";
+}

+ 12 - 0
iot-common/iot-common-rabbitmq/pom.xml

@@ -11,10 +11,22 @@
 
     <artifactId>iot-common-rabbitmq</artifactId>
     <packaging>jar</packaging>
+
     <properties>
         <maven.compiler.source>17</maven.compiler.source>
         <maven.compiler.target>17</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
+    <dependencies>
+        <dependency>
+            <groupId>com.middle.platform</groupId>
+            <artifactId>iot-starter-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.middle.platform</groupId>
+            <artifactId>iot-common-core</artifactId>
+        </dependency>
+    </dependencies>
+
 </project>

+ 23 - 0
iot-common/iot-common-rabbitmq/src/main/java/com/middle/platform/common/rabbitmq/config/RabbitConfig.java

@@ -0,0 +1,23 @@
+package com.middle.platform.common.rabbitmq.config;
+
+import com.middle.platform.common.core.constant.RabbitConstant;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author xucaiqin
+ * @date 2024-04-01 19:46:34
+ */
+@Configuration
+public class RabbitConfig {
+    /**
+     * 数据处理交换机
+     *
+     * @return
+     */
+    @Bean
+    public TopicExchange dataExchange() {
+        return new TopicExchange(RabbitConstant.TOPIC_EXCHANGE_DATA, true, false);
+    }
+}

+ 1 - 0
iot-common/iot-common-rabbitmq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

@@ -0,0 +1 @@
+com.middle.platform.common.rabbitmq.config.RabbitConfig

+ 1 - 1
iot-module/iot-module-data/iot-module-data-biz/pom.xml

@@ -74,7 +74,7 @@
         </dependency>
         <dependency>
             <groupId>com.middle.platform</groupId>
-            <artifactId>iot-starter-rabbitmq</artifactId>
+            <artifactId>iot-common-rabbitmq</artifactId>
         </dependency>
     </dependencies>
     <build>

+ 73 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/config/RabbitmqConfig.java

@@ -0,0 +1,73 @@
+package com.middle.platform.data.biz.config;
+
+import com.middle.platform.common.core.constant.RabbitConstant;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author xucaiqin
+ * @date 2024-04-01 20:02:08
+ */
+@Configuration
+public class RabbitmqConfig {
+    @Bean
+    public SimpleRabbitListenerContainerFactory rabbitFactory(ConnectionFactory connectionFactory) {
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+        factory.setConnectionFactory(connectionFactory);
+        //设置批量
+        factory.setBatchListener(true);
+        factory.setConsumerBatchEnabled(true);//设置BatchMessageListener生效
+        factory.setBatchSize(500);//设置监听器一次批量处理的消息数量
+        return factory;
+    }
+    @Bean
+    public Queue rawQueue() {
+        return new Queue(RabbitConstant.RAW_QUEUE);
+    }
+
+    @Bean
+    public Queue dealQueue() {
+        return new Queue(RabbitConstant.DEAL_QUEUE);
+    }
+
+    @Bean
+    public Queue dbQueue() {
+        return new Queue(RabbitConstant.DB_QUEUE);
+    }
+
+
+    @Bean
+    public Binding dbBinding(TopicExchange dataExchange, Queue dbQueue) {
+        Binding binding = BindingBuilder
+                .bind(dbQueue)
+                .to(dataExchange)
+                .with(RabbitConstant.ROUTING_POINT_DATA_PREFIX + dbQueue.getName());
+        binding.addArgument(RabbitConstant.AUTO_DELETE, true);
+        return binding;
+    }
+    @Bean
+    public Binding dealBinding(TopicExchange dataExchange, Queue dealQueue) {
+        Binding binding = BindingBuilder
+                .bind(dealQueue)
+                .to(dataExchange)
+                .with(RabbitConstant.ROUTING_POINT_DATA_PREFIX + dealQueue.getName());
+        binding.addArgument(RabbitConstant.AUTO_DELETE, true);
+        return binding;
+    }
+    @Bean
+    public Binding rawBinding(TopicExchange dataExchange, Queue rawQueue) {
+        Binding binding = BindingBuilder
+                .bind(rawQueue)
+                .to(dataExchange)
+                .with(RabbitConstant.ROUTING_POINT_DATA_PREFIX + rawQueue.getName());
+        binding.addArgument(RabbitConstant.AUTO_DELETE, true);
+        return binding;
+    }
+
+}

+ 0 - 13
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/QueueEnum.java

@@ -1,13 +0,0 @@
-package com.middle.platform.data.biz.pojo;
-
-/**
- * @author xucaiqin
- * @date 2024-03-11 09:30:09
- */
-public class QueueEnum {
-
-    public static final String RAW_QUEUE = "rawQueue";
-    public static final String DEAL_QUEUE = "dealQueue";
-    public static final String DB_QUEUE = "dbQueue";
-
-}

+ 3 - 2
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/http/HttpService.java

@@ -1,11 +1,12 @@
 package com.middle.platform.data.biz.service.http;
 
-import com.middle.platform.data.biz.pojo.QueueEnum;
+import com.middle.platform.common.core.constant.RabbitConstant;
 import com.middle.platform.data.biz.pojo.data.RawData;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.stereotype.Component;
+
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,6 @@ public class HttpService {
      * @param data
      */
     public void dataReport(String code, String deviceSn, Object data) {
-        threadPoolExecutor.execute(() -> amqpTemplate.convertAndSend(QueueEnum.RAW_QUEUE, new RawData(code, deviceSn, data, 2)));
+        threadPoolExecutor.execute(() -> amqpTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_DATA, RabbitConstant.ROUTING_POINT_DATA_PREFIX + RabbitConstant.RAW_QUEUE, new RawData(code, deviceSn, data, 2)));
     }
 }

+ 2 - 2
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/mqtt/strategy/AttrReportStrategy.java

@@ -1,6 +1,6 @@
 package com.middle.platform.data.biz.service.mqtt.strategy;
 
-import com.middle.platform.data.biz.pojo.QueueEnum;
+import com.middle.platform.common.core.constant.RabbitConstant;
 import com.middle.platform.data.biz.pojo.data.RawData;
 import com.middle.platform.data.biz.service.mqtt.MqttTopicStrategy;
 import com.middle.platform.data.biz.utils.MqttTopicUtil;
@@ -27,6 +27,6 @@ public class AttrReportStrategy implements MqttTopicStrategy {
     public void dealMsg(String topic, Object msg) {
         String deviceSn = MqttTopicUtil.getDeviceSn(topic);
         String productKey = MqttTopicUtil.getProductKey(topic);
-        threadPoolExecutor.execute(() -> amqpTemplate.convertAndSend(QueueEnum.RAW_QUEUE, new RawData(productKey, deviceSn, msg, 2)));
+        threadPoolExecutor.execute(() -> amqpTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_DATA, RabbitConstant.ROUTING_POINT_DATA_PREFIX + RabbitConstant.RAW_QUEUE, new RawData(productKey, deviceSn, msg, 1)));
     }
 }

+ 0 - 41
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/MqQueue.java

@@ -1,41 +0,0 @@
-package com.middle.platform.data.biz.service.rabbitmq;
-
-import com.middle.platform.data.biz.pojo.QueueEnum;
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-/**
- * @author xucaiqin
- * @date 2024-03-08 16:52:42
- */
-@Component
-public class MqQueue {
-    @Bean
-    public Queue rawQueue() {
-        return new Queue(QueueEnum.RAW_QUEUE);
-    }
-
-    @Bean
-    public Queue dealQueue() {
-        return new Queue(QueueEnum.DEAL_QUEUE);
-    }
-
-    @Bean
-    public Queue dbQueue() {
-        return new Queue(QueueEnum.DB_QUEUE);
-    }
-
-    @Bean
-    public SimpleRabbitListenerContainerFactory rabbitFactory(ConnectionFactory connectionFactory) {
-        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
-        factory.setConnectionFactory(connectionFactory);
-        //设置批量
-        factory.setBatchListener(true);
-        factory.setConsumerBatchEnabled(true);//设置BatchMessageListener生效
-        factory.setBatchSize(500);//设置监听器一次批量处理的消息数量
-        return factory;
-    }
-}

+ 11 - 7
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/QueueService.java

@@ -4,7 +4,11 @@ import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.date.LocalDateTimeUtil;
 import cn.hutool.core.util.IdUtil;
-import com.middle.platform.data.biz.pojo.*;
+import com.middle.platform.common.core.constant.RabbitConstant;
+import com.middle.platform.data.biz.pojo.OriginalPara;
+import com.middle.platform.data.biz.pojo.ProductPara;
+import com.middle.platform.data.biz.pojo.PropertyData;
+import com.middle.platform.data.biz.pojo.PropertyDto;
 import com.middle.platform.data.biz.pojo.data.DbData;
 import com.middle.platform.data.biz.pojo.data.DealData;
 import com.middle.platform.data.biz.pojo.data.RawData;
@@ -38,7 +42,7 @@ public class QueueService {
     private final TaosService taosService;
 
 
-    @RabbitListener(queues = QueueEnum.RAW_QUEUE, concurrency = "4-10", containerFactory = "rabbitFactory")
+    @RabbitListener(queues = RabbitConstant.RAW_QUEUE, concurrency = "4-10", containerFactory = "rabbitFactory")
     public void rawQueue(List<RawData> list) {
         Map<String, List<RawData>> collect = list.stream().collect(Collectors.groupingBy(d -> d.getCode() + "," + d.getDeviceSn()));
         for (Map.Entry<String, List<RawData>> map : collect.entrySet()) {
@@ -59,13 +63,13 @@ public class QueueService {
                 dealData.setDeviceSn(in.getDeviceSn());
                 dealData.setData(in.getData());
                 dealData.setTime(System.currentTimeMillis());
-                amqpTemplate.convertAndSend(QueueEnum.DEAL_QUEUE, dealData);
+                amqpTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_DATA, RabbitConstant.ROUTING_POINT_DATA_PREFIX + RabbitConstant.DEAL_QUEUE, dealData);
             }
         }
     }
 
 
-    @RabbitListener(queues = QueueEnum.DEAL_QUEUE, concurrency = "6-14")
+    @RabbitListener(queues = RabbitConstant.DEAL_QUEUE, concurrency = "6-14")
     public void dealQueue(DealData in) {
         String msgId = IdUtil.fastSimpleUUID();
         cacheService.setKey(String.format(CacheConstant.ONLINE_CACHE, in.getGuid()), LocalDateTime.now());
@@ -103,21 +107,21 @@ public class QueueService {
         DbData dbData = new DbData();
         dbData.setOriginalPara(originalPara);
         dbData.setPropertyData(propertyData);
-        amqpTemplate.convertAndSend(QueueEnum.DB_QUEUE, dbData);
+        amqpTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_DATA, RabbitConstant.ROUTING_POINT_DATA_PREFIX + RabbitConstant.DB_QUEUE, dbData);
     }
 
 
-    @RabbitListener(queues = QueueEnum.DB_QUEUE, concurrency = "4-10", containerFactory = "rabbitFactory")
+    @RabbitListener(queues = RabbitConstant.DB_QUEUE, concurrency = "4-10", containerFactory = "rabbitFactory")
     public void dbQueue(List<DbData> list) {
         List<OriginalPara> originalParas = list.stream().map(DbData::getOriginalPara).collect(Collectors.toList());
         taosService.batchAddOriginal(originalParas);
+
         List<PropertyData> collect = list.stream().map(t -> {
             PropertyData propertyData = t.getPropertyData();
             if (Objects.nonNull(propertyData)) {
                 for (ProductPara datum : propertyData.getData()) {
                     datum.setReserve(String.valueOf(System.currentTimeMillis() - Long.parseLong(datum.getReserve())));
                 }
-                taosService.addDeviceData(propertyData);
             }
             return propertyData;
         }).filter(Objects::nonNull).collect(Collectors.toList());