|
|
@@ -3,8 +3,10 @@ package com.sckw.message.service;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
|
|
|
import com.sckw.core.utils.StringUtils;
|
|
|
+import com.sckw.message.constant.RedisConstant;
|
|
|
import com.sckw.message.model.KwmMessage;
|
|
|
import com.sckw.message.model.KwmMessageUser;
|
|
|
+import com.sckw.redis.utils.RedissonUtils;
|
|
|
import com.sckw.stream.enums.MessageEnum;
|
|
|
import com.sckw.stream.model.SckwMessage;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
@@ -29,15 +31,19 @@ public class SckwMessageHandlerService {
|
|
|
private final KwmMessageUserService kwmMessageUserService;
|
|
|
|
|
|
|
|
|
- /**
|
|
|
- * @param sckwMessage
|
|
|
- * @return void
|
|
|
- * @desc: message消息处理
|
|
|
- * @author: yzc
|
|
|
- * @date: 2023-06-09 15:33
|
|
|
- */
|
|
|
- @Transactional(rollbackFor = Exception.class)
|
|
|
+/**
|
|
|
+ * @desc: message消息处理
|
|
|
+ * @author: yzc
|
|
|
+ * @date: 2023-06-09 15:33
|
|
|
+ * @Param sckwMessage:
|
|
|
+ * @return: void
|
|
|
+ */
|
|
|
+@Transactional(rollbackFor = Exception.class)
|
|
|
public void handler(SckwMessage sckwMessage) {
|
|
|
+ if (Boolean.FALSE.equals(RedissonUtils.tryLock(getKey(sckwMessage.getRequestId()), 10L, RedisConstant.CONSUMER_REQUEST_VALID_TIME))) {
|
|
|
+ log.info("重复message消息:{},不处理", sckwMessage.getRequestId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
String userIds = sckwMessage.getUserIds();
|
|
|
if (StringUtils.isBlank(userIds)) {
|
|
|
log.error("message消息处理用户id为空,不处理");
|
|
|
@@ -62,4 +68,8 @@ public class SckwMessageHandlerService {
|
|
|
});
|
|
|
kwmMessageUserService.batchSave(messageUsers);
|
|
|
}
|
|
|
+
|
|
|
+ public static String getKey(String requestId) {
|
|
|
+ return String.format(RedisConstant.MESSAGE_CONSUMER_REQUEST_KEY, requestId);
|
|
|
+ }
|
|
|
}
|