Browse Source

对接mqtt数据配置bugfix

lengfaqiang 2 years ago
parent
commit
39770d4462

+ 39 - 39
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/common/config/mqtt/MqSendMessageGateWay.java

@@ -1,39 +1,39 @@
-//package com.sckw.slope.detection.common.config.mqtt;
-//
-//import org.springframework.integration.annotation.MessagingGateway;
-//import org.springframework.integration.mqtt.support.MqttHeaders;
-//import org.springframework.messaging.handler.annotation.Header;
-//import org.springframework.stereotype.Component;
-//
-///**
-// * @author lfdc
-// * @description
-// * @date 2023-10-26 11:10:01
-// */
-//@Component
-//@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
-//public interface MqSendMessageGateWay {
-//    /**
-//     * 默认的消息机制
-//     *
-//     * @param data
-//     */
-//    void sendToMqtt(String data);
-//
-//    /**
-//     * 发送消息 向mqtt指定topic发送消息
-//     *
-//     * @param topic
-//     * @param payload
-//     */
-//    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
-//
-//    /**
-//     * 发送消息 向mqtt指定topic发送消息
-//     *
-//     * @param topic 主题
-//     * @param qos   机制
-//     * @param payload   消息
-//     */
-//    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
-//}
+package com.sckw.slope.detection.common.config.mqtt;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author lfdc
+ * @description
+ * @date 2023-10-26 11:10:01
+ */
+@Component
+@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
+public interface MqSendMessageGateWay {
+    /**
+     * 默认的消息机制
+     *
+     * @param data
+     */
+    void sendToMqtt(String data);
+
+    /**
+     * 发送消息 向mqtt指定topic发送消息
+     *
+     * @param topic
+     * @param payload
+     */
+    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+    /**
+     * 发送消息 向mqtt指定topic发送消息
+     *
+     * @param topic 主题
+     * @param qos   机制
+     * @param payload   消息
+     */
+    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+}

+ 211 - 211
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/common/config/mqtt/MqttConfig.java

