Sfoglia il codice sorgente

引入rabbitmq,修改逻辑

xucaiqin 1 anno fa
parent
commit
7f3b1e02ea
18 ha cambiato i file con 451 aggiunte e 127 eliminazioni
  1. 9 7
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/mapper/TaosMapper.java
  2. 5 1
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/OriginalPara.java
  3. 8 4
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/ProductPara.java
  4. 22 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/PropertyData.java
  5. 23 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/QueueEnum.java
  6. 24 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/data/DbData.java
  7. 40 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/data/DealData.java
  8. 29 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/data/RawData.java
  9. 73 61
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/DataAnalyseService.java
  10. 9 10
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/ProductAnalyse.java
  11. 26 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/TaosService.java
  12. 5 18
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/http/HttpService.java
  13. 1 22
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/mqtt/strategy/AttrReportStrategy.java
  14. 28 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/MqQueue.java
  15. 118 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/QueueService.java
  16. 4 0
      iot-module/iot-module-data/iot-module-data-biz/src/main/resources/bootstrap-stress.yaml
  17. 20 4
      iot-module/iot-module-data/iot-module-data-biz/src/main/resources/logback-spring.xml
  18. 7 0
      iot-module/iot-module-manage/iot-module-manage-api/src/main/java/com/middle/platform/manage/api/pojo/ProductVo.java

+ 9 - 7
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/mapper/TaosMapper.java

@@ -3,6 +3,7 @@ package com.middle.platform.data.biz.mapper;
 import com.baomidou.dynamic.datasource.annotation.DS;
 import com.middle.platform.data.biz.pojo.OriginalPara;
 import com.middle.platform.data.biz.pojo.ProductPara;
+import com.middle.platform.data.biz.pojo.PropertyData;
 import com.middle.platform.data.biz.pojo.PropertyVo;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
