瀏覽代碼

消息推送调整

xucaiqin 2 年之前
父節點
當前提交
5d2c5ede28

+ 2 - 2
sckw-modules/sckw-payment/src/main/java/com/sckw/payment/job/AsyncPool.java

@@ -23,8 +23,8 @@ public class AsyncPool {
     }
 
     private static class ThreadPool {
-        public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
-                2,
+        public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() / 2,
+                Runtime.getRuntime().availableProcessors() / 2 + 1,
                 1,
                 TimeUnit.MINUTES,
                 new LinkedBlockingQueue<>(12));

+ 47 - 33
sckw-modules/sckw-payment/src/main/java/com/sckw/payment/service/MessageSender.java

@@ -2,6 +2,7 @@ package com.sckw.payment.service;
 
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
+import com.sckw.payment.job.AsyncPool;
 import com.sckw.payment.model.JumpUrlConfig;
 import com.sckw.payment.model.KwpLedgerLogisticsTrack;
 import com.sckw.payment.model.KwpLedgerTradeTrack;
@@ -47,9 +48,11 @@ public class MessageSender {
      * @param messageEnum
      */
     public void sendCreate(Long createBy, Map<String, Object> map, UserInfo user, MessageEnum messageEnum) {
-        SckwMessage sckwMessage = new SckwMessage(messageEnum).setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum))).setParams(map).setUserInfos(Collections.singletonList(user)).setCreateBy(createBy);
-        log.info("推送创建人:{}", JSONObject.toJSONString(sckwMessage));
-        streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
+        AsyncPool.addTask(() -> {
+            SckwMessage sckwMessage = new SckwMessage(messageEnum).setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum))).setParams(map).setUserInfos(Collections.singletonList(user)).setCreateBy(createBy);
+            log.info("推送创建人:{}", JSONObject.toJSONString(sckwMessage));
+            streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
+        });
     }
 
     /**
@@ -60,12 +63,15 @@ public class MessageSender {
      * @param messageEnum 消息枚举
      */
     public void sendManager(Long createBy, Map<String, Object> map, Long entId, MessageEnum messageEnum) {
-        EntCacheResDto entCacheResDto = remoteSystemService.queryEntCacheById(entId);
-        if (Objects.nonNull(entCacheResDto)) {
-            SckwMessage sckwMessage = new SckwMessage(messageEnum).setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum))).setParams(map).setUserInfos(Collections.singletonList(new UserInfo(entCacheResDto.getContactsId(), entCacheResDto.getId()))).setCreateBy(createBy);
-            log.info("推送系统管理员:{}", JSONObject.toJSONString(sckwMessage));
-            streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
-        }
+        AsyncPool.addTask(() -> {
+            EntCacheResDto entCacheResDto = remoteSystemService.queryEntCacheById(entId);
+            if (Objects.nonNull(entCacheResDto)) {
+                SckwMessage sckwMessage = new SckwMessage(messageEnum).setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum))).setParams(map).setUserInfos(Collections.singletonList(new UserInfo(entCacheResDto.getContactsId(), entCacheResDto.getId()))).setCreateBy(createBy);
+                log.info("推送系统管理员:{}", JSONObject.toJSONString(sckwMessage));
+                streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
+            }
+        });
+
     }
 
     /**
@@ -77,14 +83,16 @@ public class MessageSender {
      * @param messageEnum
      */
     private void sendSellChanger(Long createBy, Map<String, Object> map, Long ledgerId, MessageEnum messageEnum) {
-        KwpLedgerTradeTrack kwpLedgerTradeTrack = tradeTrackService.queryLedgered(ledgerId);
-        if (Objects.nonNull(kwpLedgerTradeTrack)) {
-            UserCacheResDto userCacheResDto = remoteSystemService.queryUserCacheById(kwpLedgerTradeTrack.getCreateBy());
-            EntCacheResDto entCacheResDto = Objects.nonNull(userCacheResDto) ? (Objects.nonNull(userCacheResDto.getEntInfo()) ? userCacheResDto.getEntInfo() : new EntCacheResDto()) : new EntCacheResDto();
-            SckwMessage sckwMessage = new SckwMessage(messageEnum).setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum))).setParams(map).setUserInfos(Collections.singletonList(new UserInfo(kwpLedgerTradeTrack.getCreateBy(), entCacheResDto.getId()))).setCreateBy(createBy);
-            log.info("推送变更人:{}", JSONObject.toJSONString(sckwMessage));
-            streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
-        }
+        AsyncPool.addTask(() -> {
+            KwpLedgerTradeTrack kwpLedgerTradeTrack = tradeTrackService.queryLedgered(ledgerId);
+            if (Objects.nonNull(kwpLedgerTradeTrack)) {
+                UserCacheResDto userCacheResDto = remoteSystemService.queryUserCacheById(kwpLedgerTradeTrack.getCreateBy());
+                EntCacheResDto entCacheResDto = Objects.nonNull(userCacheResDto) ? (Objects.nonNull(userCacheResDto.getEntInfo()) ? userCacheResDto.getEntInfo() : new EntCacheResDto()) : new EntCacheResDto();
+                SckwMessage sckwMessage = new SckwMessage(messageEnum).setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum))).setParams(map).setUserInfos(Collections.singletonList(new UserInfo(kwpLedgerTradeTrack.getCreateBy(), entCacheResDto.getId()))).setCreateBy(createBy);
+                log.info("推送变更人:{}", JSONObject.toJSONString(sckwMessage));
+                streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
+            }
+        });
     }
 
     /**
@@ -96,16 +104,18 @@ public class MessageSender {
      * @param messageEnum
      */
     private void sendPurchaseChanger(Long createBy, Map<String, Object> map, Long ledgerId, MessageEnum messageEnum) {
-        KwpLedgerLogisticsTrack kwpLedgerLogisticsTrack = logisticsTrackService.queryLedgered(ledgerId);
-        if (Objects.nonNull(kwpLedgerLogisticsTrack)) {
-            UserCacheResDto userCacheResDto = remoteSystemService.queryUserCacheById(kwpLedgerLogisticsTrack.getCreateBy());
-            EntCacheResDto entCacheResDto = Objects.nonNull(userCacheResDto) ? (Objects.nonNull(userCacheResDto.getEntInfo()) ? userCacheResDto.getEntInfo() : new EntCacheResDto()) : new EntCacheResDto();
-            SckwMessage sckwMessage = new SckwMessage(messageEnum)
-                    .setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum)))
-                    .setParams(map).setUserInfos(Collections.singletonList(new UserInfo(kwpLedgerLogisticsTrack.getCreateBy(), entCacheResDto.getId()))).setCreateBy(createBy);
-            log.info("推送变更人:{}", JSONObject.toJSONString(sckwMessage));
-            streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
-        }
+        AsyncPool.addTask(() -> {
+            KwpLedgerLogisticsTrack kwpLedgerLogisticsTrack = logisticsTrackService.queryLedgered(ledgerId);
+            if (Objects.nonNull(kwpLedgerLogisticsTrack)) {
+                UserCacheResDto userCacheResDto = remoteSystemService.queryUserCacheById(kwpLedgerLogisticsTrack.getCreateBy());
+                EntCacheResDto entCacheResDto = Objects.nonNull(userCacheResDto) ? (Objects.nonNull(userCacheResDto.getEntInfo()) ? userCacheResDto.getEntInfo() : new EntCacheResDto()) : new EntCacheResDto();
+                SckwMessage sckwMessage = new SckwMessage(messageEnum)
+                        .setMsgUrl(jumpUrlConfig.getUrl().get(JumpEnum.getByEnum(messageEnum)))
+                        .setParams(map).setUserInfos(Collections.singletonList(new UserInfo(kwpLedgerLogisticsTrack.getCreateBy(), entCacheResDto.getId()))).setCreateBy(createBy);
+                log.info("推送变更人:{}", JSONObject.toJSONString(sckwMessage));
+                streamBridge.send("sckw-message", JSON.toJSONString(sckwMessage));
+            }
+        });
     }
 
     /**
@@ -118,8 +128,10 @@ public class MessageSender {
      * @param messageEnum
      */
     public void sendSellBoth(Long createBy, Map<String, Object> map, Long ledgerId, Long entId, MessageEnum messageEnum) {
-        sendManager(createBy, map, entId, messageEnum);
-        sendSellChanger(createBy, map, ledgerId, messageEnum);
+        AsyncPool.addTask(() -> {
+            sendManager(createBy, map, entId, messageEnum);
+            sendSellChanger(createBy, map, ledgerId, messageEnum);
+        });
     }
 
     /**
@@ -127,12 +139,14 @@ public class MessageSender {
      *
      * @param createBy
      * @param map
-     * @param ledgerId 对账单id
-     * @param entId 需要推送的对账单关联的企业
+     * @param ledgerId    对账单id
+     * @param entId       需要推送的对账单关联的企业
      * @param messageEnum
      */
     public void sendPurchaseBoth(Long createBy, Map<String, Object> map, Long ledgerId, Long entId, MessageEnum messageEnum) {
-        sendManager(createBy, map, entId, messageEnum);
-        sendPurchaseChanger(createBy, map, ledgerId, messageEnum);
+        AsyncPool.addTask(() -> {
+            sendManager(createBy, map, entId, messageEnum);
+            sendPurchaseChanger(createBy, map, ledgerId, messageEnum);
+        });
     }
 }