diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java
index ee243d88dd..6f8afd1418 100644
--- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java
+++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java
@@ -58,7 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) {
String tags = messageExt.getTags();
String msg = new String(messageExt.getBody());
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("认证操作日志监听器-收到消息内容:{}", msg);
LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class);
@@ -101,7 +101,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【登录操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -115,8 +115,8 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel);
+ //logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel);
}
}
diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java
index d883b7b1f6..aa4f50a768 100644
--- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java
+++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java
@@ -58,7 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) {
String opeType = messageExt.getTags();
String msg = new String(messageExt.getBody());
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("积分操作日志监听器-收到消息内容:{}", msg);
PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class);
@@ -103,7 +103,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【积分操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -117,8 +117,8 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
+ //logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
}
}
diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java
index d968d95448..a47d55c2de 100644
--- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java
+++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java
@@ -58,7 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) {
//String tags = messageExt.getTags();
String msg = new String(messageExt.getBody());
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("项目变动操作日志监听器-收到消息内容:{}", msg);
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
@@ -103,7 +103,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【项目操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -136,8 +136,8 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
+ //logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
}
}
diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java
index a69de64cca..c1a53a29ba 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java
@@ -7,7 +7,7 @@ package com.epmet.commons.rocketmq.constants;
*/
public interface MQUserPropertys {
- //堆积消息label
- String PENDING_MSG_LABEL = "pendingMsgLabel";
+ //阻塞消息label
+ String BLOCKED_MSG_LABEL = "blockedMsgLabel";
}
diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
index ad66110c51..388927ce5a 100644
--- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
+++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
@@ -549,14 +549,14 @@ public class RedisKeys {
}
/**
- * @description 检查message MQ滞留消息
+ * @description 检查message MQ阻塞消息
*
- * @param pendingMsgLabel 滞留消息的label
+ * @param blockedMsgLabel 滞留消息的label
* @return
* @author wxz
* @date 2021.10.14 14:33:53
*/
- public static String pendingMqMsgKey(String pendingMsgLabel) {
- return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel);
+ public static String blockedMqMsgKey(String blockedMsgLabel) {
+ return rootPrefix.concat("message:mq:blocked:").concat(blockedMsgLabel);
}
}
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
index dd23989ca7..1792654765 100644
--- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
@@ -70,7 +70,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt msgExt) {
String msg = new String(msgExt.getBody());
- String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("receive customerId:{}", JSON.toJSONString(msg));
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
@@ -179,9 +179,9 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【项目变动事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
+ //logger.info("【项目变动事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
}
/*@Override
diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java
index cc353dd449..8c1ea726cd 100644
--- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java
+++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java
@@ -28,10 +28,32 @@ public class SystemMessageController {
@PostMapping("send-by-mq")
public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) {
ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class);
- systemMessageService.sendMQMessage(form.getMessageType(), form.getContent());
+ systemMessageService.persistAndSendMQMessage(form.getMessageType(), form.getContent());
return new Result();
}
+ /**
+ * @description 检查MQ阻塞消息(MQ收到消息,但是往监听者发送消息或者监听者处理逻辑问题导致无法消费消息)
+ *
+ * @param
+ * @return
+ * @author wxz
+ * @date 2021.10.16 22:07:38
+ */
+ @PostMapping("blocked-mq-msg-scan")
+ public Result blockedMqMsgScan() {
+ systemMessageService.blockedMqMsgScan();
+ return new Result();
+ }
+
+ /**
+ * @description 检查MQ滞留消息(往MQ发送消息失败,MQ未收到消息)
+ *
+ * @param
+ * @return
+ * @author wxz
+ * @date 2021.10.16 22:08:32
+ */
@PostMapping("pendding-mq-msg-scan")
public Result penddingMqMsgScan() {
systemMessageService.penddingMqMsgScan();
diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java
new file mode 100644
index 0000000000..57ba0d3afb
--- /dev/null
+++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2018 人人开源 https://www.renren.io
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package com.epmet.dao;
+
+import com.epmet.commons.mybatis.dao.BaseDao;
+import com.epmet.entity.SystemMessagePenddingEntity;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * 系统消息表
+ *
+ * @author generator generator@elink-cn.com
+ * @since v1.0.0 2021-10-16
+ */
+@Mapper
+public interface SystemMessagePenddingDao extends BaseDao {
+
+ void physicalDeleteById(@Param("penddingId") String penddingId);
+}
\ No newline at end of file
diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java
new file mode 100644
index 0000000000..87a7956504
--- /dev/null
+++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2018 人人开源 https://www.renren.io
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+package com.epmet.entity;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+
+import com.epmet.commons.mybatis.entity.BaseEpmetEntity;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.Date;
+
+/**
+ * 系统消息表
+ *
+ * @author generator generator@elink-cn.com
+ * @since v1.0.0 2021-10-16
+ */
+@Data
+@EqualsAndHashCode(callSuper=false)
+@TableName("system_message_pendding")
+public class SystemMessagePenddingEntity extends BaseEpmetEntity {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 消息类型。init_customer:客户初始化,login登录,logout退出
+ */
+ private String msgType;
+
+ /**
+ * 消息主表id
+ */
+ private String msgId;
+
+ /**
+ * 消息发送途径
+ */
+ private String sendApproach;
+
+ /**
+ * 消息内容
+ */
+ private String content;
+
+}
diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java
index f48e56cf9f..64e34527d1 100644
--- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java
+++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java
@@ -3,7 +3,7 @@ package com.epmet.service;
public interface SystemMessageService {
/**
- * @description 发送mq消息
+ * @description 持久化和发送mq消息
*
* @param messageType
* @param content
@@ -11,15 +11,25 @@ public interface SystemMessageService {
* @author wxz
* @date 2021.10.14 15:07:02
*/
- void sendMQMessage(String messageType, Object content);
+ void persistAndSendMQMessage(String messageType, Object content);
/**
- * @description 扫描滞留消息
+ * @description 扫描阻塞的消息
*
* @param
* @return
* @author wxz
* @date 2021.10.15 10:13:37
*/
+ void blockedMqMsgScan();
+
+ /**
+ * @description 扫描待办的消息(因为发送到MQ失败而堆积)
+ *
+ * @param
+ * @return
+ * @author wxz
+ * @date 2021.10.16 12:11:39
+ */
void penddingMqMsgScan();
}
diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
index 7bbc426d9e..dfd6737b57 100644
--- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
+++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
@@ -12,7 +12,9 @@ import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.SystemMessageSendApproach;
import com.epmet.constant.SystemMessageType;
import com.epmet.dao.SystemMessageDao;
+import com.epmet.dao.SystemMessagePenddingDao;
import com.epmet.entity.SystemMessageEntity;
+import com.epmet.entity.SystemMessagePenddingEntity;
import com.epmet.service.SystemMessageService;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -27,6 +29,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -53,51 +56,90 @@ public class SystemMessageServiceImpl implements SystemMessageService {
@Autowired
private SystemMessageDao systemMessageDao;
+ @Autowired
+ private SystemMessagePenddingDao systemMessagePenddingDao;
+
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RedisUtils redisUtils;
- @Transactional(rollbackFor = Exception.class)
@Override
- public void sendMQMessage(String messageType, Object content) {
+ public void persistAndSendMQMessage(String messageType, Object content) {
String contentStr = JSON.toJSONString(content);
- // 1.存储消息到表
- SystemMessageEntity systemMessageEntity = new SystemMessageEntity();
- systemMessageEntity.setMsgType(messageType);
- systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ);
- systemMessageEntity.setContent(contentStr);
- systemMessageDao.insert(systemMessageEntity);
+ logger.info("【发送MQ系统消息】-落盘并发送-messageType:{}, contentStr:{}", messageType, contentStr);
+ // 1.消息落盘
+ String penddingMsgId = persistMessage(messageType, contentStr);
+
+ // 2.发送消息
+ sendMQMessage(messageType, contentStr, penddingMsgId);
+ }
+
+ /**
+ * @description 消息落盘,有事务
+ *
+ * @param messageType
+ * @param contentStr
+ * @return
+ * @author wxz
+ * @date 2021.10.16 11:04:53
+ */
+ @Transactional(rollbackFor = Exception.class)
+ public String persistMessage(String messageType, String contentStr) {
+
+ SystemMessageEntity message = new SystemMessageEntity();
+ message.setMsgType(messageType);
+ message.setSendApproach(SystemMessageSendApproach.MQ);
+ message.setContent(contentStr);
+ systemMessageDao.insert(message);
+
+ SystemMessagePenddingEntity pendding = new SystemMessagePenddingEntity();
+ pendding.setMsgType(messageType);
+ pendding.setSendApproach(SystemMessageSendApproach.MQ);
+ pendding.setContent(contentStr);
+ pendding.setMsgId(message.getId());
+ systemMessagePenddingDao.insert(pendding);
+
+ logger.info("【发送MQ系统消息】-落盘完成");
+ return pendding.getId();
+ }
+
+ private void sendMQMessage(String messageType, String contentStr, String penddingId) {
String topic = getTopicByMsgType(messageType);
- // 2.缓存下来,供滞留消息扫描。TTL -1,永不过期
+ // 缓存下来,供滞留消息扫描。TTL -1,永不过期
String pendingMsgLabel = null;
String pendingMsgKey = null;
try {
MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr);
pendingMsgLabel = UUID.randomUUID().toString().replace("-", "");
- pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ pendingMsgKey = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.set(pendingMsgKey, mcb, -1);
+ //logger.info("【发送MQ系统消息】-存入redis堆积列表成功-{}-{}-{}", topic, messageType, pendingMsgLabel);
} catch (Exception e) {
- logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e));
+ logger.error("【发送MQ系统消息】将系统MQ消息存储到Redis堆积列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e));
}
// 3.发送mq消息
try {
Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
- meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel);
+ meMessage.putUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL, pendingMsgLabel);
rocketMQTemplate.getProducer().send(meMessage);
+ logger.info("【发送MQ系统消息】-发送到MQ成功");
} catch (Exception e) {
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
- logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace);
+ logger.error("【发送MQ系统消息】发送系统MQ消息失败,堆栈信息:{}", errorStackTrace);
- // 清理消息缓存
+ // 清理阻塞中的消息缓存
redisUtils.delete(pendingMsgKey);
throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode());
}
+
+ // 删除消息堆积
+ systemMessagePenddingDao.physicalDeleteById(penddingId);
}
/**
@@ -146,8 +188,8 @@ public class SystemMessageServiceImpl implements SystemMessageService {
}
@Override
- public void penddingMqMsgScan() {
- String scanKey = RedisKeys.pendingMqMsgKey("*");
+ public void blockedMqMsgScan() {
+ String scanKey = RedisKeys.blockedMqMsgKey("*");
Set keys = redisUtils.keys(scanKey);
//System.out.println(keys);
for (String key : keys) {
@@ -173,7 +215,7 @@ public class SystemMessageServiceImpl implements SystemMessageService {
&& l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) {
- logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size());
+ logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now;
}
@@ -185,7 +227,7 @@ public class SystemMessageServiceImpl implements SystemMessageService {
createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now)
&& l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) {
- logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size());
+ logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now;
}
break;
@@ -193,6 +235,25 @@ public class SystemMessageServiceImpl implements SystemMessageService {
}
}
+ @Override
+ public void penddingMqMsgScan() {
+ Integer count = systemMessagePenddingDao.selectCount(null);
+ if (count == 0) {
+ return;
+ }
+
+ // 扫描并且重新投递
+ List penddingMsgs = systemMessagePenddingDao.selectList(null);
+ for (SystemMessagePenddingEntity penddingMsg : penddingMsgs) {
+ try {
+ sendMQMessage(penddingMsg.getMsgType(), penddingMsg.getContent(), penddingMsg.getId());
+ } catch (Exception e) {
+ // 投递失败不应影响后续消息的投递
+ logger.error("【重新投递MQ消息】失败, msgType:{}, penddingMsgId:{}, 错误:{}", penddingMsg.getMsgType(), penddingMsg.getId() , ExceptionUtils.getErrorStackTrace(e));
+ }
+ }
+ }
+
@Data
@NoArgsConstructor
@AllArgsConstructor
diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml b/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml
new file mode 100644
index 0000000000..4a5ab00608
--- /dev/null
+++ b/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ delete from system_message_pendding where ID = #{penddingId}
+
+
+
+
\ No newline at end of file
diff --git a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java
index cb7af65139..a9eb9c436f 100644
--- a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java
+++ b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java
@@ -57,7 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
public void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@@ -84,7 +84,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【议题/项目分类标签初始化事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -98,8 +98,8 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
+ //logger.info("【议题/项目分类标签初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
}
}
\ No newline at end of file
diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java
index 31346a2e8b..11bcc80e38 100644
--- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java
+++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java
@@ -56,7 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@@ -82,7 +82,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【客户初始化事件监听器】-orgRole-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -129,9 +129,9 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【客户初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
+ //logger.info("【客户初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
}
/* @Override
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
index 5a13adb637..df6278488e 100644
--- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
@@ -55,7 +55,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags);
GridBaseInfoFormDTO obj = JSON.parseObject(msg, GridBaseInfoFormDTO.class);
@@ -87,7 +87,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -101,8 +101,8 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,pendingMsgLabel:{}", pendingMsgLabel);
+ //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
}
}
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java
index a9797b34d8..4e40f0974f 100644
--- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java
@@ -58,7 +58,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags);
StaffPatrolMQMsg msgObj = JSON.parseObject(msg, StaffPatrolMQMsg.class);
@@ -93,7 +93,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【开放数据事件监听器】-巡查-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -107,7 +107,8 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
+ //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
}
}
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
index 270c7ff3e3..313c036cb8 100644
--- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
@@ -58,11 +58,11 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
//messageExt.propert
- logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel);
+ logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, blockedMsgLabel:{}", msg, tags, pendingMsgLabel);
StaffBaseInfoFormDTO obj = JSON.parseObject(msg, StaffBaseInfoFormDTO.class);
DistributedLock distributedLock = null;
@@ -87,7 +87,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【开放数据事件监听器】-staff-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -102,7 +102,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
}
}
diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java
index d560979694..5980ad0a2a 100644
--- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java
+++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java
@@ -51,7 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
- String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
+ String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@@ -82,7 +82,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
- logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+ logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@@ -96,9 +96,9 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
- String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
+ String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
- logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
+ //logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
}
/* @Override