From 19f0ad54bf71cf8ec8b26140b54db606d1c90eec Mon Sep 17 00:00:00 2001 From: wxz Date: Sat, 16 Oct 2021 23:33:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=20job=E6=89=AB?= =?UTF-8?q?=E6=8F=8F=E6=BB=9E=E7=95=99/=E9=98=BB=E5=A1=9E=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../epmet-job/epmet-job-server/pom.xml | 5 ++++ .../service/SystemMessageScannerService.java | 23 +++++++++++++++++ .../SystemMessageScannerServiceImpl.java | 22 ++++++++++++++++ .../task/BlockedMQSystemMessageScanner.java | 25 +++++++++++++++++++ .../task/PenddingMQSystemMessageScanner.java | 25 +++++++++++++++++++ .../feign/EpmetMessageOpenFeignClient.java | 21 ++++++++++++---- 6 files changed, 116 insertions(+), 5 deletions(-) create mode 100644 epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerService.java create mode 100644 epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerServiceImpl.java create mode 100644 epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/BlockedMQSystemMessageScanner.java create mode 100644 epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/PenddingMQSystemMessageScanner.java diff --git a/epmet-module/epmet-job/epmet-job-server/pom.xml b/epmet-module/epmet-job/epmet-job-server/pom.xml index e29f5759e5..769cfab32a 100644 --- a/epmet-module/epmet-job/epmet-job-server/pom.xml +++ b/epmet-module/epmet-job/epmet-job-server/pom.xml @@ -38,6 +38,11 @@ epmet-third-client 2.0.0 + + com.epmet + epmet-message-client + 2.0.0 + org.springframework.boot spring-boot-starter-web diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerService.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerService.java new file mode 100644 index 0000000000..3316aef52a --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerService.java @@ -0,0 +1,23 @@ +package com.epmet.service; + +public interface SystemMessageScannerService { + /** + * @description 扫描未成功发送到MQ的消息 + * + * @param + * @return + * @author wxz + * @date 2021.10.16 23:24:05 + */ + void scanPenddingSystemMQMessage(); + + /** + * @description 扫描发送成功但是未正常处理的MQ的消息 + * + * @param + * @return + * @author wxz + * @date 2021.10.16 23:24:27 + */ + void scanBlockedSystemMQMessage(); +} diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerServiceImpl.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerServiceImpl.java new file mode 100644 index 0000000000..f47ba3ec39 --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerServiceImpl.java @@ -0,0 +1,22 @@ +package com.epmet.service; + +import com.epmet.feign.EpmetMessageOpenFeignClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class SystemMessageScannerServiceImpl implements SystemMessageScannerService { + + @Autowired + private EpmetMessageOpenFeignClient messageOpenFeignClient; + + @Override + public void scanPenddingSystemMQMessage() { + messageOpenFeignClient.penddingMqMsgScan(); + } + + @Override + public void scanBlockedSystemMQMessage() { + messageOpenFeignClient.blockedMqMsgScan(); + } +} diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/BlockedMQSystemMessageScanner.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/BlockedMQSystemMessageScanner.java new file mode 100644 index 0000000000..6840b15eaa --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/BlockedMQSystemMessageScanner.java @@ -0,0 +1,25 @@ +package com.epmet.task; + +import com.epmet.service.SystemMessageScannerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component("blockedMQSystemMessageScanner") +@Slf4j +public class BlockedMQSystemMessageScanner implements ITask { + + @Autowired + private SystemMessageScannerService systemMessageScannerService; + + @Override + public void run(String params) { + try { + //log.info("【blockedMQSystemMessageScanner】开始执行scanBlockedSystemMQMessage任务"); + systemMessageScannerService.scanBlockedSystemMQMessage(); + log.info("【blockedMQSystemMessageScanner】执行scanBlockedSystemMQMessage任务完成"); + } catch (Exception e) { + log.error("【blockedMQSystemMessageScanner】执行scanBlockedSystemMQMessage任务失败"); + } + } +} diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/PenddingMQSystemMessageScanner.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/PenddingMQSystemMessageScanner.java new file mode 100644 index 0000000000..f4b24f2cb6 --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/PenddingMQSystemMessageScanner.java @@ -0,0 +1,25 @@ +package com.epmet.task; + +import com.epmet.service.SystemMessageScannerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component("penddingMQSystemMessageScanner") +@Slf4j +public class PenddingMQSystemMessageScanner implements ITask { + + @Autowired + private SystemMessageScannerService systemMessageScannerService; + + @Override + public void run(String params) { + try { + //log.info("【blockedMQSystemMessageScanner】开始执行scanBlockedSystemMQMessage任务"); + systemMessageScannerService.scanPenddingSystemMQMessage(); + log.info("【blockedMQSystemMessageScanner】执行scanPenddingSystemMQMessage任务完成"); + } catch (Exception e) { + log.error("【blockedMQSystemMessageScanner】执行scanPenddingSystemMQMessage任务失败"); + } + } +} diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java index 454fd5fd5d..b3681a0b33 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java @@ -109,13 +109,24 @@ public interface EpmetMessageOpenFeignClient { Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form); /** - * @description 应答mq消息 + * @description 检查MQ阻塞消息(MQ收到消息,但是往监听者发送消息或者监听者处理逻辑问题导致无法消费消息) * - * @param form + * @param + * @return + * @author wxz + * @date 2021.10.16 22:07:38 + */ + @PostMapping("/message/system/blocked-mq-msg-scan") + Result blockedMqMsgScan(); + + /** + * @description 检查MQ滞留消息(往MQ发送消息失败,MQ未收到消息) + * + * @param * @return * @author wxz - * @date 2021.10.14 16:07:17 + * @date 2021.10.16 22:08:32 */ - @PostMapping("/message/system/ack-mq-msg") - Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form); + @PostMapping("/message/system/pendding-mq-msg-scan") + Result penddingMqMsgScan(); }