|
|
@@ -0,0 +1,126 @@
|
|
|
+package com.platform.api.manager;
|
|
|
+
|
|
|
+
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
+import com.platform.api.config.VehicleDataQueue;
|
|
|
+import com.platform.api.request.VehicleDataSaveRequest;
|
|
|
+import com.platform.entity.VehicleData;
|
|
|
+import com.platform.enums.ErrorCodeEnum;
|
|
|
+import com.platform.exception.IotException;
|
|
|
+import com.platform.mapper.TaosMapper;
|
|
|
+import com.platform.service.VehicleDataService;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
+import org.springframework.beans.BeansException;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.context.ApplicationContextAware;
|
|
|
+import org.springframework.lang.Nullable;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Author: donglang
|
|
|
+ * Time: 2025-10-13
|
|
|
+ * Des:
|
|
|
+ * Version: 1.0
|
|
|
+ */
|
|
|
+
|
|
|
+@Service
|
|
|
+@RequiredArgsConstructor
|
|
|
+public class TransferVehicleManage implements ApplicationContextAware {
|
|
|
+
|
|
|
+ private final TaosMapper taosMapper;
|
|
|
+
|
|
|
+ private final VehicleDataService vehicleDataService;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 队列
|
|
|
+ */
|
|
|
+ private final VehicleDataQueue vehicleDataQueue;
|
|
|
+
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setApplicationContext(@Nullable ApplicationContext applicationContext) throws BeansException {
|
|
|
+ this.applicationContext = applicationContext;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 接收单条数据,暂存到队列
|
|
|
+ */
|
|
|
+ public void saveVehicleData(VehicleDataSaveRequest request) {
|
|
|
+ // 1. 暂存数据到队列
|
|
|
+ vehicleDataQueue.add(request);
|
|
|
+
|
|
|
+ // 2. 检查是否达到批量阈值,若达到则批量保存
|
|
|
+ List<VehicleDataSaveRequest> vehicleDataList = vehicleDataQueue.takeBatchIfReached();
|
|
|
+ if (CollectionUtils.isEmpty(vehicleDataList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ TransferVehicleManage proxy = applicationContext.getBean(TransferVehicleManage.class);
|
|
|
+ proxy.batchSaveVehicle(vehicleDataList);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 插入车辆数据
|
|
|
+ */
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public void batchSaveVehicle(List<VehicleDataSaveRequest> vehicleDataList) {
|
|
|
+ try {
|
|
|
+ List<VehicleData> newVehicles = checkVehicle(vehicleDataList);
|
|
|
+ // 批量保存到关系库
|
|
|
+ if (CollectionUtils.isNotEmpty(newVehicles)) {
|
|
|
+ vehicleDataService.saveBatch(newVehicles);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 批量保存到时序数据库
|
|
|
+ taosMapper.batchInsertVehicleData(vehicleDataList);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new IotException(ErrorCodeEnum.TD_SAVE_FAIL, "批量保存车辆数据到时序数据库失败");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 校验关系数据库中是否存在车辆
|
|
|
+ */
|
|
|
+ private List<VehicleData> checkVehicle(List<VehicleDataSaveRequest> vehicleDataList) {
|
|
|
+ //提取所有车牌
|
|
|
+ List<String> carNoList = vehicleDataList.stream()
|
|
|
+ .map(ve -> ve.getVehicleDataVO().getCarNo())
|
|
|
+ .distinct()
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 2. 批量查询已存在的车牌
|
|
|
+ List<String> existCarNos = vehicleDataService.list(Wrappers.<VehicleData>lambdaQuery()
|
|
|
+ .in(VehicleData::getCarNo, carNoList))
|
|
|
+ .stream()
|
|
|
+ .map(VehicleData::getCarNo)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 3. 筛选出需要新增的车辆
|
|
|
+ return vehicleDataList.stream()
|
|
|
+ .filter(req -> !existCarNos.contains(req.getVehicleDataVO().getCarNo()))
|
|
|
+ .map(req -> VehicleData.toVehicleData(req.getVehicleDataVO()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定时兜底任务:每10分钟执行一次,防止数据长期凑不满10条
|
|
|
+ */
|
|
|
+ @Scheduled(fixedRate = 600 * 1000)
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public void scheduledBatchSave() {
|
|
|
+ List<VehicleDataSaveRequest> remainingData = vehicleDataQueue.takeAll();
|
|
|
+ if (!remainingData.isEmpty()) {
|
|
|
+ batchSaveVehicle(remainingData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+}
|