Browse Source

修改:

1.所有mq listener增加redis 滞留消息列表的删除动作
dev_shibei_match
wxz 4 years ago
parent
commit
57b3679f36
  1. 41
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java
  2. 33
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java
  3. 33
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java
  4. 2
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java
  5. 2
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  6. 41
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
  7. 15
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java
  8. 9
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java
  9. 100
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  10. 35
      epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java
  11. 34
      epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java
  12. 2
      epmet-module/open-data-worker/open-data-worker-server/pom.xml
  13. 36
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
  14. 35
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java
  15. 31
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
  16. 35
      epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java
  17. 34
      epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java

41
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java

@ -2,22 +2,19 @@ package com.epmet.mq.listener.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.auth.constants.AuthOperationEnum;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.LoginMQMsg;
import com.epmet.commons.tools.constant.ServiceConstant;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.EpmetErrorCode;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.feign.ResultDataResolver;
import com.epmet.commons.tools.utils.IpUtils;
import com.epmet.commons.tools.utils.Result;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.CustomerStaffDTO;
import com.epmet.entity.LogOperationEntity;
import com.epmet.feign.EpmetUserOpenFeignClient;
import com.epmet.mq.listener.bean.log.LogOperationHelper;
import com.epmet.mq.listener.bean.log.OperatorInfo;
import com.epmet.service.LogOperationService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@ -40,8 +37,15 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
@ -54,6 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) {
String tags = messageExt.getTags();
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("认证操作日志监听器-收到消息内容:{}", msg);
LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class);
@ -91,5 +96,27 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel);
}
}

33
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java

@ -1,10 +1,13 @@
package com.epmet.mq.listener.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.PointRuleChangedMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.entity.LogOperationEntity;
import com.epmet.enums.SystemMessageTypeEnum;
@ -34,8 +37,15 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
@ -48,6 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) {
String opeType = messageExt.getTags();
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("积分操作日志监听器-收到消息内容:{}", msg);
PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class);
@ -87,5 +98,27 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
}
}

33
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java

@ -1,11 +1,14 @@
package com.epmet.mq.listener.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.LoginMQMsg;
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.entity.LogOperationEntity;
import com.epmet.mq.listener.bean.log.LogOperationHelper;
@ -34,8 +37,15 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
@ -48,6 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) {
//String tags = messageExt.getTags();
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("项目变动操作日志监听器-收到消息内容:{}", msg);
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
@ -87,6 +98,14 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
private String getOperationTypeDisplay(String type) {
@ -107,4 +126,18 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
return null;
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
}
}

2
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java → epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java

@ -1,4 +1,4 @@
package com.epmet.constant;
package com.epmet.commons.rocketmq.constants;
/**
* @Description MQ用户自定义属性

2
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -556,7 +556,7 @@ public class RedisKeys {
* @author wxz
* @date 2021.10.14 14:33:53
*/
public static String checkPendingMqMsgKey(String pendingMsgLabel) {
public static String pendingMqMsgKey(String pendingMsgLabel) {
return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel);
}
}

