From bd83017dc821005b33fe640329a164ff5eb7ec6a Mon Sep 17 00:00:00 2001 From: wxz Date: Thu, 17 Jun 2021 12:20:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9mq=20listener=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E6=96=B9=E5=BC=8F=EF=BC=8C=E5=BC=83=E7=94=A8=E6=B3=A8?= =?UTF-8?q?=E8=A7=A3=E6=B3=A8=E5=86=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../epmet/mq/RocketMQConsumerRegister.java | 62 +++++++++++++++++++ .../IssueProjectCategoryTagInitListener.java | 39 ++++++------ 2 files changed, 83 insertions(+), 18 deletions(-) create mode 100644 epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java diff --git a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java new file mode 100644 index 0000000000..1674e82553 --- /dev/null +++ b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -0,0 +1,62 @@ +package com.epmet.mq; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import com.epmet.mq.listener.IssueProjectCategoryTagInitListener; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +@Component +public class RocketMQConsumerRegister { + + @Value("${rocketmq.name-server}") + private String nameServer; + + /** + * @return + * @Description 注册监听器 + * @author wxz + * @date 2021.03.03 16:09 + */ + @PostConstruct + public void registerAllListeners() { + try { + register(nameServer, ConsomerGroupConstants.ISSUE_PROJECT_CATEGORY_TAG, MessageModel.CLUSTERING, TopicConstants.INIT_CUSTOMER, "*", new IssueProjectCategoryTagInitListener()); + } catch (MQClientException e) { + e.printStackTrace(); + } + } + + public void register(String nameServer, String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException { + // 实例化消费者 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + + // 设置NameServer的地址 + consumer.setNamesrvAddr(nameServer); + consumer.setMessageModel(messageModel); + consumer.setInstanceName(buildInstanceName()); + // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 + consumer.subscribe(topic, subException); + // 注册回调实现类来处理从broker拉取回来的消息 + consumer.registerMessageListener(listener); + // 启动消费者实例 + consumer.start(); + } + + private String buildInstanceName() { + String instanceName = ""; + for (int i = 0; i < 4; i++) { + int t = (int) (Math.random() * 10); + instanceName = instanceName.concat(t + ""); + } + + return instanceName; + } + +} diff --git a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java index beb1fa1473..19ca52066c 100644 --- a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java +++ b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java @@ -1,35 +1,28 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; -import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; -import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.dto.form.CategoryTagInitFormDTO; import com.epmet.service.IssueProjectCategoryDictService; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.spring.annotation.MessageModel; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import java.util.List; import java.util.concurrent.TimeUnit; /** * 监听初始化客户动作,为客户初始化议题、项目的分类、标签数据 */ -@RocketMQMessageListener(topic = TopicConstants.INIT_CUSTOMER, - consumerGroup = ConsomerGroupConstants.ISSUE_PROJECT_CATEGORY_TAG, - messageModel = MessageModel.CLUSTERING, - selectorExpression = "*") -@Component -public class IssueProjectCategoryTagInitListener implements RocketMQListener { +public class IssueProjectCategoryTagInitListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -40,9 +33,19 @@ public class IssueProjectCategoryTagInitListener implements RocketMQListener msgs, ConsumeConcurrentlyContext context) { + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + public void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); - logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); + logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); CategoryTagInitFormDTO dto = new CategoryTagInitFormDTO(); @@ -50,16 +53,16 @@ public class IssueProjectCategoryTagInitListener implements RocketMQListener