Browse Source

新增:

1.epmet-message中,扫描"未成功发送到mq的滞留消息"的接口
2.调整mq阻塞消息的key
3.其他逻辑微调
master
wxz 4 years ago
parent
commit
d3f10217fe
  1. 8
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java
  2. 8
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java
  3. 8
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java
  4. 4
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java
  5. 8
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  6. 6
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
  7. 24
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java
  8. 35
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java
  9. 61
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java
  10. 16
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java
  11. 97
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  12. 26
      epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml
  13. 8
      epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java
  14. 8
      epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java
  15. 8
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
  16. 7
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java
  17. 8
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
  18. 8
      epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java

8
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java

@ -58,7 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) { private void consumeMessage(MessageExt messageExt) {
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("认证操作日志监听器-收到消息内容:{}", msg); logger.info("认证操作日志监听器-收到消息内容:{}", msg);
LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class); LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class);
@ -101,7 +101,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【登录操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -115,8 +115,8 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); //logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel);
} }
} }

8
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java

@ -58,7 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) { private void consumeMessage(MessageExt messageExt) {
String opeType = messageExt.getTags(); String opeType = messageExt.getTags();
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("积分操作日志监听器-收到消息内容:{}", msg); logger.info("积分操作日志监听器-收到消息内容:{}", msg);
PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class); PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class);
@ -103,7 +103,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【积分操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -117,8 +117,8 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); //logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
} }
} }

8
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java

@ -58,7 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) { private void consumeMessage(MessageExt messageExt) {
//String tags = messageExt.getTags(); //String tags = messageExt.getTags();
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("项目变动操作日志监听器-收到消息内容:{}", msg); logger.info("项目变动操作日志监听器-收到消息内容:{}", msg);
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
@ -103,7 +103,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【项目操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -136,8 +136,8 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); //logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
} }
} }

4
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java

@ -7,7 +7,7 @@ package com.epmet.commons.rocketmq.constants;
*/ */
public interface MQUserPropertys { public interface MQUserPropertys {
//堆积消息label //阻塞消息label
String PENDING_MSG_LABEL = "pendingMsgLabel"; String BLOCKED_MSG_LABEL = "blockedMsgLabel";
} }

8
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -549,14 +549,14 @@ public class RedisKeys {
} }
/** /**
* @description 检查message MQ滞留消息 * @description 检查message MQ阻塞消息
* *
* @param pendingMsgLabel 滞留消息的label * @param blockedMsgLabel 滞留消息的label
* @return * @return
* @author wxz * @author wxz
* @date 2021.10.14 14:33:53 * @date 2021.10.14 14:33:53
*/ */
public static String pendingMqMsgKey(String pendingMsgLabel) { public static String blockedMqMsgKey(String blockedMsgLabel) {
return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel); return rootPrefix.concat("message:mq:blocked:").concat(blockedMsgLabel);
} }
} }

