소스 검색

兼容kafka协议

xucaiqin 1 년 전
부모
커밋
49aa8560d6

+ 18 - 0
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/pojo/KafkaData.java

@@ -0,0 +1,18 @@
+package com.middle.platform.data.biz.pojo;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * kafka接收数据的基础类
+ *
+ * @author xucaiqin
+ * @date 2024-04-29 10:57:42
+ */
+@Getter
+@Setter
+public class KafkaData {
+    private String productKey;
+    private String deviceSn;
+    private Object data;
+}

+ 24 - 3
iot-module/iot-module-data/iot-module-data-biz/src/main/java/com/middle/platform/data/biz/service/kafka/KafkaService.java

@@ -1,6 +1,14 @@
 package com.middle.platform.data.biz.service.kafka;
 
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.middle.platform.common.core.constant.RabbitConstant;
+import com.middle.platform.data.biz.pojo.KafkaData;
+import com.middle.platform.data.biz.pojo.data.RawData;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
@@ -9,12 +17,25 @@ import org.springframework.stereotype.Component;
  * @date 2024-04-17 15:23:05
  */
 @Component
+@Slf4j
 public class KafkaService {
+    @Resource
+    private AmqpTemplate amqpTemplate;
 
     @KafkaListener(topics = {"iot.data"})
     public void receive(ConsumerRecord<String, Object> object) {
-        System.out.println("receive:" + object);
-
-
+        log.info("receive:" + JSONObject.toJSONString(object));
+        Object value = object.value();
+        if (value instanceof String vs) {
+            KafkaData kafkaData = JSONObject.parseObject(vs, KafkaData.class);
+            if(StrUtil.isBlank(kafkaData.getProductKey())){
+                return;
+            }
+            if(StrUtil.isBlank(kafkaData.getDeviceSn())){
+                return;
+            }
+            amqpTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_DATA, RabbitConstant.ROUTING_POINT_DATA_PREFIX + RabbitConstant.RAW_QUEUE,
+                    new RawData(kafkaData.getProductKey(), kafkaData.getDeviceSn(), kafkaData.getData(), 1));
+        }
     }
 }