From 524a0c592ec0477ec551741d7104274dda501e66 Mon Sep 17 00:00:00 2001 From: jianjun Date: Fri, 30 Apr 2021 17:37:00 +0800 Subject: [PATCH] =?UTF-8?q?mq=E6=B3=A8=E5=86=8C=E4=BB=A3=E7=A0=81=E8=B0=83?= =?UTF-8?q?=E6=95=B4-=E6=BC=8F=E6=8E=89=E7=9A=84=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=90=88=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../epmet/mq/RocketMQConsumerRegister.java | 2 +- .../InitCustomerComponentsListener.java | 20 +++++- .../InitCustomerCustomizeListener.java | 68 ------------------- 3 files changed, 19 insertions(+), 71 deletions(-) delete mode 100644 epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java index 057cf3c501..c128db1207 100644 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java +++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -12,7 +12,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -@Component +//@Component public class RocketMQConsumerRegister { @Value("${rocketmq.name-server}") diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java index 147d716674..36fb8c930b 100644 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java +++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java @@ -1,7 +1,11 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; +import com.epmet.commons.rocketmq.register.ConsumerConfigProperties; +import com.epmet.commons.rocketmq.register.MQConsumerRegister; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; @@ -15,6 +19,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.TimeUnit; @@ -22,7 +27,8 @@ import java.util.concurrent.TimeUnit; /** * 监听初始化客户动作,为客户初始化角色列表 */ -public class InitCustomerComponentsListener implements MessageListenerConcurrently { +@Component +public class InitCustomerComponentsListener extends MQConsumerRegister implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -65,4 +71,14 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent distributedLock.unLock(lock); } } -} \ No newline at end of file + + @Override + public ConsumerConfigProperties getConsumerProperty() { + ConsumerConfigProperties configProperties = new ConsumerConfigProperties(); + configProperties.setConsumerGroup(ConsomerGroupConstants.INIT_CUSTOMER_COMPONENTS_GROUP); + configProperties.setTopic(TopicConstants.INIT_CUSTOMER); + configProperties.setTag("*"); + configProperties.setConsumerListener(this); + return configProperties; + } +} diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java deleted file mode 100644 index 7f7ddd5644..0000000000 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.epmet.mq.listener; - -import com.alibaba.fastjson.JSON; -import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; -import com.epmet.commons.rocketmq.constants.TopicConstants; -import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; -import com.epmet.commons.tools.distributedlock.DistributedLock; -import com.epmet.commons.tools.exception.ExceptionUtils; -import com.epmet.commons.tools.exception.RenException; -import com.epmet.dto.CustomerHomeDTO; -import com.epmet.service.CustomerHomeService; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.redisson.api.RLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.concurrent.TimeUnit; - -/** - * 监听初始化客户动作,为客户初始化角色列表 - * 已废弃 - */ -//@RocketMQMessageListener(topic = TopicConstants.INIT_CUSTOMER, -// consumerGroup = ConsomerGroupConstants.INIT_CUSTOMER_COMPONENTS_GROUP, -// messageModel = MessageModel.CLUSTERING, -// selectorExpression = "*") -//@Component -public class InitCustomerCustomizeListener implements RocketMQListener { - - private Logger logger = LoggerFactory.getLogger(getClass()); - - @Autowired - private CustomerHomeService customerHomeService; - - @Autowired - private DistributedLock distributedLock; - - @Override - public void onMessage(MessageExt messageExt) { - String msg = new String(messageExt.getBody()); - logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); - InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); - - CustomerHomeDTO customerHomeDTO = new CustomerHomeDTO(); - customerHomeDTO.setCustomerId(msgObj.getCustomerId()); - - RLock lock = null; - try { - lock = distributedLock.getLock(String.format("lock:init_customer_home:%s", msgObj.getCustomerId()), - 30l, 30l, TimeUnit.SECONDS); - customerHomeService.init(customerHomeDTO); - } catch (RenException e) { - // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 - logger.error("【RocketMQ】初始化客户组件失败:".concat(ExceptionUtils.getErrorStackTrace(e))); - } catch (Exception e) { - // 不是我们自己抛出的异常,可以让MQ重试 - logger.error("【RocketMQ】初始化客户组件失败:".concat(ExceptionUtils.getErrorStackTrace(e))); - throw e; - } finally { - distributedLock.unLock(lock); - } - } -} \ No newline at end of file