Browse Source

引入mq队列

xucaiqin 1 năm trước cách đây
mục cha
commit
58cf94c0d9

+ 4 - 14
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/QueueEnum.java

@@ -1,23 +1,13 @@
 package com.middle.platform.data.biz.pojo;
 
-import lombok.Getter;
-
 /**
  * @author xucaiqin
  * @date 2024-03-11 09:30:09
  */
-@Getter
-public enum QueueEnum {
-
-    RAW_QUEUE("rawQueue"),
-    DEAL_QUEUE("dealQueue"),
-    DB_QUEUE("dbQueue"),
-    ;
-
-    private final String name;
+public class QueueEnum {
 
+    public static final String RAW_QUEUE = "rawQueue";
+    public static final String DEAL_QUEUE = "dealQueue";
+    public static final String DB_QUEUE = "dbQueue";
 
-    QueueEnum(String name) {
-        this.name = name;
-    }
 }

+ 16 - 3
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/MqQueue.java

@@ -2,6 +2,8 @@ package com.middle.platform.data.biz.service.rabbitmq;
 
 import com.middle.platform.data.biz.pojo.QueueEnum;
 import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.stereotype.Component;
 
@@ -13,16 +15,27 @@ import org.springframework.stereotype.Component;
 public class MqQueue {
     @Bean
     public Queue rawQueue() {
-        return new Queue(QueueEnum.RAW_QUEUE.getName());
+        return new Queue(QueueEnum.RAW_QUEUE);
     }
 
     @Bean
     public Queue dealQueue() {
-        return new Queue(QueueEnum.DEAL_QUEUE.getName());
+        return new Queue(QueueEnum.DEAL_QUEUE);
     }
 
     @Bean
     public Queue dbQueue() {
-        return new Queue(QueueEnum.DB_QUEUE.getName());
+        return new Queue(QueueEnum.DB_QUEUE);
+    }
+
+    @Bean
+    public SimpleRabbitListenerContainerFactory rabbitFactory(ConnectionFactory connectionFactory) {
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+        factory.setConnectionFactory(connectionFactory);
+        //设置批量
+        factory.setBatchListener(true);
+        factory.setConsumerBatchEnabled(true);//设置BatchMessageListener生效
+        factory.setBatchSize(500);//设置监听器一次批量处理的消息数量
+        return factory;
     }
 }

+ 41 - 33
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/QueueService.java

@@ -18,11 +18,11 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
 import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * @author xucaiqin
@@ -38,33 +38,39 @@ public class QueueService {
     private final TaosService taosService;
 
 
-    @RabbitListener(queues = "rawQueue", concurrency = "6-8")
-    public void rawQueue1(@Payload RawData in) {
-        ProductVo productVo = productAnalyse.getProduct(in.getCode(), in.getDeviceSn());
-        if (Objects.isNull(productVo)) {
-            log.warn("产品:{} 设备:{} 不存在", in.getCode(), in.getDeviceSn());
-            return;
+    @RabbitListener(queues = QueueEnum.RAW_QUEUE, concurrency = "4-10", containerFactory = "rabbitFactory")
+    public void rawQueue(List<RawData> list) {
+        Map<String, List<RawData>> collect = list.stream().collect(Collectors.groupingBy(d -> d.getCode() + "," + d.getDeviceSn()));
+        for (Map.Entry<String, List<RawData>> map : collect.entrySet()) {
+            String[] split = map.getKey().split(",");
+            List<RawData> value = map.getValue();
+            ProductVo productVo = productAnalyse.getProduct(split[0], split[1]);
+            if (Objects.isNull(productVo)) {
+                log.warn("产品:{} 设备:{} 不存在", split[0], split[1]);
+                return;
+            }
+            // todo
+            //        if (!Objects.equals(productVo.getReportProtocol(), in.getReportProtocol())) {
+            //            log.warn("产品:{} 设备:{} 上报协议未匹配", in.getCode(), in.getDeviceSn());
+            //            return;
+            //        }
+            for (RawData in : value) {
+                DealData dealData = BeanUtil.toBean(productVo, DealData.class);
+                dealData.setDeviceSn(in.getDeviceSn());
+                dealData.setData(in.getData());
+                dealData.setTime(System.currentTimeMillis());
+                amqpTemplate.convertAndSend(QueueEnum.DEAL_QUEUE, dealData);
+            }
         }
-        //todo
-//        if (!Objects.equals(productVo.getReportProtocol(), in.getReportProtocol())) {
-//            log.warn("产品:{} 设备:{} 上报协议未匹配", in.getCode(), in.getDeviceSn());
-//            return;
-//        }
-        DealData dealData = BeanUtil.toBean(productVo, DealData.class);
-        dealData.setDeviceSn(in.getDeviceSn());
-        dealData.setData(in.getData());
-        dealData.setTime(System.currentTimeMillis());
-        amqpTemplate.convertAndSend(QueueEnum.DEAL_QUEUE.getName(), dealData);
     }
 
 
-    @RabbitListener(queues = "dealQueue", concurrency = "8-10")
-    public void dealQueue1(@Payload DealData in) {
+    @RabbitListener(queues = QueueEnum.DEAL_QUEUE, concurrency = "6-14")
+    public void dealQueue(DealData in) {
         String msgId = IdUtil.fastSimpleUUID();
         cacheService.setKey(String.format(CacheConstant.ONLINE_CACHE, in.getGuid()), LocalDateTime.now());
         //1.云函数解析
         Object cloudData = productAnalyse.dataConvert(in);
-//        Object cloudData = in.getData();
         OriginalPara originalPara = new OriginalPara();
         originalPara.setCode(in.getCode());
         originalPara.setGuid(in.getGuid());
@@ -97,22 +103,24 @@ public class QueueService {
         DbData dbData = new DbData();
         dbData.setOriginalPara(originalPara);
         dbData.setPropertyData(propertyData);
-
-        amqpTemplate.convertAndSend(QueueEnum.DB_QUEUE.getName(), dbData);
+        amqpTemplate.convertAndSend(QueueEnum.DB_QUEUE, dbData);
     }
 
 
-    @RabbitListener(queues = "dbQueue", concurrency = "3-5")
-    public void dbQueue1(@Payload DbData in) {
-        OriginalPara originalPara = in.getOriginalPara();
-        PropertyData propertyData = in.getPropertyData();
-
-        taosService.addOriginal(originalPara);
-        if (Objects.nonNull(propertyData)) {
-            for (ProductPara datum : propertyData.getData()) {
-                datum.setReserve(String.valueOf(System.currentTimeMillis() - Long.parseLong(datum.getReserve())));
+    @RabbitListener(queues = QueueEnum.DB_QUEUE, concurrency = "4-10", containerFactory = "rabbitFactory")
+    public void dbQueue(List<DbData> list) {
+        List<OriginalPara> originalParas = list.stream().map(DbData::getOriginalPara).collect(Collectors.toList());
+        taosService.batchAddOriginal(originalParas);
+        List<PropertyData> collect = list.stream().map(t -> {
+            PropertyData propertyData = t.getPropertyData();
+            if (Objects.nonNull(propertyData)) {
+                for (ProductPara datum : propertyData.getData()) {
+                    datum.setReserve(String.valueOf(System.currentTimeMillis() - Long.parseLong(datum.getReserve())));
+                }
+                taosService.addDeviceData(propertyData);
             }
-            taosService.addDeviceData(propertyData);
-        }
+            return propertyData;
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+        taosService.batchAddDeviceData(collect);
     }
 }