xucaiqin преди 1 година
родител
ревизия
3f1f06eb8e

+ 13 - 2
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/mqtt/strategy/AttrReportStrategy.java

@@ -1,10 +1,18 @@
 package com.middle.platform.data.biz.service.mqtt.strategy;
 
+import com.middle.platform.data.biz.pojo.QueueEnum;
+import com.middle.platform.data.biz.pojo.data.RawData;
 import com.middle.platform.data.biz.service.mqtt.MqttTopicStrategy;
+import com.middle.platform.data.biz.utils.MqttTopicUtil;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.stereotype.Service;
 
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /**
  * 属性上报
  */
@@ -12,10 +20,13 @@ import org.springframework.stereotype.Service;
 @Slf4j
 @RequiredArgsConstructor
 public class AttrReportStrategy implements MqttTopicStrategy {
+    private final AmqpTemplate amqpTemplate;
+    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1500));
 
     @Override
     public void dealMsg(String topic, Object msg) {
-        log.info("属性[{}]上报数据:{}", topic, msg);
-
+        String deviceSn = MqttTopicUtil.getDeviceSn(topic);
+        String productKey = MqttTopicUtil.getProductKey(topic);
+        threadPoolExecutor.execute(() -> amqpTemplate.convertAndSend(QueueEnum.RAW_QUEUE, new RawData(productKey, deviceSn, msg, 2)));
     }
 }