|  | @ -2,13 +2,13 @@ package com.epmet.service.impl; | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  | import com.alibaba.fastjson.JSON; |  |  | import com.alibaba.fastjson.JSON; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.auth.constants.AuthOperationConstants; |  |  | import com.epmet.auth.constants.AuthOperationConstants; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | import com.epmet.commons.rocketmq.constants.MQUserPropertys; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.rocketmq.constants.TopicConstants; |  |  | import com.epmet.commons.rocketmq.constants.TopicConstants; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.exception.EpmetErrorCode; |  |  | import com.epmet.commons.tools.exception.EpmetErrorCode; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.exception.ExceptionUtils; |  |  | import com.epmet.commons.tools.exception.ExceptionUtils; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.exception.RenException; |  |  | import com.epmet.commons.tools.exception.RenException; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.redis.RedisKeys; |  |  | import com.epmet.commons.tools.redis.RedisKeys; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.redis.RedisUtils; |  |  | import com.epmet.commons.tools.redis.RedisUtils; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.constant.MQUserPropertys; |  |  |  | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.constant.SystemMessageSendApproach; |  |  | import com.epmet.constant.SystemMessageSendApproach; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.constant.SystemMessageType; |  |  | import com.epmet.constant.SystemMessageType; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.dao.SystemMessageDao; |  |  | import com.epmet.dao.SystemMessageDao; | 
			
		
	
	
		
		
			
				
					|  | @ -26,7 +26,8 @@ import org.springframework.beans.factory.annotation.Autowired; | 
			
		
	
		
		
			
				
					|  |  | import org.springframework.stereotype.Service; |  |  | import org.springframework.stereotype.Service; | 
			
		
	
		
		
			
				
					|  |  | import org.springframework.transaction.annotation.Transactional; |  |  | import org.springframework.transaction.annotation.Transactional; | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  | import java.util.Date; |  |  | import java.time.LocalDateTime; | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					|  |  |  |  |  | import java.util.Set; | 
			
		
	
		
		
			
				
					|  |  | import java.util.UUID; |  |  | import java.util.UUID; | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  | @Service |  |  | @Service | 
			
		
	
	
		
		
			
				
					|  | @ -34,6 +35,21 @@ public class SystemMessageServiceImpl implements SystemMessageService { | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |     private Logger logger = LoggerFactory.getLogger(getClass()); |  |  |     private Logger logger = LoggerFactory.getLogger(getClass()); | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     // 消息堆积时间阈值,单位s
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1 = 1 * 60; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L2 = 2 * 60; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L3 = 5 * 60; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L4 = 10 * 60; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L5 = 30 * 60; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6 = 60 * 60; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     // 堆积消息告警间隔时长,单位s
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     public static final long PENDDING_MQ_MSG_ALERT_SECONDS_DELTA = 60 * 60; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     // 各个级别上次告警时间
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private LocalDateTime l1LastAlertTime; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private LocalDateTime l6LastAlertTime; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |     @Autowired |  |  |     @Autowired | 
			
		
	
		
		
			
				
					|  |  |     private SystemMessageDao systemMessageDao; |  |  |     private SystemMessageDao systemMessageDao; | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  | @ -47,27 +63,35 @@ public class SystemMessageServiceImpl implements SystemMessageService { | 
			
		
	
		
		
			
				
					|  |  |     @Override |  |  |     @Override | 
			
		
	
		
		
			
				
					|  |  |     public void sendMQMessage(String messageType, Object content) { |  |  |     public void sendMQMessage(String messageType, Object content) { | 
			
		
	
		
		
			
				
					|  |  |         String contentStr = JSON.toJSONString(content); |  |  |         String contentStr = JSON.toJSONString(content); | 
			
		
	
		
		
			
				
					
					|  |  |         //存储消息到表
 |  |  |         // 1.存储消息到表
 | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					|  |  |         SystemMessageEntity systemMessageEntity = new SystemMessageEntity(); |  |  |         SystemMessageEntity systemMessageEntity = new SystemMessageEntity(); | 
			
		
	
		
		
			
				
					|  |  |         systemMessageEntity.setMsgType(messageType); |  |  |         systemMessageEntity.setMsgType(messageType); | 
			
		
	
		
		
			
				
					|  |  |         systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ); |  |  |         systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ); | 
			
		
	
		
		
			
				
					|  |  |         systemMessageEntity.setContent(contentStr); |  |  |         systemMessageEntity.setContent(contentStr); | 
			
		
	
		
		
			
				
					|  |  |         systemMessageDao.insert(systemMessageEntity); |  |  |         systemMessageDao.insert(systemMessageEntity); | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |         // 缓存下来,供滞留消息扫描。TTL -1,永不过期
 |  |  |         String topic = getTopicByMsgType(messageType); | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         MessageCacheBean mcb = new MessageCacheBean(new Date(), messageType, contentStr); |  |  | 
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         String pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); |  |  |         // 2.缓存下来,供滞留消息扫描。TTL -1,永不过期
 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         String pendingMsgKey = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); |  |  |         String pendingMsgLabel = null; | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         redisUtils.set(pendingMsgKey, mcb, -1); |  |  |         String pendingMsgKey = null; | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					|  |  |  |  |  |         try { | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             redisUtils.set(pendingMsgKey, mcb, -1); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |         } catch (Exception e) { | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |         } | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |         //发送mq消息
 |  |  |         // 3.发送mq消息
 | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					|  |  |         try { |  |  |         try { | 
			
		
	
		
		
			
				
					
					|  |  |             Message meMessage = new Message(getTopicByMsgType(messageType), messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); |  |  |             Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					|  |  |             meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); |  |  |             meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); | 
			
		
	
		
		
			
				
					|  |  |             rocketMQTemplate.getProducer().send(meMessage); |  |  |             rocketMQTemplate.getProducer().send(meMessage); | 
			
		
	
		
		
			
				
					|  |  |         } catch (Exception e) { |  |  |         } catch (Exception e) { | 
			
		
	
		
		
			
				
					|  |  |             String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); |  |  |             String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); | 
			
		
	
		
		
			
				
					
					|  |  |             logger.error("发送系统消息失败,堆栈信息:{}", errorStackTrace); |  |  |             logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |             // 清理消息缓存
 |  |  |             // 清理消息缓存
 | 
			
		
	
		
		
			
				
					|  |  |             redisUtils.delete(pendingMsgKey); |  |  |             redisUtils.delete(pendingMsgKey); | 
			
		
	
	
		
		
			
				
					|  | @ -122,18 +146,60 @@ public class SystemMessageServiceImpl implements SystemMessageService { | 
			
		
	
		
		
			
				
					|  |  |     } |  |  |     } | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |     @Override |  |  |     @Override | 
			
		
	
		
		
			
				
					
					|  |  |     public void ackMqMessage(String pendingMsgLabel) { |  |  |     public void penddingMqMsgScan() { | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         String key = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); |  |  |         String scanKey = RedisKeys.pendingMqMsgKey("*"); | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         redisUtils.delete(key); |  |  |         Set<String> keys = redisUtils.keys(scanKey); | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         logger.info("【MQ消息应答】pendingMsgLabel{}", pendingMsgLabel); |  |  |         //System.out.println(keys);
 | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					|  |  |  |  |  |         for (String key : keys) { | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             MessageCacheBean mcb = (MessageCacheBean) redisUtils.get(key); | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             LocalDateTime createTime = mcb.getCreateTime(); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             LocalDateTime now = LocalDateTime.now(); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             //long deltaSeconds = ChronoUnit.SECONDS.between(createTime, now);
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             // 此处暂时使用粗粒度的Topic判断,耕细粒度的应该使用SystemMessageType
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             switch (mcb.getTopic()) { | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.AUTH: | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.GROUP_ACHIEVEMENT: | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.INIT_CUSTOMER: | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.ORG: | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.PATROL: | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.POINT: | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.STAFF: | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     // 耗时较短。一个小时最多发送一次告警
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     if (l1LastAlertTime == null || ( | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                             createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1).isBefore(now) | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                                     && l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     ) { | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                         logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                         l1LastAlertTime = now; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     } | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     break; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 case TopicConstants.PROJECT_CHANGED: | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     // 耗时较长,一个小时最多发送一次告警
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     if (l1LastAlertTime == null || ( | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                             createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now) | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                                     && l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     ) { | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                         logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                         l1LastAlertTime = now; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     } | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     break; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             } | 
			
		
	
		
		
			
				
					|  |  |  |  |  |         } | 
			
		
	
		
		
			
				
					|  |  |     } |  |  |     } | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |     @Data |  |  |     @Data | 
			
		
	
		
		
			
				
					|  |  |     @NoArgsConstructor |  |  |     @NoArgsConstructor | 
			
		
	
		
		
			
				
					|  |  |     @AllArgsConstructor |  |  |     @AllArgsConstructor | 
			
		
	
		
		
			
				
					
					|  |  |     class MessageCacheBean { |  |  |     static class MessageCacheBean { | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |         private Date createTime; |  |  |         private LocalDateTime createTime; | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					|  |  |         private String messageType; |  |  |         private String messageType; | 
			
		
	
		
		
			
				
					|  |  |  |  |  |         private String topic; | 
			
		
	
		
		
			
				
					|  |  |         private Object content; |  |  |         private Object content; | 
			
		
	
		
		
			
				
					|  |  |     } |  |  |     } | 
			
		
	
		
		
			
				
					|  |  | } |  |  | } | 
			
		
	
	
		
		
			
				
					|  | 
 |