@@ -1,211 +1,211 @@
-//package com.sckw.slope.detection.common.config.mqtt;
-//
-//import com.sckw.slope.detection.consumer.MqttCallbackHandler;
-//import com.sckw.slope.detection.consumer.mqApi.MqttApiHandler;
-//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.integration.annotation.ServiceActivator;
-//import org.springframework.integration.channel.DirectChannel;
-//import org.springframework.integration.core.MessageProducer;
-//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
-//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
-//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
-//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
-//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
-//import org.springframework.messaging.MessageChannel;
-//import org.springframework.messaging.MessageHandler;
-//
-///**
-// * @author lfdc
-// * @description MQTT配置,生产者
-// * @date 2023-10-26 10:10:59
-// */
-//@Configuration
-//public class MqttConfig {
-//
-//    private static final byte[] WILL_DATA;
-//
-//    static {
-//        WILL_DATA = "offline".getBytes();
-//    }
-//
-//    /**
-//     * mqtt订阅者使用信道名称
-//     */
-//    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
-//    /**
-//     * mqtt发布者信道名称
-//     */
-//    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
-//    /**
-//     * mqtt发送者用户名
-//     */
-//    @Value("${mqtt.send.username}")
-//    private String username;
-//
-//    /**
-//     * mqtt发送者密码
-//     */
-//    @Value("${mqtt.send.password}")
-//    private String password;
-//
-//    /**
-//     * mqtt发送者url
-//     */
-//    @Value("${mqtt.send.url}")
-//    private String hostUrl;
-//    /**
-//     * mqtt发送者客户端id
-//     */
-//    @Value("${mqtt.send.clientId}")
-//    private String clientId;
-//    /**
-//     * mqtt发送者主题
-//     */
-//    @Value("${mqtt.send.topic}")
-//    private String msgTopic;
-//
-//    /**
-//     * mqtt发送者主题
-//     */
-//    @Value("${mqtt.receive.topic}")
-//    private String msgReceiveTopic;
-//
-//    /**
-//     * mqtt发送者超时时间
-//     */
-//    @Value("${mqtt.send.completionTimeout}")
-//    private int completionTimeout;
-//
-//    @Value("${mqtt.send.keepAliveInterval}")
-//    private int keepAliveInterval;
-//
-//    @Value("${mqtt.send.connectionTimeout}")
-//    private int connectionTimeout;
-//
-//    @Autowired
-//    private MqttApiHandler mqttApiHandler;
-//
-//
-//    /**
-//     * 新建MqttConnectionOptionsBean  MQTT连接器选项
-//     * @return
-//     */
-//    @Bean
-//    public MqttConnectOptions getSenderMqttConnectOptions() {
-//        MqttConnectOptions options = new MqttConnectOptions();
-//        // 设置连接的用户名
-//        if (!username.trim().equals("")) {
-//            //将用户名去掉前后空格
-//            options.setUserName(username);
-//        }
-//        // 设置连接的密码
-//        options.setPassword(password.toCharArray());
-//        // 转化连接的url地址
-//        String[] uris = {hostUrl};
-//        // 设置连接的地址
-//        options.setServerURIs(uris);
-//        // 设置超时时间 单位为秒
-//        options.setConnectionTimeout(completionTimeout);
-//        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
-//        // 但这个方法并没有重连的机制
-//        options.setKeepAliveInterval(keepAliveInterval);
-//        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
-//        //设置超时时间
-//        options.setConnectionTimeout(connectionTimeout);
-//        options.setCleanSession(true);
-//        options.setAutomaticReconnect(true);
-//        return options;
-//    }
-//
-//    /**
-//     * 创建MqttPathClientFactoryBean
-//     */
-//    @Bean
-//    public MqttPahoClientFactory senderMqttClientFactory() {
-//        //创建mqtt客户端工厂
-//        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
-//        //设置mqtt的连接设置
-//        factory.setConnectionOptions(getSenderMqttConnectOptions());
-//        return factory;
-//    }
-//
-//    /**
-//     * 发布者-MQTT信息通道(生产者)
-//     */
-//    @Bean(name = CHANNEL_NAME_OUT)
-//    public MessageChannel mqttOutboundChannel() {
-//        return new DirectChannel();
-//    }
-//
-//
-//    /**
-//     * 发布者-MQTT消息处理器(生产者)  将channel绑定到MqttClientFactory上
-//     *
-//     * @return {@link MessageHandler}
-//     */
-//    @Bean
-//    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-//    public MessageHandler mqttOutbound() {
-//        //创建消息处理器
-//        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
-//                clientId + "_pub",
-//                senderMqttClientFactory());
-//        //设置消息处理类型为异步
-//        messageHandler.setAsync(true);
-//        //设置消息的默认主题
-//        messageHandler.setDefaultTopic(msgTopic);
-//        messageHandler.setDefaultRetained(false);
-//        //1.重新连接MQTT服务时,不需要接收该主题最新消息,设置retained为false;
-//        //2.重新连接MQTT服务时,需要接收该主题最新消息,设置retained为true;
-//        return messageHandler;
-//    }
-//
-//
-//    /************                             消费者,订阅者的消费信息                               *****/
-//
-//    /**
-//     * MQTT信息通道(消费者)
-//     */
-//    @Bean(name = CHANNEL_NAME_IN)
-//    public MessageChannel mqttInboundChannel() {
-//        return new DirectChannel();
-//    }
-//
-//    /**
-//     * MQTT消息订阅绑定(消费者)
-//     */
-//    @Bean
-//    public MessageProducer inbound() {
-////        System.out.println("topics:" + msgTopic);
-//        // 可以同时消费(订阅)多个Topic
-//        MqttPahoMessageDrivenChannelAdapter adapter =
-//                new MqttPahoMessageDrivenChannelAdapter(
-//                        clientId + "_sub", senderMqttClientFactory(), msgReceiveTopic.split(","));
-//        adapter.setCompletionTimeout(5000);
-//        adapter.setConverter(new DefaultPahoMessageConverter());
-//        adapter.setQos(0);
-//        // 设置订阅通道
-//        adapter.setOutputChannel(mqttInboundChannel());
-//        return adapter;
-//    }
-//
-//    @Autowired
-//    private MqttCallbackHandler mqttCallbackHandler;
-//    /**
-//     * MQTT消息处理器(消费者)
-//     */
-//    @Bean
-//    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-//    public MessageHandler handler() {
-//        return message -> {
-//            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
-//            String payload = message.getPayload().toString();
-////            mqttCallbackHandler.handle(topic, payload);
-//            mqttApiHandler.handle(topic, payload);
-//        };
-//    }
-//}
+package com.sckw.slope.detection.common.config.mqtt;
+
+import com.sckw.slope.detection.consumer.MqttCallbackHandler;
+import com.sckw.slope.detection.consumer.mqApi.MqttApiHandler;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * @author lfdc
+ * @description MQTT配置,生产者
+ * @date 2023-10-26 10:10:59
+ */
+@Configuration
+public class MqttConfig {
+
+    private static final byte[] WILL_DATA;
+
+    static {
+        WILL_DATA = "offline".getBytes();
+    }
+
+    /**
+     * mqtt订阅者使用信道名称
+     */
+    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
+    /**
+     * mqtt发布者信道名称
+     */
+    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
+    /**
+     * mqtt发送者用户名
+     */
+    @Value("${mqtt.send.username}")
+    private String username;
+
+    /**
+     * mqtt发送者密码
+     */
+    @Value("${mqtt.send.password}")
+    private String password;
+
+    /**
+     * mqtt发送者url
+     */
+    @Value("${mqtt.send.url}")
+    private String hostUrl;
+    /**
+     * mqtt发送者客户端id
+     */
+    @Value("${mqtt.send.clientId}")
+    private String clientId;
+    /**
+     * mqtt发送者主题
+     */
+    @Value("${mqtt.send.topic}")
+    private String msgTopic;
+
+    /**
+     * mqtt发送者主题
+     */
+    @Value("${mqtt.receive.topic}")
+    private String msgReceiveTopic;
+
+    /**
+     * mqtt发送者超时时间
+     */
+    @Value("${mqtt.send.completionTimeout}")
+    private int completionTimeout;
+
+    @Value("${mqtt.send.keepAliveInterval}")
+    private int keepAliveInterval;
+
+    @Value("${mqtt.send.connectionTimeout}")
+    private int connectionTimeout;
+
+    @Autowired
+    private MqttApiHandler mqttApiHandler;
+
+
+    /**
+     * 新建MqttConnectionOptionsBean  MQTT连接器选项
+     * @return
+     */
+    @Bean
+    public MqttConnectOptions getSenderMqttConnectOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        // 设置连接的用户名
+        if (!username.trim().equals("")) {
+            //将用户名去掉前后空格
+            options.setUserName(username);
+        }
+        // 设置连接的密码
+        options.setPassword(password.toCharArray());
+        // 转化连接的url地址
+        String[] uris = {hostUrl};
+        // 设置连接的地址
+        options.setServerURIs(uris);
+        // 设置超时时间 单位为秒
+        options.setConnectionTimeout(completionTimeout);
+        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
+        // 但这个方法并没有重连的机制
+        options.setKeepAliveInterval(keepAliveInterval);
+        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
+        //设置超时时间
+        options.setConnectionTimeout(connectionTimeout);
+        options.setCleanSession(true);
+        options.setAutomaticReconnect(true);
+        return options;
+    }
+
+    /**
+     * 创建MqttPathClientFactoryBean
+     */
+    @Bean
+    public MqttPahoClientFactory senderMqttClientFactory() {
+        //创建mqtt客户端工厂
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        //设置mqtt的连接设置
+        factory.setConnectionOptions(getSenderMqttConnectOptions());
+        return factory;
+    }
+
+    /**
+     * 发布者-MQTT信息通道(生产者)
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+
+    /**
+     * 发布者-MQTT消息处理器(生产者)  将channel绑定到MqttClientFactory上
+     *
+     * @return {@link MessageHandler}
+     */
+    @Bean
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler mqttOutbound() {
+        //创建消息处理器
+        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
+                clientId + "_pub",
+                senderMqttClientFactory());
+        //设置消息处理类型为异步
+        messageHandler.setAsync(true);
+        //设置消息的默认主题
+        messageHandler.setDefaultTopic(msgTopic);
+        messageHandler.setDefaultRetained(false);
+        //1.重新连接MQTT服务时,不需要接收该主题最新消息,设置retained为false;
+        //2.重新连接MQTT服务时,需要接收该主题最新消息,设置retained为true;
+        return messageHandler;
+    }
+
+
+    /************                             消费者,订阅者的消费信息                               *****/
+
+    /**
+     * MQTT信息通道(消费者)
+     */
+    @Bean(name = CHANNEL_NAME_IN)
+    public MessageChannel mqttInboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * MQTT消息订阅绑定(消费者)
+     */
+    @Bean
+    public MessageProducer inbound() {
+//        System.out.println("topics:" + msgTopic);
+        // 可以同时消费(订阅)多个Topic
+        MqttPahoMessageDrivenChannelAdapter adapter =
+                new MqttPahoMessageDrivenChannelAdapter(
+                        clientId + "_sub", senderMqttClientFactory(), msgReceiveTopic.split(","));
+        adapter.setCompletionTimeout(5000);
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(0);
+        // 设置订阅通道
+        adapter.setOutputChannel(mqttInboundChannel());
+        return adapter;
+    }
+
+    @Autowired
+    private MqttCallbackHandler mqttCallbackHandler;
+    /**
+     * MQTT消息处理器(消费者)
+     */
+    @Bean
+    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
+    public MessageHandler handler() {
+        return message -> {
+            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
+            String payload = message.getPayload().toString();
+//            mqttCallbackHandler.handle(topic, payload);
+            mqttApiHandler.handle(topic, payload);
+        };
+    }
+}

