From d3f10217feaacc26d2b410e60ddff0f1d833dd6a Mon Sep 17 00:00:00 2001 From: wxz Date: Sat, 16 Oct 2021 22:24:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=201.epmet-message?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E6=89=AB=E6=8F=8F"=E6=9C=AA=E6=88=90?= =?UTF-8?q?=E5=8A=9F=E5=8F=91=E9=80=81=E5=88=B0mq=E7=9A=84=E6=BB=9E?= =?UTF-8?q?=E7=95=99=E6=B6=88=E6=81=AF"=E7=9A=84=E6=8E=A5=E5=8F=A3=202.?= =?UTF-8?q?=E8=B0=83=E6=95=B4mq=E9=98=BB=E5=A1=9E=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E7=9A=84key=203.=E5=85=B6=E4=BB=96=E9=80=BB=E8=BE=91=E5=BE=AE?= =?UTF-8?q?=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/AuthOperationLogListener.java | 8 +- .../listener/PointOperationLogListener.java | 8 +- .../listener/ProjectOperationLogListener.java | 8 +- .../rocketmq/constants/MQUserPropertys.java | 4 +- .../epmet/commons/tools/redis/RedisKeys.java | 8 +- .../mq/ProjectChangedCustomListener.java | 6 +- .../controller/SystemMessageController.java | 24 ++++- .../epmet/dao/SystemMessagePenddingDao.java | 35 +++++++ .../entity/SystemMessagePenddingEntity.java | 61 ++++++++++++ .../epmet/service/SystemMessageService.java | 16 ++- .../impl/SystemMessageServiceImpl.java | 97 +++++++++++++++---- .../mapper/SystemMessagePenddingDao.xml | 26 +++++ .../IssueProjectCategoryTagInitListener.java | 8 +- .../InitCustomerOrgRolesListener.java | 8 +- .../OpenDataOrgChangeEventListener.java | 8 +- .../OpenDataPatrolChangeEventListener.java | 7 +- .../OpenDataStaffChangeEventListener.java | 8 +- .../InitCustomerComponentsListener.java | 8 +- 18 files changed, 282 insertions(+), 66 deletions(-) create mode 100644 epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java create mode 100644 epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java create mode 100644 epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java index ee243d88dd..6f8afd1418 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java @@ -58,7 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("认证操作日志监听器-收到消息内容:{}", msg); LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class); @@ -101,7 +101,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【登录操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -115,8 +115,8 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); + //logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java index d883b7b1f6..aa4f50a768 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java @@ -58,7 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String opeType = messageExt.getTags(); String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("积分操作日志监听器-收到消息内容:{}", msg); PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class); @@ -103,7 +103,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently { try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【积分操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -117,8 +117,8 @@ public class PointOperationLogListener implements MessageListenerConcurrently { * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); + //logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java index d968d95448..a47d55c2de 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java @@ -58,7 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { //String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("项目变动操作日志监听器-收到消息内容:{}", msg); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); @@ -103,7 +103,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【项目操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -136,8 +136,8 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); + //logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); } } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java index a69de64cca..c1a53a29ba 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java @@ -7,7 +7,7 @@ package com.epmet.commons.rocketmq.constants; */ public interface MQUserPropertys { - //堆积消息label - String PENDING_MSG_LABEL = "pendingMsgLabel"; + //阻塞消息label + String BLOCKED_MSG_LABEL = "blockedMsgLabel"; } diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index ad66110c51..388927ce5a 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -549,14 +549,14 @@ public class RedisKeys { } /** - * @description 检查message MQ滞留消息 + * @description 检查message MQ阻塞消息 * - * @param pendingMsgLabel 滞留消息的label + * @param blockedMsgLabel 滞留消息的label * @return * @author wxz * @date 2021.10.14 14:33:53 */ - public static String pendingMqMsgKey(String pendingMsgLabel) { - return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel); + public static String blockedMqMsgKey(String blockedMsgLabel) { + return rootPrefix.concat("message:mq:blocked:").concat(blockedMsgLabel); } } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index dd23989ca7..1792654765 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -70,7 +70,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently private void consumeMessage(MessageExt msgExt) { String msg = new String(msgExt.getBody()); - String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("receive customerId:{}", JSON.toJSONString(msg)); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); @@ -179,9 +179,9 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【项目变动事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【项目变动事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); } /*@Override diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java index cc353dd449..8c1ea726cd 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java @@ -28,10 +28,32 @@ public class SystemMessageController { @PostMapping("send-by-mq") public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class); - systemMessageService.sendMQMessage(form.getMessageType(), form.getContent()); + systemMessageService.persistAndSendMQMessage(form.getMessageType(), form.getContent()); return new Result(); } + /** + * @description 检查MQ阻塞消息(MQ收到消息,但是往监听者发送消息或者监听者处理逻辑问题导致无法消费消息) + * + * @param + * @return + * @author wxz + * @date 2021.10.16 22:07:38 + */ + @PostMapping("blocked-mq-msg-scan") + public Result blockedMqMsgScan() { + systemMessageService.blockedMqMsgScan(); + return new Result(); + } + + /** + * @description 检查MQ滞留消息(往MQ发送消息失败,MQ未收到消息) + * + * @param + * @return + * @author wxz + * @date 2021.10.16 22:08:32 + */ @PostMapping("pendding-mq-msg-scan") public Result penddingMqMsgScan() { systemMessageService.penddingMqMsgScan(); diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java new file mode 100644 index 0000000000..57ba0d3afb --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java @@ -0,0 +1,35 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.dao; + +import com.epmet.commons.mybatis.dao.BaseDao; +import com.epmet.entity.SystemMessagePenddingEntity; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +/** + * 系统消息表 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-10-16 + */ +@Mapper +public interface SystemMessagePenddingDao extends BaseDao { + + void physicalDeleteById(@Param("penddingId") String penddingId); +} \ No newline at end of file diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java new file mode 100644 index 0000000000..87a7956504 --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java @@ -0,0 +1,61 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.entity; + +import com.baomidou.mybatisplus.annotation.TableName; + +import com.epmet.commons.mybatis.entity.BaseEpmetEntity; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.Date; + +/** + * 系统消息表 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-10-16 + */ +@Data +@EqualsAndHashCode(callSuper=false) +@TableName("system_message_pendding") +public class SystemMessagePenddingEntity extends BaseEpmetEntity { + + private static final long serialVersionUID = 1L; + + /** + * 消息类型。init_customer:客户初始化,login登录,logout退出 + */ + private String msgType; + + /** + * 消息主表id + */ + private String msgId; + + /** + * 消息发送途径 + */ + private String sendApproach; + + /** + * 消息内容 + */ + private String content; + +} diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java index f48e56cf9f..64e34527d1 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java @@ -3,7 +3,7 @@ package com.epmet.service; public interface SystemMessageService { /** - * @description 发送mq消息 + * @description 持久化和发送mq消息 * * @param messageType * @param content @@ -11,15 +11,25 @@ public interface SystemMessageService { * @author wxz * @date 2021.10.14 15:07:02 */ - void sendMQMessage(String messageType, Object content); + void persistAndSendMQMessage(String messageType, Object content); /** - * @description 扫描滞留消息 + * @description 扫描阻塞的消息 * * @param * @return * @author wxz * @date 2021.10.15 10:13:37 */ + void blockedMqMsgScan(); + + /** + * @description 扫描待办的消息(因为发送到MQ失败而堆积) + * + * @param + * @return + * @author wxz + * @date 2021.10.16 12:11:39 + */ void penddingMqMsgScan(); } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index 7bbc426d9e..dfd6737b57 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -12,7 +12,9 @@ import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.constant.SystemMessageSendApproach; import com.epmet.constant.SystemMessageType; import com.epmet.dao.SystemMessageDao; +import com.epmet.dao.SystemMessagePenddingDao; import com.epmet.entity.SystemMessageEntity; +import com.epmet.entity.SystemMessagePenddingEntity; import com.epmet.service.SystemMessageService; import lombok.AllArgsConstructor; import lombok.Data; @@ -27,6 +29,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; +import java.util.List; import java.util.Set; import java.util.UUID; @@ -53,51 +56,90 @@ public class SystemMessageServiceImpl implements SystemMessageService { @Autowired private SystemMessageDao systemMessageDao; + @Autowired + private SystemMessagePenddingDao systemMessagePenddingDao; + @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private RedisUtils redisUtils; - @Transactional(rollbackFor = Exception.class) @Override - public void sendMQMessage(String messageType, Object content) { + public void persistAndSendMQMessage(String messageType, Object content) { String contentStr = JSON.toJSONString(content); - // 1.存储消息到表 - SystemMessageEntity systemMessageEntity = new SystemMessageEntity(); - systemMessageEntity.setMsgType(messageType); - systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ); - systemMessageEntity.setContent(contentStr); - systemMessageDao.insert(systemMessageEntity); + logger.info("【发送MQ系统消息】-落盘并发送-messageType:{}, contentStr:{}", messageType, contentStr); + // 1.消息落盘 + String penddingMsgId = persistMessage(messageType, contentStr); + + // 2.发送消息 + sendMQMessage(messageType, contentStr, penddingMsgId); + } + + /** + * @description 消息落盘,有事务 + * + * @param messageType + * @param contentStr + * @return + * @author wxz + * @date 2021.10.16 11:04:53 + */ + @Transactional(rollbackFor = Exception.class) + public String persistMessage(String messageType, String contentStr) { + + SystemMessageEntity message = new SystemMessageEntity(); + message.setMsgType(messageType); + message.setSendApproach(SystemMessageSendApproach.MQ); + message.setContent(contentStr); + systemMessageDao.insert(message); + + SystemMessagePenddingEntity pendding = new SystemMessagePenddingEntity(); + pendding.setMsgType(messageType); + pendding.setSendApproach(SystemMessageSendApproach.MQ); + pendding.setContent(contentStr); + pendding.setMsgId(message.getId()); + systemMessagePenddingDao.insert(pendding); + + logger.info("【发送MQ系统消息】-落盘完成"); + return pendding.getId(); + } + + private void sendMQMessage(String messageType, String contentStr, String penddingId) { String topic = getTopicByMsgType(messageType); - // 2.缓存下来,供滞留消息扫描。TTL -1,永不过期 + // 缓存下来,供滞留消息扫描。TTL -1,永不过期 String pendingMsgLabel = null; String pendingMsgKey = null; try { MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr); pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); - pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + pendingMsgKey = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.set(pendingMsgKey, mcb, -1); + //logger.info("【发送MQ系统消息】-存入redis堆积列表成功-{}-{}-{}", topic, messageType, pendingMsgLabel); } catch (Exception e) { - logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); + logger.error("【发送MQ系统消息】将系统MQ消息存储到Redis堆积列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); } // 3.发送mq消息 try { Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); - meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); + meMessage.putUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL, pendingMsgLabel); rocketMQTemplate.getProducer().send(meMessage); + logger.info("【发送MQ系统消息】-发送到MQ成功"); } catch (Exception e) { String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); - logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); + logger.error("【发送MQ系统消息】发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); - // 清理消息缓存 + // 清理阻塞中的消息缓存 redisUtils.delete(pendingMsgKey); throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode()); } + + // 删除消息堆积 + systemMessagePenddingDao.physicalDeleteById(penddingId); } /** @@ -146,8 +188,8 @@ public class SystemMessageServiceImpl implements SystemMessageService { } @Override - public void penddingMqMsgScan() { - String scanKey = RedisKeys.pendingMqMsgKey("*"); + public void blockedMqMsgScan() { + String scanKey = RedisKeys.blockedMqMsgKey("*"); Set keys = redisUtils.keys(scanKey); //System.out.println(keys); for (String key : keys) { @@ -173,7 +215,7 @@ public class SystemMessageServiceImpl implements SystemMessageService { && l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) ) { - logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size()); l1LastAlertTime = now; } @@ -185,7 +227,7 @@ public class SystemMessageServiceImpl implements SystemMessageService { createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now) && l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) ) { - logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size()); l1LastAlertTime = now; } break; @@ -193,6 +235,25 @@ public class SystemMessageServiceImpl implements SystemMessageService { } } + @Override + public void penddingMqMsgScan() { + Integer count = systemMessagePenddingDao.selectCount(null); + if (count == 0) { + return; + } + + // 扫描并且重新投递 + List penddingMsgs = systemMessagePenddingDao.selectList(null); + for (SystemMessagePenddingEntity penddingMsg : penddingMsgs) { + try { + sendMQMessage(penddingMsg.getMsgType(), penddingMsg.getContent(), penddingMsg.getId()); + } catch (Exception e) { + // 投递失败不应影响后续消息的投递 + logger.error("【重新投递MQ消息】失败, msgType:{}, penddingMsgId:{}, 错误:{}", penddingMsg.getMsgType(), penddingMsg.getId() , ExceptionUtils.getErrorStackTrace(e)); + } + } + } + @Data @NoArgsConstructor @AllArgsConstructor diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml b/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml new file mode 100644 index 0000000000..4a5ab00608 --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + delete from system_message_pendding where ID = #{penddingId} + + + + \ No newline at end of file diff --git a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java index cb7af65139..a9eb9c436f 100644 --- a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java +++ b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java @@ -57,7 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu public void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -84,7 +84,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【议题/项目分类标签初始化事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -98,8 +98,8 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【议题/项目分类标签初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); } } \ No newline at end of file diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java index 31346a2e8b..11bcc80e38 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java @@ -56,7 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -82,7 +82,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【客户初始化事件监听器】-orgRole-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -129,9 +129,9 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【客户初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【客户初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); } /* @Override diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java index 5a13adb637..df6278488e 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java @@ -55,7 +55,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent String msg = new String(messageExt.getBody()); String topic = messageExt.getTopic(); String tags = messageExt.getTags(); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags); GridBaseInfoFormDTO obj = JSON.parseObject(msg, GridBaseInfoFormDTO.class); @@ -87,7 +87,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -101,8 +101,8 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,pendingMsgLabel:{}", pendingMsgLabel); + //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java index a9797b34d8..4e40f0974f 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java @@ -58,7 +58,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr String msg = new String(messageExt.getBody()); String topic = messageExt.getTopic(); String tags = messageExt.getTags(); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags); StaffPatrolMQMsg msgObj = JSON.parseObject(msg, StaffPatrolMQMsg.class); @@ -93,7 +93,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-巡查-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -107,7 +107,8 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); + //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java index 270c7ff3e3..313c036cb8 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java @@ -58,11 +58,11 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre String msg = new String(messageExt.getBody()); String topic = messageExt.getTopic(); String tags = messageExt.getTags(); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); //messageExt.propert - logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); + logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, blockedMsgLabel:{}", msg, tags, pendingMsgLabel); StaffBaseInfoFormDTO obj = JSON.parseObject(msg, StaffBaseInfoFormDTO.class); DistributedLock distributedLock = null; @@ -87,7 +87,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-staff-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -102,7 +102,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); } } diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java index d560979694..5980ad0a2a 100644 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java +++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java @@ -51,7 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -82,7 +82,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -96,9 +96,9 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); } /* @Override