6
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -70,7 +70,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt msgExt) { private void consumeMessage(MessageExt msgExt) {
String msg = new String(msgExt.getBody()); String msg = new String(msgExt.getBody());
String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("receive customerId:{}", JSON.toJSONString(msg)); logger.info("receive customerId:{}", JSON.toJSONString(msg));
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
@ -179,9 +179,9 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【项目变动事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); //logger.info("【项目变动事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
} }
/*@Override /*@Override

24
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java

@ -28,10 +28,32 @@ public class SystemMessageController {
@PostMapping("send-by-mq") @PostMapping("send-by-mq")
public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) {
ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class); ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class);
systemMessageService.sendMQMessage(form.getMessageType(), form.getContent()); systemMessageService.persistAndSendMQMessage(form.getMessageType(), form.getContent());
return new Result(); return new Result();
} }
/**
* @description 检查MQ阻塞消息(MQ收到消息但是往监听者发送消息或者监听者处理逻辑问题导致无法消费消息)
*
* @param
* @return
* @author wxz
* @date 2021.10.16 22:07:38
*/
@PostMapping("blocked-mq-msg-scan")
public Result blockedMqMsgScan() {
systemMessageService.blockedMqMsgScan();
return new Result();
}
/**
* @description 检查MQ滞留消息(往MQ发送消息失败MQ未收到消息)
*
* @param
* @return
* @author wxz
* @date 2021.10.16 22:08:32
*/
@PostMapping("pendding-mq-msg-scan") @PostMapping("pendding-mq-msg-scan")
public Result penddingMqMsgScan() { public Result penddingMqMsgScan() {
systemMessageService.penddingMqMsgScan(); systemMessageService.penddingMqMsgScan();

35
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java

@ -0,0 +1,35 @@
/**
* Copyright 2018 人人开源 https://www.renren.io
* <p>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.epmet.dao;
import com.epmet.commons.mybatis.dao.BaseDao;
import com.epmet.entity.SystemMessagePenddingEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* 系统消息表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2021-10-16
*/
@Mapper
public interface SystemMessagePenddingDao extends BaseDao<SystemMessagePenddingEntity> {
void physicalDeleteById(@Param("penddingId") String penddingId);
}

61
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java

@ -0,0 +1,61 @@
/**
* Copyright 2018 人人开源 https://www.renren.io
* <p>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.epmet.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.epmet.commons.mybatis.entity.BaseEpmetEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* 系统消息表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2021-10-16
*/
@Data
@EqualsAndHashCode(callSuper=false)
@TableName("system_message_pendding")
public class SystemMessagePenddingEntity extends BaseEpmetEntity {
private static final long serialVersionUID = 1L;
/**
* 消息类型init_customer:客户初始化,login登录logout退出
*/
private String msgType;
/**
* 消息主表id
*/
private String msgId;
/**
* 消息发送途径
*/
private String sendApproach;
/**
* 消息内容
*/
private String content;
}

16
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java

@ -3,7 +3,7 @@ package com.epmet.service;
public interface SystemMessageService { public interface SystemMessageService {
/** /**
* @description 发送mq消息 * @description 持久化和发送mq消息
* *
* @param messageType * @param messageType
* @param content * @param content
@ -11,15 +11,25 @@ public interface SystemMessageService {
* @author wxz * @author wxz
* @date 2021.10.14 15:07:02 * @date 2021.10.14 15:07:02
*/ */
void sendMQMessage(String messageType, Object content); void persistAndSendMQMessage(String messageType, Object content);
/** /**
* @description 扫描滞留消息 * @description 扫描阻塞的消息
* *
* @param * @param
* @return * @return
* @author wxz * @author wxz
* @date 2021.10.15 10:13:37 * @date 2021.10.15 10:13:37
*/ */
void blockedMqMsgScan();
/**
* @description 扫描待办的消息(因为发送到MQ失败而堆积)
*
* @param
* @return
* @author wxz
* @date 2021.10.16 12:11:39
*/
void penddingMqMsgScan(); void penddingMqMsgScan();
} }

97
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -12,7 +12,9 @@ import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.SystemMessageSendApproach; import com.epmet.constant.SystemMessageSendApproach;
import com.epmet.constant.SystemMessageType; import com.epmet.constant.SystemMessageType;
import com.epmet.dao.SystemMessageDao; import com.epmet.dao.SystemMessageDao;
import com.epmet.dao.SystemMessagePenddingDao;
import com.epmet.entity.SystemMessageEntity; import com.epmet.entity.SystemMessageEntity;
import com.epmet.entity.SystemMessagePenddingEntity;
import com.epmet.service.SystemMessageService; import com.epmet.service.SystemMessageService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@ -27,6 +29,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@ -53,51 +56,90 @@ public class SystemMessageServiceImpl implements SystemMessageService {
@Autowired @Autowired
private SystemMessageDao systemMessageDao; private SystemMessageDao systemMessageDao;
@Autowired
private SystemMessagePenddingDao systemMessagePenddingDao;
@Autowired @Autowired
private RocketMQTemplate rocketMQTemplate; private RocketMQTemplate rocketMQTemplate;
@Autowired @Autowired
private RedisUtils redisUtils; private RedisUtils redisUtils;
@Transactional(rollbackFor = Exception.class)
@Override @Override
public void sendMQMessage(String messageType, Object content) { public void persistAndSendMQMessage(String messageType, Object content) {
String contentStr = JSON.toJSONString(content); String contentStr = JSON.toJSONString(content);
// 1.存储消息到表 logger.info("【发送MQ系统消息】-落盘并发送-messageType:{}, contentStr:{}", messageType, contentStr);
SystemMessageEntity systemMessageEntity = new SystemMessageEntity(); // 1.消息落盘
systemMessageEntity.setMsgType(messageType); String penddingMsgId = persistMessage(messageType, contentStr);
systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ);
systemMessageEntity.setContent(contentStr); // 2.发送消息
systemMessageDao.insert(systemMessageEntity); sendMQMessage(messageType, contentStr, penddingMsgId);
}
/**
* @description 消息落盘有事务
*
* @param messageType
* @param contentStr
* @return
* @author wxz
* @date 2021.10.16 11:04:53
*/
@Transactional(rollbackFor = Exception.class)
public String persistMessage(String messageType, String contentStr) {
SystemMessageEntity message = new SystemMessageEntity();
message.setMsgType(messageType);
message.setSendApproach(SystemMessageSendApproach.MQ);
message.setContent(contentStr);
systemMessageDao.insert(message);
SystemMessagePenddingEntity pendding = new SystemMessagePenddingEntity();
pendding.setMsgType(messageType);
pendding.setSendApproach(SystemMessageSendApproach.MQ);
pendding.setContent(contentStr);
pendding.setMsgId(message.getId());
systemMessagePenddingDao.insert(pendding);
logger.info("【发送MQ系统消息】-落盘完成");
return pendding.getId();
}
private void sendMQMessage(String messageType, String contentStr, String penddingId) {
String topic = getTopicByMsgType(messageType); String topic = getTopicByMsgType(messageType);
// 2.缓存下来,供滞留消息扫描。TTL -1,永不过期 // 缓存下来,供滞留消息扫描。TTL -1,永不过期
String pendingMsgLabel = null; String pendingMsgLabel = null;
String pendingMsgKey = null; String pendingMsgKey = null;
try { try {
MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr); MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr);
pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); pendingMsgLabel = UUID.randomUUID().toString().replace("-", "");
pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel); pendingMsgKey = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.set(pendingMsgKey, mcb, -1); redisUtils.set(pendingMsgKey, mcb, -1);
//logger.info("【发送MQ系统消息】-存入redis堆积列表成功-{}-{}-{}", topic, messageType, pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); logger.error("【发送MQ系统消息】将系统MQ消息存储到Redis堆积列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e));
} }
// 3.发送mq消息 // 3.发送mq消息
try { try {
Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); meMessage.putUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL, pendingMsgLabel);
rocketMQTemplate.getProducer().send(meMessage); rocketMQTemplate.getProducer().send(meMessage);
logger.info("【发送MQ系统消息】-发送到MQ成功");
} catch (Exception e) { } catch (Exception e) {
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); logger.error("【发送MQ系统消息】发送系统MQ消息失败,堆栈信息:{}", errorStackTrace);
// 清理消息缓存 // 清理阻塞中的消息缓存
redisUtils.delete(pendingMsgKey); redisUtils.delete(pendingMsgKey);
throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode()); throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode());
} }
// 删除消息堆积
systemMessagePenddingDao.physicalDeleteById(penddingId);
} }
/** /**
@ -146,8 +188,8 @@ public class SystemMessageServiceImpl implements SystemMessageService {
} }
@Override @Override
public void penddingMqMsgScan() { public void blockedMqMsgScan() {
String scanKey = RedisKeys.pendingMqMsgKey("*"); String scanKey = RedisKeys.blockedMqMsgKey("*");
Set<String> keys = redisUtils.keys(scanKey); Set<String> keys = redisUtils.keys(scanKey);
//System.out.println(keys); //System.out.println(keys);
for (String key : keys) { for (String key : keys) {
@ -173,7 +215,7 @@ public class SystemMessageServiceImpl implements SystemMessageService {
&& l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) && l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) { ) {
logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now; l1LastAlertTime = now;
} }
@ -185,7 +227,7 @@ public class SystemMessageServiceImpl implements SystemMessageService {
createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now) createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now)
&& l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) && l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) { ) {
logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now; l1LastAlertTime = now;
} }
break; break;
@ -193,6 +235,25 @@ public class SystemMessageServiceImpl implements SystemMessageService {
} }
} }
@Override
public void penddingMqMsgScan() {
Integer count = systemMessagePenddingDao.selectCount(null);
if (count == 0) {
return;
}
// 扫描并且重新投递
List<SystemMessagePenddingEntity> penddingMsgs = systemMessagePenddingDao.selectList(null);
for (SystemMessagePenddingEntity penddingMsg : penddingMsgs) {
try {
sendMQMessage(penddingMsg.getMsgType(), penddingMsg.getContent(), penddingMsg.getId());
} catch (Exception e) {
// 投递失败不应影响后续消息的投递
logger.error("【重新投递MQ消息】失败, msgType:{}, penddingMsgId:{}, 错误:{}", penddingMsg.getMsgType(), penddingMsg.getId() , ExceptionUtils.getErrorStackTrace(e));
}
}
}
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor

26
epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.epmet.dao.SystemMessagePenddingDao">
<resultMap type="com.epmet.entity.SystemMessagePenddingEntity" id="systemMessagePenddingMap">
<result property="id" column="ID"/>
<result property="msgId" column="MSG_ID"/>
<result property="msgType" column="MSG_TYPE"/>
<result property="sendApproach" column="SEND_APPROACH"/>
<result property="content" column="CONTENT"/>
<result property="revision" column="REVISION"/>
<result property="createdBy" column="CREATED_BY"/>
<result property="createdTime" column="CREATED_TIME"/>
<result property="updatedBy" column="UPDATED_BY"/>
<result property="updatedTime" column="UPDATED_TIME"/>
<result property="delFlag" column="DEL_FLAG"/>
</resultMap>
<!--物理删除消息堆积-->
<delete id="physicalDeleteById">
delete from system_message_pendding where ID = #{penddingId}
</delete>
</mapper>

8
epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java

@ -57,7 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
public void consumeMessage(MessageExt messageExt) { public void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg); logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -84,7 +84,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【议题/项目分类标签初始化事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -98,8 +98,8 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); //logger.info("【议题/项目分类标签初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
} }
} }

8
epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java

@ -56,7 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) { private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg); logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -82,7 +82,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【客户初始化事件监听器】-orgRole-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -129,9 +129,9 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【客户初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); //logger.info("【客户初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
} }
/* @Override /* @Override

8
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java

@ -55,7 +55,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags); logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags);
GridBaseInfoFormDTO obj = JSON.parseObject(msg, GridBaseInfoFormDTO.class); GridBaseInfoFormDTO obj = JSON.parseObject(msg, GridBaseInfoFormDTO.class);
@ -87,7 +87,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -101,8 +101,8 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,pendingMsgLabel:{}", pendingMsgLabel); //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
} }
} }

7
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java

@ -58,7 +58,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags); logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags);
StaffPatrolMQMsg msgObj = JSON.parseObject(msg, StaffPatrolMQMsg.class); StaffPatrolMQMsg msgObj = JSON.parseObject(msg, StaffPatrolMQMsg.class);
@ -93,7 +93,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【开放数据事件监听器】-巡查-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -107,7 +107,8 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
//logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
} }
} }

8
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

@ -58,11 +58,11 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
//messageExt.propert //messageExt.propert
logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, blockedMsgLabel:{}", msg, tags, pendingMsgLabel);
StaffBaseInfoFormDTO obj = JSON.parseObject(msg, StaffBaseInfoFormDTO.class); StaffBaseInfoFormDTO obj = JSON.parseObject(msg, StaffBaseInfoFormDTO.class);
DistributedLock distributedLock = null; DistributedLock distributedLock = null;
@ -87,7 +87,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【开放数据事件监听器】-staff-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -102,7 +102,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
} }
} }

8
epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java

@ -51,7 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
private void consumeMessage(MessageExt messageExt) { private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -82,7 +82,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
try { try {
removePendingMqMsgCache(pendingMsgLabel); removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) { } catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
} }
} }
} }
@ -96,9 +96,9 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */
private void removePendingMqMsgCache(String pendingMsgLabel) { private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key); redisUtils.delete(key);
logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); //logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
} }
/* @Override /* @Override

Loading…
Cancel
Save