+ 8 - 8
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/consumer/MqttCallbackHandler.java

@@ -73,7 +73,7 @@ public class MqttCallbackHandler {
         try {
             Map<String, SystemDict> dictByDictCode = commonService.getDictByDictCode(DictEnum.MODEL_PART);
             DevicesAlarm devicesAlarm = JSONObject.parseObject(payload, DevicesAlarm.class);
-            String deviceCode = devicesAlarm.getDeviceCode();
+            String deviceCode = devicesAlarm.getGuid();
             List<KwsDevice> deviceList = deviceMapper.selectList(new LambdaQueryWrapper<KwsDevice>()
                     .eq(KwsDevice::getSnCode, deviceCode)
                     .eq(KwsDevice::getDelFlag, NumberConstant.ZERO)
@@ -100,7 +100,7 @@ public class MqttCallbackHandler {
                     if (org.springframework.util.CollectionUtils.isEmpty(kwsThresholds)) {
                         continue;
                     }
-                    Long ts = devicesItem.getTs();
+                    Long ts = devicesAlarm.getTs();
                     //判断是否满足设备超时离线报警
                     checkDeviceAlarm(deviceCode, device, itemValue, ts);
                     //判断是否满足数值超阈值报警
@@ -440,20 +440,20 @@ public class MqttCallbackHandler {
 
     public static void main(String[] args) {
         DevicesAlarm alarm = new DevicesAlarm();
-        alarm.setDeviceCode("s8");
+        alarm.setGuid("s8");
         List<DevicesItem> list = new ArrayList<>();
         DevicesItem item = new DevicesItem();
         item.setItemName("alt");
         item.setItemValue("20.12646584");
         long time = new Date().getTime();
-        item.setTs(new Date().getTime());
+//        item.setTs(new Date().getTime());
         String md5 = new MD5Utils().getMd5(String.valueOf(time));
-        item.setGuid(md5);
+//        item.setGuid(md5);
         SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         String format = simpleDateFormat.format(new Date());
-        item.setTslverId(format);
-        item.setMsgId(format);
-        item.setRawTs(format);
+//        item.setTslverId(format);
+//        item.setMsgId(format);
+//        item.setRawTs(format);
         list.add(item);
         alarm.setDevicesItemList(list);
         String demoTest = "{\"deviceCode\":\"s8\",\"devicesItemList\":[{\"guid\":\"c7653b298bec06eb78be3140b5c2ea9b\",\"itemName\":\"alt\",\"itemValue\":\"20.12646584\",\"msg_id\":\"2023-11-18 14:49:06\",\"raw_ts\":\"2023-11-18 14:49:06\",\"ts\":1700290146508,\"tslver_Id\":\"2023-11-18 14:49:06\"}]}";

+ 25 - 19
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/consumer/MqttDeviceCallbackHandler.java

@@ -1,5 +1,6 @@
 package com.sckw.slope.detection.consumer;
 
+import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.dynamic.datasource.annotation.DSTransactional;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -9,6 +10,7 @@ import com.sckw.core.model.enums.DictItemEnum;
 import com.sckw.core.utils.CollectionUtils;
 import com.sckw.core.utils.IdWorker;
 import com.sckw.core.web.response.HttpResult;
+import com.sckw.log.TraceLog.TraceLog;
 import com.sckw.slope.detection.dao.mysql.*;
 import com.sckw.slope.detection.dao.tdengine.SlopeDataMapper;
 import com.sckw.slope.detection.model.dos.mysql.*;
@@ -40,7 +42,7 @@ import java.util.stream.Collectors;
  * @date 2023-10-26 08:10:45
  */
 @Slf4j
-@Service("sharjeck/ai/test/out")
+@Service("system/iot/device_data_slope")
 public class MqttDeviceCallbackHandler extends AbstractHandler {
 
     @Value("${sms.url}")
@@ -64,18 +66,27 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
     @Autowired
     HandlerFactory handlerFactory;
 
+    @TraceLog(description = "处理system/iot/device_data/")
 //    @Transactional
     @DSTransactional
     public HttpResult handle(String topic, String payload) {
         // 根据topic分别进行消息处理。
         log.info("MqttDeviceCallbackHandler:" + topic + "|" + payload);
-        System.out.println("处理sharjeck/ai/test/out");
+        System.out.println("system/iot/original_data_slope");
+        DevicesAlarm devicesAlarm = JSONObject.parseObject(payload, DevicesAlarm.class);
+        Map<String, SystemDict> dictByDictCode = commonService.getDictByDictCode(DictEnum.MODEL_PART);
+        JSONObject objects = JSONObject.parseObject(payload);
+        String deviceTs = objects.getString("ts");
+        String deviceGuid = objects.getString("guid");
+        String deviceTslverId = objects.getString("tslver_id");
+        String deviceMsgId = objects.getString("msg_id");
+        String deviceRawTs = objects.getString("raw_ts");
+        JSONArray checkArr = objects.getJSONArray("check_arr");
         try {
-            Map<String, SystemDict> dictByDictCode = commonService.getDictByDictCode(DictEnum.MODEL_PART);
-            DevicesAlarm devicesAlarm = JSONObject.parseObject(payload, DevicesAlarm.class);
-            String deviceCode = devicesAlarm.getDeviceCode();
+//            DevicesAlarm devicesAlarm = JSONObject.parseObject(payload, DevicesAlarm.class);
+            String deviceCode = devicesAlarm.getGuid();
             List<KwsDevice> deviceList = deviceMapper.selectList(new LambdaQueryWrapper<KwsDevice>()
-                    .eq(KwsDevice::getSnCode, deviceCode)
+                    .eq(KwsDevice::getSnCodeFullName, deviceCode)
                     .eq(KwsDevice::getDelFlag, NumberConstant.ZERO)
             );
             KwsDevice device = null;
@@ -84,7 +95,7 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
             }
             if (device == null) {
                 log.error("接收处理数据:{}", payload);
-                log.error("设备数据并未维护,不做处理");
+                log.error("设备数据并未维护+["+devicesAlarm.getGuid()+"],不做处理");
                 return null;
             }
             List<DevicesItem> devicesItemList = devicesAlarm.getDevicesItemList();
@@ -100,7 +111,7 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
                     if (org.springframework.util.CollectionUtils.isEmpty(kwsThresholds)) {
                         continue;
                     }
-                    Long ts = devicesItem.getTs();
+                    Long ts = devicesAlarm.getTs();
                     //判断是否满足设备超时离线报警
                     checkDeviceAlarm(deviceCode, device, itemValue, ts);
                     //判断是否满足数值超阈值报警
@@ -166,7 +177,7 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
 
     private void checkDeviceAlarm(String deviceCode, KwsDevice device, String itemValue, Long ts) {
         try {
-            Devices devices = tdengineService.selectLastData(deviceCode);
+            Devices devices = tdengineService.selectLastData(device.getSnCode());
             if (devices != null) {
                 Date deviceTime = devices.getTs();
                 Date date = new Date();
@@ -203,7 +214,7 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
     KwsDeviceReferenceMapper deviceReferenceMapper;
 
     @Autowired
-    private  TdengineService tdengineService;
+    private TdengineService tdengineService;
 
 
     private Map<String, String> checkThresholdAlarm(DevicesItem devicesItem, List<KwsThreshold> kwsThresholds, KwsDevice device) {
@@ -275,7 +286,7 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
             map.put("flag", "1");
             if (min != null || max != null) {
                 if (max != null) {
-                    Long level = maxMap.get(max);
+                    Long level = maxMap.get(max.toString());
                     Long thresholdId = maxMap.get(level.toString());
                     map.put("level", level.toString());
                     map.put("thresholdId", thresholdId.toString());
@@ -283,7 +294,7 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
 //                    insertAlarmAndDetail(level, device, thresholdId, itemValue, ts);
                 }
                 if (min != null) {
-                    Long level = minMap.get(min);
+                    Long level = minMap.get(min.toString());
                     Long thresholdId = minMap.get(level.toString());
                     map.put("level", level.toString());
                     map.put("thresholdId", thresholdId.toString());
@@ -376,7 +387,6 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
         KwsProjectDevice projectDevice = projectDeviceMapper.selectOne(new LambdaQueryWrapper<KwsProjectDevice>()
                 .eq(KwsProjectDevice::getDeviceId, deviceId)
                 .eq(KwsProjectDevice::getDelFlag, 0)
-                .eq(KwsProjectDevice::getStatus, 0)
                 .orderByDesc(KwsProjectDevice::getCreateTime)
                 .last(" limit 1")
         );
@@ -461,21 +471,17 @@ public class MqttDeviceCallbackHandler extends AbstractHandler {
     }
 
     public static void main(String[] args) {
+
+        String aaa = "{\"ts\":1700808976,\"guid\":\"0838161D-4C64-EC3E-A3A8-C736C47D93D4\",\"tslver_id\":\"3\",\"msg_id\":\"586569123094614016\",\"raw_ts\":\"\",\"check_arr\":[{\"itemName\":\"lng\",\"itemValue\":\"103.51826198069217\"},{\"itemName\":\"lat\",\"itemValue\":\"29.46725728245242\"},{\"itemName\":\"high\",\"itemValue\":\"710.1730218537282\"},{\"itemName\":\"fix\",\"itemValue\":4015},{\"itemName\":\"e\",\"itemValue\":\"302.7644742928626\"},{\"itemName\":\"n\",\"itemValue\":\"711.0632664506148\"},{\"itemName\":\"u\",\"itemValue\":\"-271.97856155098333\"},{\"itemName\":\"num\",\"itemValue\":7200},{\"itemName\":\"time\",\"itemValue\":\"2023-11-0818:00:00\"}]}";
         DevicesAlarm alarm = new DevicesAlarm();
-        alarm.setDeviceCode("s8");
         List<DevicesItem> list = new ArrayList<>();
         DevicesItem item = new DevicesItem();
         item.setItemName("alt");
         item.setItemValue("20.12646584");
         long time = new Date().getTime();
-        item.setTs(new Date().getTime());
         String md5 = new MD5Utils().getMd5(String.valueOf(time));
-        item.setGuid(md5);
         SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         String format = simpleDateFormat.format(new Date());
-        item.setTslverId(format);
-        item.setMsgId(format);
-        item.setRawTs(format);
         list.add(item);
         alarm.setDevicesItemList(list);
         String demoTest = "{\"deviceCode\":\"s8\",\"devicesItemList\":[{\"guid\":\"c7653b298bec06eb78be3140b5c2ea9b\",\"itemName\":\"alt\",\"itemValue\":\"20.12646584\",\"msg_id\":\"2023-11-18 14:49:06\",\"raw_ts\":\"2023-11-18 14:49:06\",\"ts\":1700290146508,\"tslver_Id\":\"2023-11-18 14:49:06\"}]}";

+ 3 - 0
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/consumer/mqApi/MqttApiHandler.java

@@ -1,6 +1,7 @@
 package com.sckw.slope.detection.consumer.mqApi;
 
 import com.sckw.core.web.response.HttpResult;
+import com.sckw.log.TraceLog.TraceLog;
 import com.sckw.slope.detection.consumer.AbstractHandler;
 import com.sckw.slope.detection.consumer.HandlerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -16,6 +17,8 @@ public class MqttApiHandler {
 
     @Autowired
     HandlerFactory handlerFactory;
+
+    @TraceLog(description = "mqtt消费")
     public HttpResult handle(String topic, String payload) {
         System.out.println(topic);
         // 根据topic分别进行消息处理。

+ 17 - 1
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/model/dto/DevicesAlarm.java

@@ -1,5 +1,7 @@
 package com.sckw.slope.detection.model.dto;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 
 import java.io.Serializable;
@@ -12,11 +14,25 @@ import java.util.List;
  */
 @Data
 public class DevicesAlarm implements Serializable {
+    private Long ts;
     /**
      * 设备code
      */
-    private String deviceCode;
+    private String guid;
+    @JsonProperty("tslver_id")
+    @SerializedName("tslver_id")
+    private String tslverId;
 
+    @JsonProperty("msg_id")
+    @SerializedName("msg_id")
+    private String msgId;
+
+    @JsonProperty("raw_ts")
+    @SerializedName("raw_ts")
+    private String rawTs;
+
+    @JsonProperty("check_arr")
+    @SerializedName("check_arr")
     private List<DevicesItem> devicesItemList;
 
 }

+ 0 - 13
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/model/dto/DevicesItem.java

@@ -1,6 +1,5 @@
 package com.sckw.slope.detection.model.dto;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Data;
 
 import java.io.Serializable;
@@ -21,16 +20,4 @@ public class DevicesItem implements Serializable {
      * 要素值
      */
     private String itemValue;
-
-    private Long ts;
-    private String guid;
-
-    @JsonProperty("tslver_Id")
-    private String tslverId;
-
-    @JsonProperty("msg_id")
-    private String msgId;
-
-    @JsonProperty("raw_ts")
-    private String rawTs;
 }

+ 13 - 1
slope-modules/slope-detection/src/main/java/com/sckw/slope/detection/service/CommonService.java

@@ -43,6 +43,9 @@ import java.util.Map;
 @Service
 public class CommonService {
 
+    @Value("${spring.profiles.active}")
+    private String isDev;
+
     @Value("${OkHttpClit.url}")
     String url;
 
@@ -64,6 +67,9 @@ public class CommonService {
 //        if (StringUtils.isBlank(uInfo)) {
 //            throw new BusinessException("数据请求异常!");
 //        }
+        if ("lfdc".equals(isDev) || "dev".equals(isDev)) {
+            uInfo = "eyJjb21wYW55SWQiOiIxOTgiLCJjb21wYW55TmFtZSI6IuWbm+W3nemHkemhtumbhuWboiIsImNyZWF0ZUJ5IjoiMiIsImNyZWF0ZU5hbWUiOiI2MDA2NzgiLCJ1cGRhdGVCeSI6IjIiLCJ1cGRhdGVOYW1lIjoiNjAwNjc4IiwibW91bnRhaW5JZCI6IjE5OCJ9";
+        }
         String message = new String(Base64.getDecoder().decode(uInfo));
         HeaderData headerData = JSONObject.parseObject(message, HeaderData.class);
         if (headerData.getMountainId().isBlank()) {
@@ -78,6 +84,12 @@ public class CommonService {
         return headerData;
     }
 
+    public static void main(String[] args) {
+        String uinfo = "eyJjb21wYW55SWQiOm51bGwsImNvbXBhbnlOYW1lIjpudWxsLCJjcmVhdGVCeSI6MSwiY3JlYXRlTmFtZSI6Ilx1OGQ4NVx1N2VhN1x1N2JhMVx1NzQwNlx1NTQ1OCIsInVwZGF0ZUJ5IjoxLCJ1cGRhdGVOYW1lIjoiXHU4ZDg1XHU3ZWE3XHU3YmExXHU3NDA2XHU1NDU4IiwibW91bnRhaW5JZCI6bnVsbH0=";
+        String message = new String(Base64.getDecoder().decode(uinfo));
+        HeaderData headerData = JSONObject.parseObject(message, HeaderData.class);
+    }
+
     public PageRes getDict(QueryDictTypePageReqVo queryDictTypePageReqVo) {
         PageHelper.startPage(queryDictTypePageReqVo.getPage(), queryDictTypePageReqVo.getPageSize());
         List<QueryDictTypePageReqVo> list = dictMapper.selectListAll(queryDictTypePageReqVo);
@@ -186,7 +198,7 @@ public class CommonService {
 
 
     /**
-     *  根据偏移量,返回测量值
+     * 根据偏移量,返回测量值
      *
      * @param value
      * @param itemName

+ 2 - 2
slope-modules/slope-detection/src/main/resources/mapper/mysql/KwsDeviceMapper.xml

@@ -9,7 +9,7 @@
     <result column="name" jdbcType="VARCHAR" property="name" />
     <result column="alias" jdbcType="VARCHAR" property="alias" />
     <result column="sn_code" jdbcType="VARCHAR" property="snCode" />
-    <result column="sn_code_fullname" jdbcType="VARCHAR" property="snCodeFullName" />
+    <result column="sn_code_full_name" jdbcType="VARCHAR" property="snCodeFullName" />
     <result column="model_id" jdbcType="BIGINT" property="modelId" />
     <result column="mountain_id" jdbcType="VARCHAR" property="mountainId" />
     <result column="valid_time" jdbcType="TIMESTAMP" property="validTime" />
@@ -28,7 +28,7 @@
   </resultMap>
   <sql id="Base_Column_List">
     <!--@mbg.generated-->
-    id, `attribute`, `name`, `alias`, sn_code, model_id, valid_time, secret_key, inter_face,relevance_level,sn_code_fullname,
+    id, `attribute`, `name`, `alias`, sn_code, model_id, valid_time, secret_key, inter_face,relevance_level,sn_code_full_name,
     remark, `status`, create_by, create_time, update_by, update_time, del_flag,mountain_id,related,check_time,online
   </sql>
   <select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">