diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java index 4f9e361fc5..ee243d88dd 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java @@ -2,22 +2,19 @@ package com.epmet.mq.listener.listener; import com.alibaba.fastjson.JSON; import com.epmet.auth.constants.AuthOperationEnum; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.LoginMQMsg; -import com.epmet.commons.tools.constant.ServiceConstant; import com.epmet.commons.tools.distributedlock.DistributedLock; -import com.epmet.commons.tools.exception.EpmetErrorCode; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; -import com.epmet.commons.tools.feign.ResultDataResolver; -import com.epmet.commons.tools.utils.IpUtils; -import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; -import com.epmet.dto.CustomerStaffDTO; import com.epmet.entity.LogOperationEntity; -import com.epmet.feign.EpmetUserOpenFeignClient; import com.epmet.mq.listener.bean.log.LogOperationHelper; import com.epmet.mq.listener.bean.log.OperatorInfo; import com.epmet.service.LogOperationService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -40,8 +37,15 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -54,6 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("认证操作日志监听器-收到消息内容:{}", msg); LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class); @@ -91,5 +96,27 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java index f953c35edc..d883b7b1f6 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java @@ -1,10 +1,13 @@ package com.epmet.mq.listener.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.PointRuleChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.entity.LogOperationEntity; import com.epmet.enums.SystemMessageTypeEnum; @@ -34,8 +37,15 @@ public class PointOperationLogListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -48,6 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String opeType = messageExt.getTags(); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("积分操作日志监听器-收到消息内容:{}", msg); PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class); @@ -87,5 +98,27 @@ public class PointOperationLogListener implements MessageListenerConcurrently { } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java index 13384ea62e..d968d95448 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java @@ -1,11 +1,14 @@ package com.epmet.mq.listener.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.LoginMQMsg; import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.entity.LogOperationEntity; import com.epmet.mq.listener.bean.log.LogOperationHelper; @@ -34,8 +37,15 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -48,6 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { //String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("项目变动操作日志监听器-收到消息内容:{}", msg); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); @@ -87,6 +98,14 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } } private String getOperationTypeDisplay(String type) { @@ -107,4 +126,18 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently return null; } } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); + } } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java similarity index 81% rename from epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java rename to epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java index 593ab0cc57..a69de64cca 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java @@ -1,4 +1,4 @@ -package com.epmet.constant; +package com.epmet.commons.rocketmq.constants; /** * @Description MQ用户自定义属性 diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 9d2497f24e..ad66110c51 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -556,7 +556,7 @@ public class RedisKeys { * @author wxz * @date 2021.10.14 14:33:53 */ - public static String checkPendingMqMsgKey(String pendingMsgLabel) { + public static String pendingMqMsgKey(String pendingMsgLabel) { return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel); } } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index f62cfe09f8..932ce9e28f 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -1,8 +1,10 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; @@ -45,12 +47,18 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + long start = System.currentTimeMillis(); try { - List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); - for (String msgStr : msgStrs) { - consumeMessage(msgStr); + //List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); + //for (String msgStr : msgStrs) { + // consumeMessage(msgStr); + //} + + for (MessageExt msgExt : msgs) { + consumeMessage(msgExt); } + } catch (Exception e) { //失败不重发 logger.error("consumeMessage fail,msg:{}",e.getMessage()); @@ -60,7 +68,10 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } - private void consumeMessage(String msg) { + private void consumeMessage(MessageExt msgExt) { + String msg = new String(msgExt.getBody()); + String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + logger.info("receive customerId:{}", JSON.toJSONString(msg)); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); if (msgObj == null){ @@ -87,6 +98,14 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } else { log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); } + + if (org.apache.commons.lang.StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【项目变动事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } } public void consumeMessage(ProjectChangedMQMsg msgObj) { @@ -151,6 +170,20 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【项目变动事件监听器】删除mq滞留消息缓存失败:{}", pendingMsgLabel); + } + /*@Override public ConsumerConfigProperties getConsumerProperty() { ConsumerConfigProperties configProperties = new ConsumerConfigProperties(); diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java index 002d9ea424..cc353dd449 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java @@ -32,18 +32,9 @@ public class SystemMessageController { return new Result(); } - /** - * @description 应答mq消息 - * - * @param form - * @return - * @author wxz - * @date 2021.10.14 16:07:17 - */ - @PostMapping("ack-mq-msg") - public Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { - ValidatorUtils.validateEntity(form, SystemMsgFormDTO.AckMsgByMQ.class); - systemMessageService.ackMqMessage(form.getPendingMsgLabel()); + @PostMapping("pendding-mq-msg-scan") + public Result penddingMqMsgScan() { + systemMessageService.penddingMqMsgScan(); return new Result(); } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java index 25189683f0..f48e56cf9f 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java @@ -14,13 +14,12 @@ public interface SystemMessageService { void sendMQMessage(String messageType, Object content); /** - * @description 消息应答 + * @description 扫描滞留消息 * - * @param pendingMsgLabel + * @param * @return * @author wxz - * @date 2021.10.14 15:07:09 + * @date 2021.10.15 10:13:37 */ - void ackMqMessage(String pendingMsgLabel); - + void penddingMqMsgScan(); } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index a0add17a03..7bbc426d9e 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -2,13 +2,13 @@ package com.epmet.service.impl; import com.alibaba.fastjson.JSON; import com.epmet.auth.constants.AuthOperationConstants; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.tools.exception.EpmetErrorCode; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; -import com.epmet.constant.MQUserPropertys; import com.epmet.constant.SystemMessageSendApproach; import com.epmet.constant.SystemMessageType; import com.epmet.dao.SystemMessageDao; @@ -26,7 +26,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; +import java.time.LocalDateTime; +import java.util.Set; import java.util.UUID; @Service @@ -34,6 +35,21 @@ public class SystemMessageServiceImpl implements SystemMessageService { private Logger logger = LoggerFactory.getLogger(getClass()); + // 消息堆积时间阈值,单位s + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1 = 1 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L2 = 2 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L3 = 5 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L4 = 10 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L5 = 30 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6 = 60 * 60; + + // 堆积消息告警间隔时长,单位s + public static final long PENDDING_MQ_MSG_ALERT_SECONDS_DELTA = 60 * 60; + + // 各个级别上次告警时间 + private LocalDateTime l1LastAlertTime; + private LocalDateTime l6LastAlertTime; + @Autowired private SystemMessageDao systemMessageDao; @@ -47,27 +63,35 @@ public class SystemMessageServiceImpl implements SystemMessageService { @Override public void sendMQMessage(String messageType, Object content) { String contentStr = JSON.toJSONString(content); - //存储消息到表 + // 1.存储消息到表 SystemMessageEntity systemMessageEntity = new SystemMessageEntity(); systemMessageEntity.setMsgType(messageType); systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ); systemMessageEntity.setContent(contentStr); systemMessageDao.insert(systemMessageEntity); - // 缓存下来,供滞留消息扫描。TTL -1,永不过期 - MessageCacheBean mcb = new MessageCacheBean(new Date(), messageType, contentStr); - String pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); - String pendingMsgKey = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); - redisUtils.set(pendingMsgKey, mcb, -1); + String topic = getTopicByMsgType(messageType); + + // 2.缓存下来,供滞留消息扫描。TTL -1,永不过期 + String pendingMsgLabel = null; + String pendingMsgKey = null; + try { + MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr); + pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); + pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.set(pendingMsgKey, mcb, -1); + } catch (Exception e) { + logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); + } - //发送mq消息 + // 3.发送mq消息 try { - Message meMessage = new Message(getTopicByMsgType(messageType), messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); + Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); rocketMQTemplate.getProducer().send(meMessage); } catch (Exception e) { String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); - logger.error("发送系统消息失败,堆栈信息:{}", errorStackTrace); + logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); // 清理消息缓存 redisUtils.delete(pendingMsgKey); @@ -122,18 +146,60 @@ public class SystemMessageServiceImpl implements SystemMessageService { } @Override - public void ackMqMessage(String pendingMsgLabel) { - String key = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); - redisUtils.delete(key); - logger.info("【MQ消息应答】pendingMsgLabel{}", pendingMsgLabel); + public void penddingMqMsgScan() { + String scanKey = RedisKeys.pendingMqMsgKey("*"); + Set keys = redisUtils.keys(scanKey); + //System.out.println(keys); + for (String key : keys) { + MessageCacheBean mcb = (MessageCacheBean) redisUtils.get(key); + + LocalDateTime createTime = mcb.getCreateTime(); + LocalDateTime now = LocalDateTime.now(); + //long deltaSeconds = ChronoUnit.SECONDS.between(createTime, now); + + // 此处暂时使用粗粒度的Topic判断,耕细粒度的应该使用SystemMessageType + switch (mcb.getTopic()) { + case TopicConstants.AUTH: + case TopicConstants.GROUP_ACHIEVEMENT: + case TopicConstants.INIT_CUSTOMER: + case TopicConstants.ORG: + case TopicConstants.PATROL: + case TopicConstants.POINT: + case TopicConstants.STAFF: + + // 耗时较短。一个小时最多发送一次告警 + if (l1LastAlertTime == null || ( + createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1).isBefore(now) + && l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) + ) { + + logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + l1LastAlertTime = now; + + } + break; + + case TopicConstants.PROJECT_CHANGED: + // 耗时较长,一个小时最多发送一次告警 + if (l1LastAlertTime == null || ( + createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now) + && l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) + ) { + logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + l1LastAlertTime = now; + } + break; + } + } } @Data @NoArgsConstructor @AllArgsConstructor - class MessageCacheBean { - private Date createTime; + static class MessageCacheBean { + private LocalDateTime createTime; private String messageType; + private String topic; private Object content; } } diff --git a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java index 19ca52066c..3554e961da 100644 --- a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java +++ b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java @@ -1,12 +1,17 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.form.CategoryTagInitFormDTO; import com.epmet.service.IssueProjectCategoryDictService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -32,8 +37,15 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu @Autowired private DistributedLock distributedLock; + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -45,6 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu public void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -66,5 +79,27 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存失败:{}", pendingMsgLabel); } } \ No newline at end of file diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java index 7224ea757a..e554e36f74 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java @@ -1,16 +1,20 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.constant.UserWorkType; import com.epmet.dto.CustomerAgencyDTO; import com.epmet.dto.form.AddAgencyAndStaffFormDTO; import com.epmet.dto.form.AdminStaffFromDTO; import com.epmet.service.AgencyService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -32,8 +36,15 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -45,6 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -65,6 +77,14 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } } /** @@ -100,6 +120,20 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently return agencyAndStaff; } + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【客户初始化事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); + } + /* @Override public ConsumerConfigProperties getConsumerProperty() { ConsumerConfigProperties configProperties = new ConsumerConfigProperties(); diff --git a/epmet-module/open-data-worker/open-data-worker-server/pom.xml b/epmet-module/open-data-worker/open-data-worker-server/pom.xml index 211bef65a3..321234a904 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/pom.xml +++ b/epmet-module/open-data-worker/open-data-worker-server/pom.xml @@ -172,7 +172,7 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd - local + false 192.168.1.140:9876;192.168.1.141:9876 diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java index 467f690259..8365eaa9ba 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java @@ -1,13 +1,12 @@ package com.epmet.opendata.mq.listener; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; -import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; -import com.epmet.constant.MQUserPropertys; -import com.epmet.dto.form.SystemMsgFormDTO; -import com.epmet.feign.EpmetMessageOpenFeignClient; import org.apache.commons.lang.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -28,17 +27,13 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent private Logger logger = LoggerFactory.getLogger(getClass()); - private EpmetMessageOpenFeignClient messageOpenFeignClient; + private RedisUtils redisUtils; @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - if (messageOpenFeignClient == null) { - try { - messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class); - } catch (Exception e) { - e.printStackTrace(); - } + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); } try { @@ -54,6 +49,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent // msg即为消息体 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); @@ -76,30 +72,26 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent //distributedLock.unLock(lock); } - // 应答mq消息(调用message服务的应答api) if (StringUtils.isNotBlank(pendingMsgLabel)) { try { - ackMqMsg(pendingMsgLabel); + removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } /** - * @description 应答mq消息 + * @description * * @param pendingMsgLabel * @return * @author wxz * @date 2021.10.14 16:32:32 */ - private void ackMqMsg(String pendingMsgLabel) { - SystemMsgFormDTO form = new SystemMsgFormDTO(); - form.setPendingMsgLabel(pendingMsgLabel); - Result result = messageOpenFeignClient.ackSystemMsgByMQ(form); - if (!result.success()) { - logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg()); - } + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【开放数据事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java index c8251c80fa..9fe2e34eed 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java @@ -1,8 +1,13 @@ package com.epmet.opendata.mq.listener; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -22,8 +27,15 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -37,7 +49,9 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr // msg即为消息体 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags); @@ -57,5 +71,26 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr } finally { //distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java index 320b78f7ae..a6e853ba0f 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java @@ -1,12 +1,12 @@ package com.epmet.opendata.mq.listener; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; -import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; -import com.epmet.constant.MQUserPropertys; -import com.epmet.dto.form.SystemMsgFormDTO; import com.epmet.feign.EpmetMessageOpenFeignClient; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -17,7 +17,6 @@ import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.PostConstruct; import java.util.List; /** @@ -31,11 +30,13 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre private EpmetMessageOpenFeignClient messageOpenFeignClient; + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - if (messageOpenFeignClient == null) { - messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class); + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); } try { @@ -51,6 +52,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre // msg即为消息体 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); @@ -76,30 +78,25 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre //distributedLock.unLock(lock); } - // 应答mq消息(调用message服务的应答api) if (StringUtils.isNotBlank(pendingMsgLabel)) { try { - ackMqMsg(pendingMsgLabel); + removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } /** - * @description 应答mq消息 + * @description * * @param pendingMsgLabel * @return * @author wxz * @date 2021.10.14 16:32:32 */ - private void ackMqMsg(String pendingMsgLabel) { - SystemMsgFormDTO form = new SystemMsgFormDTO(); - form.setPendingMsgLabel(pendingMsgLabel); - Result result = messageOpenFeignClient.ackSystemMsgByMQ(form); - if (!result.success()) { - logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg()); - } + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); } } diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java index d21ea39357..b6c91d276d 100644 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java +++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java @@ -1,13 +1,17 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.CustomerHomeDTO; import com.epmet.service.CustomerHomeService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -15,7 +19,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.TimeUnit; @@ -28,8 +31,15 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -41,6 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -66,6 +77,28 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【开放数据事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); } /* @Override diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java index 33c0ff7c63..8996de4f47 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java @@ -1,9 +1,13 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.modules.group.service.StatsAchievementService; import lombok.extern.slf4j.Slf4j; @@ -31,8 +35,15 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + long start = System.currentTimeMillis(); try { msgs.forEach(this::consumeMessage); @@ -48,6 +59,7 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren private void consumeMessage(MessageExt messageExt) { logger.info("receive msg:{}", JSON.toJSONString(messageExt)); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); GroupAchievementMQMsg msgObj = JSON.parseObject(msg, GroupAchievementMQMsg.class); if (msgObj == null){ @@ -87,9 +99,29 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren distributedLock.unLock(lock); } } - } + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【小组成就事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【小组成就事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); + } /*@Override public ConsumerConfigProperties getConsumerProperty() {