diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 6fe4130225..9d2497f24e 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -547,4 +547,16 @@ public class RedisKeys { public static String getQuestionnaireAccessKey(String userId, String qKey) { return rootPrefix.concat("questionnaire:accesskey:").concat(userId).concat(":").concat(qKey); } + + /** + * @description 检查message MQ滞留消息 + * + * @param pendingMsgLabel 滞留消息的label + * @return + * @author wxz + * @date 2021.10.14 14:33:53 + */ + public static String checkPendingMqMsgKey(String pendingMsgLabel) { + return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel); + } } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java new file mode 100644 index 0000000000..593ab0cc57 --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java @@ -0,0 +1,13 @@ +package com.epmet.constant; + +/** + * @Description MQ用户自定义属性 + * @author wxz + * @date 2021.10.14 15:47:03 +*/ +public interface MQUserPropertys { + + //堆积消息label + String PENDING_MSG_LABEL = "pendingMsgLabel"; + +} diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java index c0fc9308b8..c0fb1400e9 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java @@ -7,9 +7,18 @@ import javax.validation.constraints.NotNull; @Data public class SystemMsgFormDTO { - @NotNull(message = "消息类型不能为空") + // 发送mq消息分组 + public interface SendMsgByMQ {} + + // 应答mq消息 + public interface AckMsgByMQ {} + + @NotNull(message = "消息类型不能为空", groups = { SendMsgByMQ.class }) private String messageType; - @NotNull(message = "消息内容不能为空") + @NotNull(message = "消息内容不能为空", groups = { SendMsgByMQ.class }) private Object content; + + @NotNull(message = "pendingMsgLabel不能为空", groups = { AckMsgByMQ.class }) + private String pendingMsgLabel; } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java index f375d75aaf..454fd5fd5d 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java @@ -27,7 +27,7 @@ import java.util.List; * @author yinzuomei@elink-cn.com * @date 2020/6/4 13:47 */ -//@FeignClient(name = ServiceConstant.EPMET_MESSAGE_SERVER, fallback = EpmetMessageOpenFeignClientFallback.class,url = "http://127.0.0.1:8085") +//@FeignClient(name = ServiceConstant.EPMET_MESSAGE_SERVER, fallbackFactory = EpmetMessageOpenFeignClientFallbackFactory.class, url = "http://127.0.0.1:8085") @FeignClient(name = ServiceConstant.EPMET_MESSAGE_SERVER, fallbackFactory = EpmetMessageOpenFeignClientFallbackFactory.class) public interface EpmetMessageOpenFeignClient { /** @@ -107,4 +107,15 @@ public interface EpmetMessageOpenFeignClient { */ @PostMapping("/message/system/send-by-mq") Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form); + + /** + * @description 应答mq消息 + * + * @param form + * @return + * @author wxz + * @date 2021.10.14 16:07:17 + */ + @PostMapping("/message/system/ack-mq-msg") + Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form); } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/fallback/EpmetMessageOpenFeignClientFallback.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/fallback/EpmetMessageOpenFeignClientFallback.java index 069d76c795..998b890814 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/fallback/EpmetMessageOpenFeignClientFallback.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/fallback/EpmetMessageOpenFeignClientFallback.java @@ -69,4 +69,9 @@ public class EpmetMessageOpenFeignClientFallback implements EpmetMessageOpenFeig public Result sendSystemMsgByMQ(SystemMsgFormDTO form) { return ModuleUtils.feignConError(ServiceConstant.EPMET_MESSAGE_SERVER, "sendSystemMsgByMQ", form); } + + @Override + public Result ackSystemMsgByMQ(SystemMsgFormDTO form) { + return ModuleUtils.feignConError(ServiceConstant.EPMET_MESSAGE_SERVER, "ackSystemMsgByMQ", form); + } } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java index 7a31e91785..002d9ea424 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java @@ -17,11 +17,34 @@ public class SystemMessageController { @Autowired private SystemMessageService systemMessageService; + /** + * @description 发送mq消息 + * + * @param form + * @return + * @author wxz + * @date 2021.10.14 16:07:07 + */ @PostMapping("send-by-mq") public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { - ValidatorUtils.validateEntity(form); + ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class); systemMessageService.sendMQMessage(form.getMessageType(), form.getContent()); return new Result(); } + /** + * @description 应答mq消息 + * + * @param form + * @return + * @author wxz + * @date 2021.10.14 16:07:17 + */ + @PostMapping("ack-mq-msg") + public Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { + ValidatorUtils.validateEntity(form, SystemMsgFormDTO.AckMsgByMQ.class); + systemMessageService.ackMqMessage(form.getPendingMsgLabel()); + return new Result(); + } + } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java index 2985b550c9..25189683f0 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java @@ -2,6 +2,25 @@ package com.epmet.service; public interface SystemMessageService { + /** + * @description 发送mq消息 + * + * @param messageType + * @param content + * @return + * @author wxz + * @date 2021.10.14 15:07:02 + */ void sendMQMessage(String messageType, Object content); + /** + * @description 消息应答 + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 15:07:09 + */ + void ackMqMessage(String pendingMsgLabel); + } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index f7aacf297c..2c66aff492 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -7,11 +7,17 @@ import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.tools.exception.EpmetErrorCode; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.constant.MQUserPropertys; import com.epmet.constant.SystemMessageSendApproach; import com.epmet.constant.SystemMessageType; import com.epmet.dao.SystemMessageDao; import com.epmet.entity.SystemMessageEntity; import com.epmet.service.SystemMessageService; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.spring.core.RocketMQTemplate; @@ -21,6 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.Date; +import java.util.UUID; + @Service public class SystemMessageServiceImpl implements SystemMessageService { @@ -32,6 +41,9 @@ public class SystemMessageServiceImpl implements SystemMessageService { @Autowired private RocketMQTemplate rocketMQTemplate; + @Autowired + private RedisUtils redisUtils; + @Transactional(rollbackFor = Exception.class) @Override public void sendMQMessage(String messageType, Object content) { @@ -43,13 +55,24 @@ public class SystemMessageServiceImpl implements SystemMessageService { systemMessageEntity.setContent(contentStr); systemMessageDao.insert(systemMessageEntity); + // 缓存下来,供滞留消息扫描。TTL -1,永不过期 + MessageCacheBean mcb = new MessageCacheBean(new Date(), messageType, contentStr); + String pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); + String pendingMsgKey = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); + redisUtils.set(pendingMsgKey, mcb, -1); + //发送mq消息 try { Message meMessage = new Message(getTopicByMsgType(messageType), messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); + meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); rocketMQTemplate.getProducer().send(meMessage); } catch (Exception e) { String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); logger.error("发送系统消息失败,堆栈信息:{}", errorStackTrace); + + // 清理消息缓存 + redisUtils.delete(pendingMsgKey); + throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode()); } } @@ -94,4 +117,20 @@ public class SystemMessageServiceImpl implements SystemMessageService { } return topic; } + + @Override + public void ackMqMessage(String pendingMsgLabel) { + String key = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【MQ消息应答】pendingMsgLabel{}", pendingMsgLabel); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + class MessageCacheBean { + private Date createTime; + private String messageType; + private Object content; + } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/pom.xml b/epmet-module/open-data-worker/open-data-worker-server/pom.xml index 289b030dc7..f8a6389e73 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/pom.xml +++ b/epmet-module/open-data-worker/open-data-worker-server/pom.xml @@ -60,6 +60,12 @@ epmet-commons-rocketmq 2.0.0 + + + com.epmet + epmet-message-client + 2.0.0 + @@ -166,7 +172,7 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd - false + true 192.168.1.140:9876;192.168.1.141:9876 diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/OpenDataApplication.java similarity index 90% rename from epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java rename to epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/OpenDataApplication.java index bb8ff976ac..532877cf4b 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/OpenDataApplication.java @@ -1,4 +1,4 @@ -package com.epmet.opendata; +package com.epmet; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -18,7 +18,7 @@ import org.springframework.context.annotation.ComponentScan; @EnableDiscoveryClient @EnableFeignClients @ServletComponentScan -@ComponentScan(value = { "com.epmet.opendata", "com.epmet.commons" }) +//@ComponentScan(value = { "com.epmet" }) public class OpenDataApplication { public static void main(String[] args) { SpringApplication.run(OpenDataApplication.class, args); diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java index ede4eeb83a..775e166fe0 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java @@ -4,9 +4,11 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQConsumerProperties; +import com.epmet.feign.EpmetMessageOpenFeignClient; import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java index 17cc9eca72..467f690259 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java @@ -3,6 +3,12 @@ package com.epmet.opendata.mq.listener; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.constant.MQUserPropertys; +import com.epmet.dto.form.SystemMsgFormDTO; +import com.epmet.feign.EpmetMessageOpenFeignClient; +import org.apache.commons.lang.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -22,8 +28,19 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent private Logger logger = LoggerFactory.getLogger(getClass()); + private EpmetMessageOpenFeignClient messageOpenFeignClient; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (messageOpenFeignClient == null) { + try { + messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class); + } catch (Exception e) { + e.printStackTrace(); + } + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -38,6 +55,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags); @@ -57,5 +75,31 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent } finally { //distributedLock.unLock(lock); } + + // 应答mq消息(调用message服务的应答api) + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + ackMqMsg(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description 应答mq消息 + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void ackMqMsg(String pendingMsgLabel) { + SystemMsgFormDTO form = new SystemMsgFormDTO(); + form.setPendingMsgLabel(pendingMsgLabel); + Result result = messageOpenFeignClient.ackSystemMsgByMQ(form); + if (!result.success()) { + logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg()); + } } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java index b8f0be40bd..320b78f7ae 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java @@ -3,6 +3,12 @@ package com.epmet.opendata.mq.listener; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.constant.MQUserPropertys; +import com.epmet.dto.form.SystemMsgFormDTO; +import com.epmet.feign.EpmetMessageOpenFeignClient; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -11,6 +17,7 @@ import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.PostConstruct; import java.util.List; /** @@ -22,8 +29,15 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre private Logger logger = LoggerFactory.getLogger(getClass()); + private EpmetMessageOpenFeignClient messageOpenFeignClient; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (messageOpenFeignClient == null) { + messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -38,8 +52,11 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + + //messageExt.propert - logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}", msg, tags); + logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); DistributedLock distributedLock = null; RLock lock = null; @@ -47,6 +64,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre //distributedLock = SpringContextUtils.getBean(DistributedLock.class); //lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId), // 30L, 30L, TimeUnit.SECONDS); + } catch (RenException e) { // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); @@ -57,5 +75,31 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre } finally { //distributedLock.unLock(lock); } + + // 应答mq消息(调用message服务的应答api) + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + ackMqMsg(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description 应答mq消息 + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void ackMqMsg(String pendingMsgLabel) { + SystemMsgFormDTO form = new SystemMsgFormDTO(); + form.setPendingMsgLabel(pendingMsgLabel); + Result result = messageOpenFeignClient.ackSystemMsgByMQ(form); + if (!result.success()) { + logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg()); + } } }