diff --git a/epmet-module/epmet-job/epmet-job-server/pom.xml b/epmet-module/epmet-job/epmet-job-server/pom.xml index e29f5759e5..769cfab32a 100644 --- a/epmet-module/epmet-job/epmet-job-server/pom.xml +++ b/epmet-module/epmet-job/epmet-job-server/pom.xml @@ -38,6 +38,11 @@ epmet-third-client 2.0.0 + + com.epmet + epmet-message-client + 2.0.0 + org.springframework.boot spring-boot-starter-web diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerService.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerService.java new file mode 100644 index 0000000000..3316aef52a --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerService.java @@ -0,0 +1,23 @@ +package com.epmet.service; + +public interface SystemMessageScannerService { + /** + * @description 扫描未成功发送到MQ的消息 + * + * @param + * @return + * @author wxz + * @date 2021.10.16 23:24:05 + */ + void scanPenddingSystemMQMessage(); + + /** + * @description 扫描发送成功但是未正常处理的MQ的消息 + * + * @param + * @return + * @author wxz + * @date 2021.10.16 23:24:27 + */ + void scanBlockedSystemMQMessage(); +} diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerServiceImpl.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerServiceImpl.java new file mode 100644 index 0000000000..f47ba3ec39 --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/service/SystemMessageScannerServiceImpl.java @@ -0,0 +1,22 @@ +package com.epmet.service; + +import com.epmet.feign.EpmetMessageOpenFeignClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class SystemMessageScannerServiceImpl implements SystemMessageScannerService { + + @Autowired + private EpmetMessageOpenFeignClient messageOpenFeignClient; + + @Override + public void scanPenddingSystemMQMessage() { + messageOpenFeignClient.penddingMqMsgScan(); + } + + @Override + public void scanBlockedSystemMQMessage() { + messageOpenFeignClient.blockedMqMsgScan(); + } +} diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/BlockedMQSystemMessageScanner.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/BlockedMQSystemMessageScanner.java new file mode 100644 index 0000000000..6840b15eaa --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/BlockedMQSystemMessageScanner.java @@ -0,0 +1,25 @@ +package com.epmet.task; + +import com.epmet.service.SystemMessageScannerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component("blockedMQSystemMessageScanner") +@Slf4j +public class BlockedMQSystemMessageScanner implements ITask { + + @Autowired + private SystemMessageScannerService systemMessageScannerService; + + @Override + public void run(String params) { + try { + //log.info("【blockedMQSystemMessageScanner】开始执行scanBlockedSystemMQMessage任务"); + systemMessageScannerService.scanBlockedSystemMQMessage(); + log.info("【blockedMQSystemMessageScanner】执行scanBlockedSystemMQMessage任务完成"); + } catch (Exception e) { + log.error("【blockedMQSystemMessageScanner】执行scanBlockedSystemMQMessage任务失败"); + } + } +} diff --git a/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/PenddingMQSystemMessageScanner.java b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/PenddingMQSystemMessageScanner.java new file mode 100644 index 0000000000..f4b24f2cb6 --- /dev/null +++ b/epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/PenddingMQSystemMessageScanner.java @@ -0,0 +1,25 @@ +package com.epmet.task; + +import com.epmet.service.SystemMessageScannerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component("penddingMQSystemMessageScanner") +@Slf4j +public class PenddingMQSystemMessageScanner implements ITask { + + @Autowired + private SystemMessageScannerService systemMessageScannerService; + + @Override + public void run(String params) { + try { + //log.info("【blockedMQSystemMessageScanner】开始执行scanBlockedSystemMQMessage任务"); + systemMessageScannerService.scanPenddingSystemMQMessage(); + log.info("【blockedMQSystemMessageScanner】执行scanPenddingSystemMQMessage任务完成"); + } catch (Exception e) { + log.error("【blockedMQSystemMessageScanner】执行scanPenddingSystemMQMessage任务失败"); + } + } +} diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java index 454fd5fd5d..b3681a0b33 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java @@ -109,13 +109,24 @@ public interface EpmetMessageOpenFeignClient { Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form); /** - * @description 应答mq消息 + * @description 检查MQ阻塞消息(MQ收到消息,但是往监听者发送消息或者监听者处理逻辑问题导致无法消费消息) * - * @param form + * @param + * @return + * @author wxz + * @date 2021.10.16 22:07:38 + */ + @PostMapping("/message/system/blocked-mq-msg-scan") + Result blockedMqMsgScan(); + + /** + * @description 检查MQ滞留消息(往MQ发送消息失败,MQ未收到消息) + * + * @param * @return * @author wxz - * @date 2021.10.14 16:07:17 + * @date 2021.10.16 22:08:32 */ - @PostMapping("/message/system/ack-mq-msg") - Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form); + @PostMapping("/message/system/pendding-mq-msg-scan") + Result penddingMqMsgScan(); }