From e977206c5e6bbd23dbbf53305159514ec87efbc8 Mon Sep 17 00:00:00 2001 From: jianjun Date: Fri, 15 Oct 2021 08:58:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B7=A1=E6=9F=A5=E8=AE=B0=E5=BD=95=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=B6=88=E6=81=AF=20=E5=88=B0mq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/ConsomerGroupConstants.java | 5 ++ .../rocketmq/constants/TopicConstants.java | 2 +- .../opendata/mq/RocketMQConsumerRegister.java | 10 ++- .../OpenDataPatrolChangeEventListener.java | 61 +++++++++++++++++++ 4 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index b4dba548b7..a1c61abf6b 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -55,4 +55,9 @@ public interface ConsomerGroupConstants { */ String OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP = "open_data_staff_change_event_listener_group"; + /** + * 开放的对接数据(中间库) 巡查记录变更事件监听器分组 + */ + String OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP = "open_data_patrol_change_event_listener_group"; + } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index d28b19396d..4dfeaa3500 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -40,7 +40,7 @@ public interface TopicConstants { String STAFF = "staff"; /** - * 巡查 + * 巡查记录 */ String PATROL = "patrol"; } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java index 775e166fe0..0a966910bd 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java @@ -4,11 +4,10 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQConsumerProperties; -import com.epmet.feign.EpmetMessageOpenFeignClient; import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; +import com.epmet.opendata.mq.listener.OpenDataPatrolChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -36,6 +35,13 @@ public class RocketMQConsumerRegister extends MQAbstractRegister { "*", new OpenDataStaffChangeEventListener()); + register(consumerProperties, + ConsomerGroupConstants.OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP, + MessageModel.CLUSTERING, + TopicConstants.PATROL, + "*", + new OpenDataPatrolChangeEventListener()); + // ...其他监听器类似 } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java new file mode 100644 index 0000000000..c8251c80fa --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java @@ -0,0 +1,61 @@ +package com.epmet.opendata.mq.listener; + +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * @Description 系统对接中间库,巡查信息变更监听器 + * @author wxz + * @date 2021.10.13 15:21:48 +*/ +public class OpenDataPatrolChangeEventListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + // msg即为消息体 + // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 + String msg = new String(messageExt.getBody()); + String tags = messageExt.getTags(); + + logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + //distributedLock = SpringContextUtils.getBean(DistributedLock.class); + //lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId), + // 30L, 30L, TimeUnit.SECONDS); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【开放数据事件监听器】-巡查记录信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【开放数据事件监听器】-巡查记录信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + //distributedLock.unLock(lock); + } + } +}