Browse Source

kafka协议客户端demo

xucaiqin 1 year ago
parent
commit
1c8c64f0e9

+ 5 - 45
iot-module/iot-module-demo/src/main/java/com/middle/platform/demo/controller/TestController.java

@@ -1,9 +1,7 @@
 package com.middle.platform.demo.controller;
 
-import com.middle.platform.demo.service.KafkaService;
-import jakarta.annotation.Resource;
+import cn.hutool.core.date.LocalDateTimeUtil;
 import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
@@ -15,53 +13,15 @@ import org.springframework.web.bind.annotation.RestController;
 @RequestMapping("/kafka")
 public class TestController {
 
-    @Resource
-    private KafkaService kafkaService;
-
     /**
      * 发送文本消息
      *
-     * @param msg
      * @return
      */
-    @GetMapping("/send/{msg}")
-    public String send(@PathVariable String msg) {
-        kafkaService.tet(msg);
-        return "";
+    @GetMapping("/test")
+    public String send() {
+        return LocalDateTimeUtil.formatNormal(LocalDateTimeUtil.now());
     }
 
-//    /**
-//     * 发送JSON数据
-//     * @return
-//     */
-//    @GetMapping("/send2")
-//    public String send2() {
-//        Message message = new Message();
-//        message.setId(System.currentTimeMillis());
-//        message.setMsg("生产者发送消息到topic1: " + UUID.getUUID32());
-//        message.setSendTime(new Date());
-//
-//        String value = JSON.toJSONString(message);
-//        log.info("生产者发送消息到topic1 message = {}", value);
-//
-//        kafkaService.send(kafkaConfiguration.getMyTopic1(),value);
-//        return value;
-//    }
-//    /**
-//     * 发送JSON数据
-//     * @return
-//     */
-//    @GetMapping("/send3")
-//    public String send3() {
-//        Message message = new Message();
-//        message.setId(System.currentTimeMillis());
-//        message.setMsg("生产者发送消息到topic2: " + UUID.getUUID32());
-//        message.setSendTime(new Date());
-//
-//        String value = JSON.toJSONString(message);
-//        log.info("生产者发送消息到topic2 message = {}", value);
-//
-//        kafkaService.send(kafkaConfiguration.getMyTopic2(),value);
-//        return value;
-//    }
+
 }

+ 12 - 5
iot-module/iot-module-demo/src/main/java/com/middle/platform/demo/service/KafkaService.java

@@ -1,22 +1,29 @@
 package com.middle.platform.demo.service;
 
+import cn.hutool.core.date.LocalDateTimeUtil;
 import com.alibaba.fastjson.JSONObject;
 import lombok.RequiredArgsConstructor;
 import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Service;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author xucaiqin
  * @date 2024-04-02 15:19:51
  */
-@Service
+@EnableScheduling
+@Component
 @RequiredArgsConstructor
 public class KafkaService {
     private final KafkaTemplate<String, Object> kafkaTemplate;
 
-    public void tet(String s) {
+    @Scheduled(initialDelay = 0, fixedRate = 1, timeUnit = TimeUnit.MINUTES)
+    public void job() {
         JSONObject jsonObject = new JSONObject();
-        jsonObject.put("123", s);
-        kafkaTemplate.send("topic", jsonObject.toJSONString());
+        jsonObject.put("123", LocalDateTimeUtil.formatNormal(LocalDateTimeUtil.now()));
+        kafkaTemplate.send("iot.data", jsonObject.toJSONString());
     }
 }

+ 0 - 20
iot-module/iot-module-demo/src/main/java/com/middle/platform/demo/service/Test.java

@@ -1,20 +0,0 @@
-package com.middle.platform.demo.service;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Component;
-
-/**
- * @author xucaiqin
- * @date 2024-04-02 15:16:48
- */
-@Component
-public class Test {
-    @KafkaListener(topics = "topic")
-    public void listen(ConsumerRecord<?,String> record) {
-        System.out.println(record);
-        String value = record.value();
-        System.out.println("消费者1接收到消息:" + value);
-
-    }
-}