Kaynağa Gözat

拓展kafka协议

xucaiqin 1 yıl önce
ebeveyn
işleme
00419cbc4e

+ 5 - 7
iot-module/iot-module-demo/src/main/java/com/middle/platform/demo/service/KafkaService.java

@@ -24,13 +24,11 @@ public class KafkaService {
     /**
      * 模拟定时发送方
      */
-    @Scheduled(initialDelay = 0, fixedRate = 1, timeUnit = TimeUnit.SECONDS)
+    @Scheduled(initialDelay = 0, fixedRate = 10, timeUnit = TimeUnit.SECONDS)
     public void job() {
-        for (int i = 0; i < 3000; i++) {
-            JSONObject jsonObject = new JSONObject();
-            jsonObject.put("123", LocalDateTimeUtil.formatNormal(LocalDateTimeUtil.now()));
-            jsonObject.put("temp", new Random().nextInt());
-            kafkaTemplate.send("iot.data", jsonObject.toJSONString());
-        }
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("123", LocalDateTimeUtil.formatNormal(LocalDateTimeUtil.now()));
+        jsonObject.put("temp", new Random().nextInt());
+        kafkaTemplate.send("iot.sn", jsonObject.toJSONString());
     }
 }

+ 2 - 2
iot-module/iot-module-demo/src/main/resources/application.yml

@@ -1,7 +1,7 @@
 server:
-  port: 8080
+  port: 8089
 spring:
   kafka:
     bootstrap-servers: localhost:9092
     consumer:
-      group-id: list
+      group-id: iot

+ 1 - 0
iot-module/iot-module-manage/iot-module-manage-biz/src/main/java/com/middle/platform/manage/biz/constant/UrlProtocol.java

@@ -7,4 +7,5 @@ package com.middle.platform.manage.biz.constant;
 public interface UrlProtocol {
     String MQTT = "1";
     String HTTP = "2";
+    String KAFKA = "3";
 }

+ 1 - 2
iot-module/iot-module-manage/iot-module-manage-biz/src/main/java/com/middle/platform/manage/biz/constant/UrlTypeEnum.java

@@ -10,8 +10,7 @@ import lombok.Getter;
 @Getter
 @AllArgsConstructor
 public enum UrlTypeEnum {
-    MQTT("MQTT", "1"),
-    HTTP("HTTP", "2"),
+    MQTT("MQTT", "1"), HTTP("HTTP", "2"), KAFKA("KAFKA", "3"),
     ;
     private final String label;
     private final String val;

+ 20 - 1
iot-module/iot-module-manage/iot-module-manage-biz/src/main/java/com/middle/platform/manage/biz/service/IotUrlService.java

@@ -239,9 +239,28 @@ public class IotUrlService {
                 subUrl(iotUrl.getUrl(), iotUrl.getFunc() + iotUrl.getPermission());
             }
         }
+        if (StrUtil.equals(iotProduct.getReportProtocol(), UrlProtocol.KAFKA)) {
+            IotUrl iotUrl = new IotUrl();
+            iotUrl.setProductId(iotProduct.getId());
+            iotUrl.setCategory(UrlCategory.init);
+            iotUrl.setFunc("1");
+            iotUrl.setUrl("iot.data");
+            iotUrl.setPermission("1");
+            iotUrl.setType(iotProduct.getReportProtocol());
+            iotUrl.setRemark("KAFKA默认topic");
+            iotUrlMapper.insert(iotUrl);
+        }
         //http
         if (Objects.equals(iotProduct.getReportProtocol(), UrlProtocol.HTTP)) {
-
+            IotUrl iotUrl = new IotUrl();
+            iotUrl.setProductId(iotProduct.getId());
+            iotUrl.setCategory(UrlCategory.init);
+            iotUrl.setFunc("1");
+            iotUrl.setUrl("/data/report/{code}/{deviceSn}");
+            iotUrl.setPermission("1");
+            iotUrl.setType(iotProduct.getReportProtocol());
+            iotUrl.setRemark("HTTP默认地址");
+            iotUrlMapper.insert(iotUrl);
         }
     }