@@ -39,15 +40,16 @@ public interface TaosMapper {
      *
      * @param productPara
      */
-    void insert(ProductPara productPara);
+    void insertDevice(ProductPara productPara);
 
     /**
      * 批量写入设备表数据
      *
-     * @param collect
+     * @param data
      * @return
      */
-    int batchInsert(@Param("list") List<ProductPara> collect, @Param("guid") String guid, @Param("code") String code);
+    int batchDeviceInsert( @Param("data") PropertyData data);
+    int batchDeviceInsertMore(@Param("list") List<PropertyData> list);
 
 
     /*原始数据*/
@@ -77,16 +79,16 @@ public interface TaosMapper {
     int insertOriginal(OriginalPara originalPara);
 
     /**
-     * 批量写入设备表数据
+     * 批量写入原始表数据
      *
-     * @param collect
+     * @param list
      * @return
      */
-    int batchInsertOriginal(@Param("list") List<OriginalPara> collect, @Param("guid") String guid, @Param("code") String code);
+    int batchInsertOriginal(@Param("list") List<OriginalPara> list);
 
 
     /*查询设备数据*/
-    List<PropertyVo> queryRaw(@Param("code") String code, @Param("guid") String guid,  @Param("startTime") Date time, @Param("endTime") Date endTime);
+    List<PropertyVo> queryRaw(@Param("code") String code, @Param("guid") String guid, @Param("startTime") Date time, @Param("endTime") Date endTime);
 
     List<PropertyVo> query(@Param("code") String code, @Param("guid") String guid, @Param("line") String line, @Param("startTime") Date time, @Param("endTime") Date endTime);
 

+ 5 - 1
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/OriginalPara.java

@@ -3,6 +3,8 @@ package com.middle.platform.data.biz.pojo;
 import lombok.Getter;
 import lombok.Setter;
 
+import java.io.Serial;
+import java.io.Serializable;
 import java.util.Date;
 
 /**
@@ -11,7 +13,9 @@ import java.util.Date;
  */
 @Getter
 @Setter
-public class OriginalPara {
+public class OriginalPara implements Serializable {
+    @Serial
+    private static final long serialVersionUID = 902275090169499401L;
     private String code;
     private String guid;
     /**

+ 8 - 4
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/ProductPara.java

@@ -3,6 +3,8 @@ package com.middle.platform.data.biz.pojo;
 import lombok.Getter;
 import lombok.Setter;
 
+import java.io.Serial;
+import java.io.Serializable;
 import java.util.Date;
 
 /**
@@ -11,10 +13,12 @@ import java.util.Date;
  */
 @Getter
 @Setter
-public class ProductPara {
-    private String code;
-
-    private String guid;
+public class ProductPara implements Serializable {
+    @Serial
+    private static final long serialVersionUID = 1051383714573387651L;
+//    private String code;
+//
+//    private String guid;
     /**
      * 字段
      */

+ 22 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/PropertyData.java

@@ -0,0 +1,22 @@
+package com.middle.platform.data.biz.pojo;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @author xucaiqin
+ * @date 2023-12-20 09:26:56
+ */
+@Getter
+@Setter
+public class PropertyData implements Serializable {
+    @Serial
+    private static final long serialVersionUID = -1429416702031856607L;
+    //表后缀
+    private String table;
+    private List<ProductPara> data;
+}

+ 23 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/QueueEnum.java

@@ -0,0 +1,23 @@
+package com.middle.platform.data.biz.pojo;
+
+import lombok.Getter;
+
+/**
+ * @author xucaiqin
+ * @date 2024-03-11 09:30:09
+ */
+@Getter
+public enum QueueEnum {
+
+    RAW_QUEUE("rawQueue"),
+    DEAL_QUEUE("dealQueue"),
+    DB_QUEUE("dbQueue"),
+    ;
+
+    private final String name;
+
+
+    QueueEnum(String name) {
+        this.name = name;
+    }
+}

+ 24 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/data/DbData.java

@@ -0,0 +1,24 @@
+package com.middle.platform.data.biz.pojo.data;
+
+import com.middle.platform.data.biz.pojo.OriginalPara;
+import com.middle.platform.data.biz.pojo.PropertyData;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+/**
+ * 准备写入db的队列消息
+ *
+ * @author xucaiqin
+ * @date 2024-03-11 09:06:20
+ */
+@Getter
+@Setter
+public class DbData implements Serializable {
+    @Serial
+    private static final long serialVersionUID = -1094782880847966298L;
+    private OriginalPara originalPara;
+    private PropertyData propertyData;
+}

+ 40 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/data/DealData.java

@@ -0,0 +1,40 @@
+package com.middle.platform.data.biz.pojo.data;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+/**
+ * @author xucaiqin
+ * @date 2024-03-11 09:06:20
+ */
+@Getter
+@Setter
+public class DealData implements Serializable {
+    @Serial
+    private static final long serialVersionUID = -1098782880847966298L;
+    private String code;
+    private String deviceSn;
+    /**
+     * 设备guid
+     */
+    private String guid;
+    /**
+     * 产品id
+     */
+    private Long id;
+    /**
+     * 数据格式 1-json 2-xml 3-base64 4-hex 5-text
+     */
+    private Integer dataFormat;
+    /**
+     * 上报协议 1-mqtt 2-http 3-coap 4-tcp 5-udp
+     */
+    private Integer reportProtocol;
+
+    private Object data;
+    private Long time;
+
+}

+ 29 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/data/RawData.java

@@ -0,0 +1,29 @@
+package com.middle.platform.data.biz.pojo.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.io.Serial;
+import java.io.Serializable;
+
+/**
+ * @author xucaiqin
+ * @date 2024-03-11 09:01:11
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class RawData implements Serializable {
+    @Serial
+    private static final long serialVersionUID = -5026845424562685986L;
+    private String code;
+    private String deviceSn;
+    private Object data;
+    /**
+     * 数据上报方式
+     */
+    private Integer reportProtocol;
+}

+ 73 - 61
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/DataAnalyseService.java

@@ -1,21 +1,15 @@
 package com.middle.platform.data.biz.service;
 
-import cn.hutool.core.collection.CollUtil;
-import cn.hutool.core.date.LocalDateTimeUtil;
 import com.middle.platform.data.biz.mapper.TaosMapper;
-import com.middle.platform.data.biz.pojo.OriginalPara;
-import com.middle.platform.data.biz.pojo.ProductPara;
-import com.middle.platform.data.biz.pojo.PropertyDto;
-import com.middle.platform.data.biz.utils.TsUtil;
 import com.middle.platform.manage.api.pojo.ProductVo;
-import com.middle.platform.redis.constant.CacheConstant;
 import com.middle.platform.redis.service.CacheService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
-import java.time.LocalDateTime;
-import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author xucaiqin
@@ -28,6 +22,10 @@ public class DataAnalyseService {
     private final TaosMapper taosMapper;
     private final CacheService cacheService;
     private final ProductAnalyse productAnalyse;
+    private final TaosService taosService;
+
+    private final ThreadPoolExecutor threadPoolExecutor =
+            new ThreadPoolExecutor(7, 400, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));
 
     /**
      * 源数据处理
@@ -36,20 +34,27 @@ public class DataAnalyseService {
      * @param payload
      */
     public void rawData(String msgId, ProductVo productVo, Object payload) {
-        cacheService.setKey(String.format(CacheConstant.ONLINE_CACHE, productVo.getGuid()), LocalDateTime.now());
-        Object o = cacheService.getKey(String.format(CacheConstant.TD_ORG_CACHE, productVo.getCode(), productVo.getGuid()));
-        if (Objects.isNull(o)) {
-            taosMapper.createOriginalTable(productVo.getGuid(), productVo.getCode());
-            cacheService.setKey(String.format(CacheConstant.TD_ORG_CACHE, productVo.getCode(), productVo.getGuid()), true);
-        }
-        OriginalPara originalPara = new OriginalPara();
-        originalPara.setCode(productVo.getCode());
-        originalPara.setGuid(productVo.getGuid());
-        originalPara.setRawStr(payload.toString());
-        originalPara.setTs(new Date());
-        originalPara.setMsgId(msgId);
-
-        taosMapper.insertOriginal(originalPara);
+//        threadPoolExecutor.execute(() -> {
+////            StopWatch stopWatch = new StopWatch();
+////            stopWatch.start("源数据表判断");
+//            cacheService.setKey(String.format(CacheConstant.ONLINE_CACHE, productVo.getGuid()), LocalDateTime.now());
+//            Object o = cacheService.getKey(String.format(CacheConstant.TD_ORG_CACHE, productVo.getCode(), productVo.getGuid()));
+//            if (Objects.isNull(o)) {
+//                taosMapper.createOriginalTable(productVo.getGuid(), productVo.getCode());
+//                cacheService.setKey(String.format(CacheConstant.TD_ORG_CACHE, productVo.getCode(), productVo.getGuid()), true);
+//            }
+////            stopWatch.stop();
+////            stopWatch.start("源数据表写入");
+//            OriginalPara originalPara = new OriginalPara();
+//            originalPara.setCode(productVo.getCode());
+//            originalPara.setGuid(productVo.getGuid());
+//            originalPara.setRawStr(payload.toString());
+//            originalPara.setTs(new Date());
+//            originalPara.setMsgId(msgId);
+//            taosService.addRaw(originalPara);
+////            stopWatch.stop();
+////            log.info("源数据耗时:{}", stopWatch.prettyPrint());
+//        });
     }
 
     /**
@@ -59,45 +64,52 @@ public class DataAnalyseService {
      * @param productVo
      */
     public void analyzeData(String msgId, ProductVo productVo, Object data) {
-        log.info("源数据:{}", data);
-        //1.云函数解析
-        Object cloudData = productAnalyse.dataConvert(productVo, data);
-        log.info("云函数解析:{}", cloudData);
-        //2.物模型解析
-        Map<String, PropertyDto> modData = productAnalyse.modConvert(productVo, cloudData);
-        log.info("物模型解析:{}", modData);
-        //判断是否创建子表
-        tableCheck(productVo);
-        //数据写入
-        ProductPara productPara;
-        List<ProductPara> list = new ArrayList<>();
-        if(CollUtil.isNotEmpty(modData)){
-            for (Map.Entry<String, PropertyDto> map : modData.entrySet()) {
-                map.getValue().setTime(LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
-                productPara = new ProductPara();
-                productPara.setCode(productVo.getCode());
-                productPara.setGuid(productVo.getGuid());
-                productPara.setLine(map.getKey());
-                productPara.setReserve("");
-                productPara.setVal(map.getValue().getVal());
-                productPara.setMsgId(msgId);
-                productPara.setTs(TsUtil.getAndIncrement());
-                productPara.setOriginTime(new Date());//todo 有则获取
-                list.add(productPara);
-                cacheService.setHashKey(String.format(CacheConstant.D_DATA_CACHE, productVo.getGuid()), map.getKey(), map.getValue());
-                if (CollUtil.isNotEmpty(list)) {
-                    taosMapper.batchInsert(list, productVo.getGuid(), productVo.getCode());
-                }
-            }
-        }
+//        log.info("源数据:{}", data);
+//        threadPoolExecutor.execute(() -> {
+////            StopWatch stopWatch = new StopWatch();
+////            stopWatch.start("云函数解析");
+//            //1.云函数解析
+////            Object cloudData = productAnalyse.dataConvert(productVo);
+////            log.info("云函数解析:{}", cloudData);
+//            //stopWatch.stop();
+////            2.物模型解析
+////            stopWatch.start("物模型解析");
+////            Map<String, PropertyDto> modData = productAnalyse.modConvert(productVo, data);
+////            log.info("物模型解析:{}", modData);
+////            stopWatch.stop();
+//            //判断是否创建子表
+////            stopWatch.start("子表判断");
+//            tableCheck(productVo);
+////            stopWatch.stop();
+//            //数据写入
+////            stopWatch.start("数据写入");
+//            PropertyData propertyData = new PropertyData();
+//            ProductPara productPara;
+//            List<ProductPara> list = new ArrayList<>();
+//            if (CollUtil.isNotEmpty(modData)) {
+//                for (Map.Entry<String, PropertyDto> map : modData.entrySet()) {
+//                    map.getValue().setTime(LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
+//                    productPara = new ProductPara();
+//                    propertyData.setTable(productVo.getCode() + "_" + productVo.getGuid());
+////                    productPara.setCode(productVo.getCode());
+////                    productPara.setGuid(productVo.getGuid());
+//                    productPara.setLine(map.getKey());
+//                    productPara.setReserve("");
+//                    productPara.setVal(map.getValue().getVal());
+//                    productPara.setMsgId(msgId);
+//                    productPara.setTs(TsUtil.getAndIncrement());
+//                    productPara.setOriginTime(new Date());//todo 有则获取
+//                    list.add(productPara);
+//                    cacheService.setHashKey(String.format(CacheConstant.D_DATA_CACHE, productVo.getGuid()), map.getKey(), map.getValue());
+//                }
+//                propertyData.setData(list);
+//                taosService.addData(propertyData);
+//            }
+////            stopWatch.stop();
+////            log.info("设备数据:{}", stopWatch.prettyPrint());
+//        });
     }
 
-    private void tableCheck(ProductVo productVo) {
-        Object o = cacheService.getKey(String.format(CacheConstant.TD_CACHE, productVo.getCode(), productVo.getGuid()));
-        if (Objects.isNull(o)) {
-            taosMapper.createDeviceTable(productVo.getGuid(), productVo.getCode());
-            cacheService.setKey(String.format(CacheConstant.TD_CACHE, productVo.getCode(), productVo.getGuid()), true);
-        }
-    }
+
 
 }

+ 9 - 10
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/ProductAnalyse.java

@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.middle.platform.common.utils.OkHttpUtils;
 import com.middle.platform.data.biz.pojo.CloudDto;
 import com.middle.platform.data.biz.pojo.PropertyDto;
+import com.middle.platform.data.biz.pojo.data.DealData;
 import com.middle.platform.data.biz.pojo.mod.ModDto;
 import com.middle.platform.data.biz.pojo.mod.Property;
 import com.middle.platform.manage.api.enums.DataFormatConstant;
@@ -67,20 +68,19 @@ public class ProductAnalyse {
      * 云函数转换数据
      *
      * @param productVo 产品
-     * @param payload   上报数据
      * @return 云函数转换后的数据
      */
-    public Object dataConvert(ProductVo productVo, Object payload) {
+    public Object dataConvert(DealData productVo) {
         IotCloudVo iotCloudVo = cloudApi.queryCloud(productVo.getId());
         if (Objects.isNull(iotCloudVo)) {
             log.warn("未找到云函数");
-            return objStrToJSON(payload);
+            return objStrToJSON(productVo.getData());
         }
         //通过云函数转换
-        if (productVo.getDataFormat().equals(DataFormatConstant.json)) {
+        if (Objects.equals(DataFormatConstant.json, productVo.getDataFormat())) {
             try {
                 JSONArray objects = new JSONArray();
-                objects.add(objStrToJSON(payload));//保证传递过去的数据类型是json而非json字符串
+                objects.add(objStrToJSON(productVo.getData()));//保证传递过去的数据类型是json而非json字符串
                 String sync = OkHttpUtils.builder().url(cloud + "/cloud/" + productVo.getId() + ".js").postRawJson(objects).sync();
                 CloudDto cloudDto = JSONObject.parseObject(sync, CloudDto.class);
                 if (Objects.nonNull(cloudDto) && cloudDto.getStatus()) {
@@ -90,18 +90,17 @@ public class ProductAnalyse {
                 log.error("云函数请求异常:{}", e.getMessage(), e);
             }
         }
-        return objStrToJSON(payload);
+        return objStrToJSON(productVo.getData());
     }
 
     /**
      * 物模型转换
      *
-     * @param payload 云函数解析后的数据
      * @return 解析为物模型对应属性数据
      */
-    public Map<String, PropertyDto> modConvert(ProductVo productVo, Object payload) {
-        log.info("物模型数据转换:{}", payload);
-        String mod = modApi.queryMod(productVo.getId());
+    public Map<String, PropertyDto> modConvert(DealData dealData) {
+        Object payload = dealData.getData();
+        String mod = modApi.queryMod(dealData.getId());
         if (StringUtils.isNotBlank(mod)) {
             ModDto modDto = JSONObject.parseObject(mod, ModDto.class);
             return Optional.ofNullable(modDto)

+ 26 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/TaosService.java

@@ -0,0 +1,26 @@
+package com.middle.platform.data.biz.service;
+
+import com.middle.platform.data.biz.mapper.TaosMapper;
+import com.middle.platform.data.biz.pojo.OriginalPara;
+import com.middle.platform.data.biz.pojo.PropertyData;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author xucaiqin
+ * @date 2023-12-26 10:14:27
+ */
+@Service
+@RequiredArgsConstructor
+public class TaosService {
+    private final TaosMapper taosMapper;
+
+    public void addOriginal(OriginalPara originalPara) {
+        taosMapper.insertOriginal(originalPara);
+    }
+
+    public void addDeviceData(PropertyData originalPara) {
+        taosMapper.batchDeviceInsert(originalPara);
+    }
+
+}

+ 5 - 18
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/http/HttpService.java

@@ -1,15 +1,12 @@
 package com.middle.platform.data.biz.service.http;
 
-import cn.hutool.core.util.IdUtil;
-import com.middle.platform.data.biz.service.DataAnalyseService;
-import com.middle.platform.data.biz.service.ProductAnalyse;
-import com.middle.platform.manage.api.pojo.ProductVo;
+import com.middle.platform.data.biz.pojo.QueueEnum;
+import com.middle.platform.data.biz.pojo.data.RawData;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.stereotype.Component;
 
-import java.util.Objects;
-
 /**
  * @author xucaiqin
  * @date 2023-12-25 17:26:23
@@ -18,8 +15,7 @@ import java.util.Objects;
 @RequiredArgsConstructor
 @Slf4j
 public class HttpService {
-    private final ProductAnalyse productAnalyse;
-    private final DataAnalyseService dataAnalyseService;
+    private final AmqpTemplate amqpTemplate;
 
     /**
      * 数据上报
@@ -29,15 +25,6 @@ public class HttpService {
      * @param data
      */
     public void dataReport(String code, String deviceSn, Object data) {
-        ProductVo productVo = productAnalyse.getProduct(code, deviceSn);
-        if (Objects.isNull(productVo)) {
-            log.warn("产品:{} 设备:{} 不存在", code, deviceSn);
-            return;
-        }
-        String msgId = IdUtil.fastSimpleUUID();
-        //1.源数据处理
-        dataAnalyseService.rawData(msgId, productVo, data);
-        //2.存储解析后的数据
-        dataAnalyseService.analyzeData(msgId, productVo, data);
+        amqpTemplate.convertAndSend(QueueEnum.RAW_QUEUE.getName(), new RawData(code, deviceSn, data, 2));
     }
 }

+ 1 - 22
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/mqtt/strategy/AttrReportStrategy.java

@@ -1,17 +1,10 @@
 package com.middle.platform.data.biz.service.mqtt.strategy;
 
-import cn.hutool.core.util.IdUtil;
-import com.middle.platform.data.biz.service.DataAnalyseService;
-import com.middle.platform.data.biz.service.ProductAnalyse;
 import com.middle.platform.data.biz.service.mqtt.MqttTopicStrategy;
-import com.middle.platform.data.biz.utils.MqttTopicUtil;
-import com.middle.platform.manage.api.pojo.ProductVo;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
-import java.util.Objects;
-
 /**
  * 属性上报
  */
@@ -19,24 +12,10 @@ import java.util.Objects;
 @Slf4j
 @RequiredArgsConstructor
 public class AttrReportStrategy implements MqttTopicStrategy {
-    private final ProductAnalyse productAnalyse;
-    private final DataAnalyseService dataAnalyseService;
 
     @Override
     public void dealMsg(String topic, Object msg) {
         log.info("属性[{}]上报数据:{}", topic, msg);
-        String deviceSn = MqttTopicUtil.getDeviceSn(topic);
-        String productKey = MqttTopicUtil.getProductKey(topic);
-        //获取产品信息
-        ProductVo productVo = productAnalyse.getProduct(productKey, deviceSn);
-        if (Objects.isNull(productVo)) {
-            log.warn("产品:{} 设备:{} 不存在", productKey, deviceSn);
-            return;
-        }
-        String msgId = IdUtil.fastSimpleUUID();
-        //1.源数据处理
-        dataAnalyseService.rawData(msgId, productVo, msg);
-        //2.存储解析后的数据
-        dataAnalyseService.analyzeData(msgId, productVo, msg);
+
     }
 }

+ 28 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/MqQueue.java

@@ -0,0 +1,28 @@
+package com.middle.platform.data.biz.service.rabbitmq;
+
+import com.middle.platform.data.biz.pojo.QueueEnum;
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xucaiqin
+ * @date 2024-03-08 16:52:42
+ */
+@Component
+public class MqQueue {
+    @Bean
+    public Queue rawQueue() {
+        return new Queue(QueueEnum.RAW_QUEUE.getName());
+    }
+
+    @Bean
+    public Queue dealQueue() {
+        return new Queue(QueueEnum.DEAL_QUEUE.getName());
+    }
+
+    @Bean
+    public Queue dbQueue() {
+        return new Queue(QueueEnum.DB_QUEUE.getName());
+    }
+}

+ 118 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/rabbitmq/QueueService.java

@@ -0,0 +1,118 @@
+package com.middle.platform.data.biz.service.rabbitmq;
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.date.LocalDateTimeUtil;
+import cn.hutool.core.util.IdUtil;
+import com.middle.platform.data.biz.pojo.*;
+import com.middle.platform.data.biz.pojo.data.DbData;
+import com.middle.platform.data.biz.pojo.data.DealData;
+import com.middle.platform.data.biz.pojo.data.RawData;
+import com.middle.platform.data.biz.service.ProductAnalyse;
+import com.middle.platform.data.biz.service.TaosService;
+import com.middle.platform.data.biz.utils.TsUtil;
+import com.middle.platform.manage.api.pojo.ProductVo;
+import com.middle.platform.redis.constant.CacheConstant;
+import com.middle.platform.redis.service.CacheService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.util.*;
+
+/**
+ * @author xucaiqin
+ * @date 2024-03-08 17:35:57
+ */
+@Service
+@AllArgsConstructor
+@Slf4j
+public class QueueService {
+    private final ProductAnalyse productAnalyse;
+    private final AmqpTemplate amqpTemplate;
+    private final CacheService cacheService;
+    private final TaosService taosService;
+
+
+    @RabbitListener(queues = "rawQueue", concurrency = "6-8")
+    public void rawQueue1(@Payload RawData in) {
+        ProductVo productVo = productAnalyse.getProduct(in.getCode(), in.getDeviceSn());
+        if (Objects.isNull(productVo)) {
+            log.warn("产品:{} 设备:{} 不存在", in.getCode(), in.getDeviceSn());
+            return;
+        }
+        //todo
+//        if (!Objects.equals(productVo.getReportProtocol(), in.getReportProtocol())) {
+//            log.warn("产品:{} 设备:{} 上报协议未匹配", in.getCode(), in.getDeviceSn());
+//            return;
+//        }
+        DealData dealData = BeanUtil.toBean(productVo, DealData.class);
+        dealData.setDeviceSn(in.getDeviceSn());
+        dealData.setData(in.getData());
+        dealData.setTime(System.currentTimeMillis());
+        amqpTemplate.convertAndSend(QueueEnum.DEAL_QUEUE.getName(), dealData);
+    }
+
+
+    @RabbitListener(queues = "dealQueue", concurrency = "8-10")
+    public void dealQueue1(@Payload DealData in) {
+        String msgId = IdUtil.fastSimpleUUID();
+        cacheService.setKey(String.format(CacheConstant.ONLINE_CACHE, in.getGuid()), LocalDateTime.now());
+        //1.云函数解析
+        Object cloudData = productAnalyse.dataConvert(in);
+//        Object cloudData = in.getData();
+        OriginalPara originalPara = new OriginalPara();
+        originalPara.setCode(in.getCode());
+        originalPara.setGuid(in.getGuid());
+        originalPara.setRawStr(in.getData().toString());
+        originalPara.setTs(new Date());
+        originalPara.setMsgId(msgId);
+        in.setData(cloudData);
+        // 2.物模型解析
+        Map<String, PropertyDto> modData = productAnalyse.modConvert(in);
+        List<ProductPara> list = new ArrayList<>();
+        PropertyData propertyData = null;
+        if (CollUtil.isNotEmpty(modData)) {
+            ProductPara productPara;
+            propertyData = new PropertyData();
+            for (Map.Entry<String, PropertyDto> map : modData.entrySet()) {
+                map.getValue().setTime(LocalDateTimeUtil.formatNormal(LocalDateTime.now()));
+                productPara = new ProductPara();
+                propertyData.setTable(in.getCode() + "_" + in.getGuid());
+                productPara.setLine(map.getKey());
+                productPara.setReserve(String.valueOf(in.getTime()));
+                productPara.setVal(map.getValue().getVal());
+                productPara.setMsgId(msgId);
+                productPara.setTs(TsUtil.getAndIncrement());
+                productPara.setOriginTime(new Date());//todo 有则获取
+                list.add(productPara);
+                cacheService.setHashKey(String.format(CacheConstant.D_DATA_CACHE, in.getGuid()), map.getKey(), map.getValue());
+            }
+            propertyData.setData(list);
+        }
+        DbData dbData = new DbData();
+        dbData.setOriginalPara(originalPara);
+        dbData.setPropertyData(propertyData);
+
+        amqpTemplate.convertAndSend(QueueEnum.DB_QUEUE.getName(), dbData);
+    }
+
+
+    @RabbitListener(queues = "dbQueue", concurrency = "3-5")
+    public void dbQueue1(@Payload DbData in) {
+        OriginalPara originalPara = in.getOriginalPara();
+        PropertyData propertyData = in.getPropertyData();
+
+        taosService.addOriginal(originalPara);
+        if (Objects.nonNull(propertyData)) {
+            for (ProductPara datum : propertyData.getData()) {
+                datum.setReserve(String.valueOf(System.currentTimeMillis() - Long.parseLong(datum.getReserve())));
+            }
+            taosService.addDeviceData(propertyData);
+        }
+    }
+}

+ 4 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/resources/bootstrap-stress.yaml

@@ -14,3 +14,7 @@ spring:
           - data-id: common.yaml
             group: common
             refresh: true
+  rabbitmq:
+    host: 10.10.10.120
+    username: guest
+    password: guest

+ 20 - 4
iot-module/iot-module-data/iot-module-data-biz/src/main/resources/logback-spring.xml

@@ -13,7 +13,7 @@
     <property name="log.path" value="logs"/>
     <property name="LOG_HOME" value="iot-data"/>
 
-<!--    <springProperty scope="context" name="LOG_HOME" source="spring.application.name" defaultValue="iot-data"/>-->
+    <!--    <springProperty scope="context" name="LOG_HOME" source="spring.application.name" defaultValue="iot-data"/>-->
     <!--0. 日志格式和颜色渲染 -->
     <!-- 彩色日志依赖的渲染类 -->
     <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
@@ -146,7 +146,6 @@
     </appender>
 
 
-
     <!-- 4. 最终的策略 -->
     <root level="info">
         <appender-ref ref="CONSOLE"/>
@@ -155,7 +154,15 @@
         <appender-ref ref="WARN_FILE"/>
         <appender-ref ref="ERROR_FILE"/>
     </root>
-
+    <springProfile name="local">
+        <root level="info">
+            <appender-ref ref="CONSOLE"/>
+            <appender-ref ref="DEBUG_FILE"/>
+            <appender-ref ref="INFO_FILE"/>
+            <appender-ref ref="WARN_FILE"/>
+            <appender-ref ref="ERROR_FILE"/>
+        </root>
+    </springProfile>
     <!-- 4.1 开发环境:打印控制台-->
     <springProfile name="dev">
         <root level="info">
@@ -186,5 +193,14 @@
             <appender-ref ref="WARN_FILE"/>
         </root>
     </springProfile>
-
+    <!--  压测环境-->
+    <springProfile name="stress">
+        <root level="info">
+            <appender-ref ref="CONSOLE"/>
+            <appender-ref ref="DEBUG_FILE"/>
+            <appender-ref ref="INFO_FILE"/>
+            <appender-ref ref="ERROR_FILE"/>
+            <appender-ref ref="WARN_FILE"/>
+        </root>
+    </springProfile>
 </configuration>

+ 7 - 0
iot-module/iot-module-manage/iot-module-manage-api/src/main/java/com/middle/platform/manage/api/pojo/ProductVo.java

@@ -27,5 +27,12 @@ public class ProductVo implements Serializable {
      * 设备guid
      */
     private String guid;
+    /**
+     * 数据格式 1-json 2-xml 3-base64 4-hex 5-text
+     */
     private Integer dataFormat;
+    /**
+     * 上报协议 1-mqtt 2-http 3-coap 4-tcp 5-udp
+     */
+    private Integer reportProtocol;
 }