|
@@ -54,12 +54,15 @@ import org.apache.dubbo.config.annotation.DubboReference;
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.cloud.stream.function.StreamBridge;
|
|
import org.springframework.cloud.stream.function.StreamBridge;
|
|
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
|
|
+import org.springframework.data.redis.core.script.DefaultRedisScript;
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
import java.math.BigDecimal;
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
@@ -105,6 +108,19 @@ public class WaybillOrderService {
|
|
|
|
|
|
|
|
private final KwtWaybillOrderV1Service waybillOrderV1Service;
|
|
private final KwtWaybillOrderV1Service waybillOrderV1Service;
|
|
|
|
|
|
|
|
|
|
+ // 注入RedisTemplate用于分布式锁
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private RedisTemplate<String, String> redisTemplate;
|
|
|
|
|
+
|
|
|
|
|
+ // 分布式锁相关常量
|
|
|
|
|
+ private static final String TAKING_ORDER_LOCK_PREFIX = "transport:taking_order:lock:";
|
|
|
|
|
+ // 锁超时时间30秒
|
|
|
|
|
+ private static final long LOCK_EXPIRE_SECONDS = 30;
|
|
|
|
|
+ // 锁等待时间500毫秒
|
|
|
|
|
+ private static final long LOCK_WAIT_MILLIS = 500;
|
|
|
|
|
+ // 锁重试间隔100毫秒
|
|
|
|
|
+ private static final long LOCK_RETRY_INTERVAL = 100;
|
|
|
|
|
+
|
|
|
//载重任务量浮动吨数
|
|
//载重任务量浮动吨数
|
|
|
private static final BigDecimal TWO_TONS = new BigDecimal("2");
|
|
private static final BigDecimal TWO_TONS = new BigDecimal("2");
|
|
|
//载重任务量计算比例
|
|
//载重任务量计算比例
|
|
@@ -834,7 +850,100 @@ public class WaybillOrderService {
|
|
|
*/
|
|
*/
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
public OrderTakingResp orderTaking(OrderCirculateTakingQueryParam param) {
|
|
public OrderTakingResp orderTaking(OrderCirculateTakingQueryParam param) {
|
|
|
- return takingOrderHandler.handler(param);
|
|
|
|
|
|
|
+ // 1. 幂等性校验:检查是否已经存在该车辆针对该物流订单的有效运单
|
|
|
|
|
+ checkIdempotent(param);
|
|
|
|
|
+
|
|
|
|
|
+ // 2. 构建分布式锁Key(物流订单ID+车牌号 唯一标识)
|
|
|
|
|
+ String lockKey = TAKING_ORDER_LOCK_PREFIX + param.getLogOrderId() + "_" + param.getTruckNo();
|
|
|
|
|
+ String requestId = UUID.randomUUID().toString();
|
|
|
|
|
+ boolean lockAcquired = false;
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 3. 获取分布式锁(带重试机制)
|
|
|
|
|
+ lockAcquired = acquireLock(lockKey, requestId, LOCK_EXPIRE_SECONDS, LOCK_WAIT_MILLIS);
|
|
|
|
|
+ if (!lockAcquired) {
|
|
|
|
|
+ log.warn("接单请求获取分布式锁失败,可能存在重复提交,param:{}", JSON.toJSONString(param));
|
|
|
|
|
+ throw new BusinessPlatfromException(ErrorCodeEnum.REPEAT_SUBMIT, "当前接单请求正在处理中,请稍后再试");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 再次幂等性校验(防止锁等待期间已经创建运单)
|
|
|
|
|
+ checkIdempotent(param);
|
|
|
|
|
+
|
|
|
|
|
+ // 6. 执行核心接单逻辑
|
|
|
|
|
+ return takingOrderHandler.handler(param);
|
|
|
|
|
+
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 7. 释放分布式锁(只有获取锁的请求才能释放)
|
|
|
|
|
+ if (lockAcquired) {
|
|
|
|
|
+ releaseLock(lockKey, requestId);
|
|
|
|
|
+ log.info("释放接单分布式锁成功,lockKey:{}, requestId:{}", lockKey, requestId);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 幂等性校验:检查是否已经存在该车辆针对该物流订单的有效运单
|
|
|
|
|
+ * @param param 接单参数
|
|
|
|
|
+ */
|
|
|
|
|
+ private void checkIdempotent(OrderCirculateTakingQueryParam param) {
|
|
|
|
|
+ // 查询该车辆针对该物流订单是否已有有效运单
|
|
|
|
|
+ KwtWaybillOrder existingOrder = waybillOrderRepository.findOneByTruckNo(param.getLogOrderId(), param.getTruckNo(), param.getEntId());
|
|
|
|
|
+
|
|
|
|
|
+ if (existingOrder != null) {
|
|
|
|
|
+ log.warn("重复接单校验失败,已存在有效运单,logOrderId:{}, truckNo:{}, waybillOrderId:{}",
|
|
|
|
|
+ param.getLogOrderId(), param.getTruckNo(), existingOrder.getId());
|
|
|
|
|
+ throw new BusinessPlatfromException(ErrorCodeEnum.REPEAT_SUBMIT,
|
|
|
|
|
+ String.format("该车辆[%s]已针对该物流订单接单,运单ID:%s", param.getTruckNo(), existingOrder.getId()));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 获取分布式锁(带重试机制)
|
|
|
|
|
+ * @param lockKey 锁Key
|
|
|
|
|
+ * @param requestId 请求ID(保证只有自己能释放锁)
|
|
|
|
|
+ * @param expireSeconds 锁过期时间(秒)
|
|
|
|
|
+ * @param waitMillis 最大等待时间(毫秒)
|
|
|
|
|
+ * @return 是否获取成功
|
|
|
|
|
+ */
|
|
|
|
|
+ private boolean acquireLock(String lockKey, String requestId, long expireSeconds, long waitMillis) {
|
|
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
|
|
+ while (System.currentTimeMillis() - startTime < waitMillis) {
|
|
|
|
|
+ // 使用SET NX EX命令获取锁(原子操作)
|
|
|
|
|
+ Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireSeconds, TimeUnit.SECONDS);
|
|
|
|
|
+ if (Boolean.TRUE.equals(success)) {
|
|
|
|
|
+ log.info("获取接单分布式锁成功,lockKey:{}, requestId:{}", lockKey, requestId);
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 未获取到锁,短暂休眠后重试
|
|
|
|
|
+ try {
|
|
|
|
|
+ Thread.sleep(LOCK_RETRY_INTERVAL);
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ log.error("获取锁时线程中断", e);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.warn("获取接单分布式锁超时,lockKey:{}, waitMillis:{}", lockKey, waitMillis);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 释放分布式锁(使用Lua脚本保证原子性)
|
|
|
|
|
+ * @param lockKey 锁Key
|
|
|
|
|
+ * @param requestId 请求ID
|
|
|
|
|
+ */
|
|
|
|
|
+ private void releaseLock(String lockKey, String requestId) {
|
|
|
|
|
+ String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
|
|
|
|
|
+ DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
|
|
|
|
|
+ redisScript.setScriptText(luaScript);
|
|
|
|
|
+ redisScript.setResultType(Long.class);
|
|
|
|
|
+
|
|
|
|
|
+ Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);
|
|
|
|
|
+ if (result == null || result == 0) {
|
|
|
|
|
+ log.warn("释放接单分布式锁失败,可能锁已过期或被其他请求释放,lockKey:{}, requestId:{}", lockKey, requestId);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|