41
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -1,8 +1,10 @@
package com.epmet.mq;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
@ -45,12 +47,18 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long start = System.currentTimeMillis();
try {
List<String> msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
for (String msgStr : msgStrs) {
consumeMessage(msgStr);
//List<String> msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
//for (String msgStr : msgStrs) {
// consumeMessage(msgStr);
//}
for (MessageExt msgExt : msgs) {
consumeMessage(msgExt);
}
} catch (Exception e) {
//失败不重发
logger.error("consumeMessage fail,msg:{}",e.getMessage());
@ -60,7 +68,10 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(String msg) {
private void consumeMessage(MessageExt msgExt) {
String msg = new String(msgExt.getBody());
String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("receive customerId:{}", JSON.toJSONString(msg));
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
if (msgObj == null){
@ -87,6 +98,14 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
} else {
log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId());
}
if (org.apache.commons.lang.StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【项目变动事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
public void consumeMessage(ProjectChangedMQMsg msgObj) {
@ -151,6 +170,20 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【项目变动事件监听器】删除mq滞留消息缓存失败:{}", pendingMsgLabel);
}
/*@Override
public ConsumerConfigProperties getConsumerProperty() {
ConsumerConfigProperties configProperties = new ConsumerConfigProperties();

15
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java

@ -32,18 +32,9 @@ public class SystemMessageController {
return new Result();
}
/**
* @description 应答mq消息
*
* @param form
* @return
* @author wxz
* @date 2021.10.14 16:07:17
*/
@PostMapping("ack-mq-msg")
public Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) {
ValidatorUtils.validateEntity(form, SystemMsgFormDTO.AckMsgByMQ.class);
systemMessageService.ackMqMessage(form.getPendingMsgLabel());
@PostMapping("pendding-mq-msg-scan")
public Result penddingMqMsgScan() {
systemMessageService.penddingMqMsgScan();
return new Result();
}

9
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java

@ -14,13 +14,12 @@ public interface SystemMessageService {
void sendMQMessage(String messageType, Object content);
/**
* @description 消息应答
* @description 扫描滞留消息
*
* @param pendingMsgLabel
* @param
* @return
* @author wxz
* @date 2021.10.14 15:07:09
* @date 2021.10.15 10:13:37
*/
void ackMqMessage(String pendingMsgLabel);
void penddingMqMsgScan();
}

100
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -2,13 +2,13 @@ package com.epmet.service.impl;
import com.alibaba.fastjson.JSON;
import com.epmet.auth.constants.AuthOperationConstants;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.tools.exception.EpmetErrorCode;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.MQUserPropertys;
import com.epmet.constant.SystemMessageSendApproach;
import com.epmet.constant.SystemMessageType;
import com.epmet.dao.SystemMessageDao;
@ -26,7 +26,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.UUID;
@Service
@ -34,6 +35,21 @@ public class SystemMessageServiceImpl implements SystemMessageService {
private Logger logger = LoggerFactory.getLogger(getClass());
// 消息堆积时间阈值,单位s
private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1 = 1 * 60;
private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L2 = 2 * 60;
private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L3 = 5 * 60;
private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L4 = 10 * 60;
private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L5 = 30 * 60;
private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6 = 60 * 60;
// 堆积消息告警间隔时长,单位s
public static final long PENDDING_MQ_MSG_ALERT_SECONDS_DELTA = 60 * 60;
// 各个级别上次告警时间
private LocalDateTime l1LastAlertTime;
private LocalDateTime l6LastAlertTime;
@Autowired
private SystemMessageDao systemMessageDao;
@ -47,27 +63,35 @@ public class SystemMessageServiceImpl implements SystemMessageService {
@Override
public void sendMQMessage(String messageType, Object content) {
String contentStr = JSON.toJSONString(content);
//存储消息到表
// 1.存储消息到表
SystemMessageEntity systemMessageEntity = new SystemMessageEntity();
systemMessageEntity.setMsgType(messageType);
systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ);
systemMessageEntity.setContent(contentStr);
systemMessageDao.insert(systemMessageEntity);
// 缓存下来,供滞留消息扫描。TTL -1,永不过期
MessageCacheBean mcb = new MessageCacheBean(new Date(), messageType, contentStr);
String pendingMsgLabel = UUID.randomUUID().toString().replace("-", "");
String pendingMsgKey = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel);
redisUtils.set(pendingMsgKey, mcb, -1);
String topic = getTopicByMsgType(messageType);
// 2.缓存下来,供滞留消息扫描。TTL -1,永不过期
String pendingMsgLabel = null;
String pendingMsgKey = null;
try {
MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr);
pendingMsgLabel = UUID.randomUUID().toString().replace("-", "");
pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.set(pendingMsgKey, mcb, -1);
} catch (Exception e) {
logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e));
}
//发送mq消息
// 3.发送mq消息
try {
Message meMessage = new Message(getTopicByMsgType(messageType), messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel);
rocketMQTemplate.getProducer().send(meMessage);
} catch (Exception e) {
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
logger.error("发送系统消息失败,堆栈信息:{}", errorStackTrace);
logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace);
// 清理消息缓存
redisUtils.delete(pendingMsgKey);
@ -122,18 +146,60 @@ public class SystemMessageServiceImpl implements SystemMessageService {
}
@Override
public void ackMqMessage(String pendingMsgLabel) {
String key = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【MQ消息应答】pendingMsgLabel{}", pendingMsgLabel);
public void penddingMqMsgScan() {
String scanKey = RedisKeys.pendingMqMsgKey("*");
Set<String> keys = redisUtils.keys(scanKey);
//System.out.println(keys);
for (String key : keys) {
MessageCacheBean mcb = (MessageCacheBean) redisUtils.get(key);
LocalDateTime createTime = mcb.getCreateTime();
LocalDateTime now = LocalDateTime.now();
//long deltaSeconds = ChronoUnit.SECONDS.between(createTime, now);
// 此处暂时使用粗粒度的Topic判断,耕细粒度的应该使用SystemMessageType
switch (mcb.getTopic()) {
case TopicConstants.AUTH:
case TopicConstants.GROUP_ACHIEVEMENT:
case TopicConstants.INIT_CUSTOMER:
case TopicConstants.ORG:
case TopicConstants.PATROL:
case TopicConstants.POINT:
case TopicConstants.STAFF:
// 耗时较短。一个小时最多发送一次告警
if (l1LastAlertTime == null || (
createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1).isBefore(now)
&& l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) {
logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now;
}
break;
case TopicConstants.PROJECT_CHANGED:
// 耗时较长,一个小时最多发送一次告警
if (l1LastAlertTime == null || (
createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now)
&& l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) {
logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now;
}
break;
}
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class MessageCacheBean {
private Date createTime;
static class MessageCacheBean {
private LocalDateTime createTime;
private String messageType;
private String topic;
private Object content;
}
}

35
epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java

@ -1,12 +1,17 @@
package com.epmet.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.form.CategoryTagInitFormDTO;
import com.epmet.service.IssueProjectCategoryDictService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@ -32,8 +37,15 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
@Autowired
private DistributedLock distributedLock;
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
@ -45,6 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
public void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -66,5 +79,27 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存失败:{}", pendingMsgLabel);
}
}

34
epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java

@ -1,16 +1,20 @@
package com.epmet.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.UserWorkType;
import com.epmet.dto.CustomerAgencyDTO;
import com.epmet.dto.form.AddAgencyAndStaffFormDTO;
import com.epmet.dto.form.AdminStaffFromDTO;
import com.epmet.service.AgencyService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@ -32,8 +36,15 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
@ -45,6 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -65,6 +77,14 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
@ -100,6 +120,20 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
return agencyAndStaff;
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【客户初始化事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel);
}
/* @Override
public ConsumerConfigProperties getConsumerProperty() {
ConsumerConfigProperties configProperties = new ConsumerConfigProperties();

2
epmet-module/open-data-worker/open-data-worker-server/pom.xml

@ -172,7 +172,7 @@
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>local</rocketmq.enable>
<rocketmq.enable>false</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties>
</profile>

36
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java

@ -1,13 +1,12 @@
package com.epmet.opendata.mq.listener;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.Result;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.MQUserPropertys;
import com.epmet.dto.form.SystemMsgFormDTO;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@ -28,17 +27,13 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
private Logger logger = LoggerFactory.getLogger(getClass());
private EpmetMessageOpenFeignClient messageOpenFeignClient;
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (messageOpenFeignClient == null) {
try {
messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class);
} catch (Exception e) {
e.printStackTrace();
}
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
@ -54,6 +49,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
@ -76,30 +72,26 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
//distributedLock.unLock(lock);
}
// 应答mq消息(调用message服务的应答api)
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
ackMqMsg(pendingMsgLabel);
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description 应答mq消息
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void ackMqMsg(String pendingMsgLabel) {
SystemMsgFormDTO form = new SystemMsgFormDTO();
form.setPendingMsgLabel(pendingMsgLabel);
Result result = messageOpenFeignClient.ackSystemMsgByMQ(form);
if (!result.success()) {
logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg());
}
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【开放数据事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel);
}
}

35
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java

@ -1,8 +1,13 @@
package com.epmet.opendata.mq.listener;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@ -22,8 +27,15 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
@ -37,7 +49,9 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags);
@ -57,5 +71,26 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
} finally {
//distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
}
}

31
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

@ -1,12 +1,12 @@
package com.epmet.opendata.mq.listener;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.Result;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.MQUserPropertys;
import com.epmet.dto.form.SystemMsgFormDTO;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@ -17,7 +17,6 @@ import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
import java.util.List;
/**
@ -31,11 +30,13 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
private EpmetMessageOpenFeignClient messageOpenFeignClient;
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (messageOpenFeignClient == null) {
messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class);
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
@ -51,6 +52,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
@ -76,30 +78,25 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
//distributedLock.unLock(lock);
}
// 应答mq消息(调用message服务的应答api)
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
ackMqMsg(pendingMsgLabel);
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description 应答mq消息
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void ackMqMsg(String pendingMsgLabel) {
SystemMsgFormDTO form = new SystemMsgFormDTO();
form.setPendingMsgLabel(pendingMsgLabel);
Result result = messageOpenFeignClient.ackSystemMsgByMQ(form);
if (!result.success()) {
logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg());
}
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
}
}

35
epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java

@ -1,13 +1,17 @@
package com.epmet.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.CustomerHomeDTO;
import com.epmet.service.CustomerHomeService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@ -15,7 +19,6 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -28,8 +31,15 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
@ -41,6 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -66,6 +77,28 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【开放数据事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel);
}
/* @Override

34
epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java

@ -1,9 +1,13 @@
package com.epmet.mq;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.modules.group.service.StatsAchievementService;
import lombok.extern.slf4j.Slf4j;
@ -31,8 +35,15 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
long start = System.currentTimeMillis();
try {
msgs.forEach(this::consumeMessage);
@ -48,6 +59,7 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren
private void consumeMessage(MessageExt messageExt) {
logger.info("receive msg:{}", JSON.toJSONString(messageExt));
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
GroupAchievementMQMsg msgObj = JSON.parseObject(msg, GroupAchievementMQMsg.class);
if (msgObj == null){
@ -87,9 +99,29 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren
distributedLock.unLock(lock);
}
}
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【小组成就事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【小组成就事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel);
}
/*@Override
public ConsumerConfigProperties getConsumerProperty() {

Loading…
Cancel
Save