From f13829108654a2f20957b5ae68831df3df51796f Mon Sep 17 00:00:00 2001 From: wangxianzhang Date: Mon, 29 Nov 2021 12:49:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=20=E8=B0=83?= =?UTF-8?q?=E6=95=B4mq=E7=9B=91=E5=90=AC=E5=99=A8=E5=88=86=E5=B8=83?= =?UTF-8?q?=E5=BC=8F=E9=94=81=E6=B3=A8=E5=85=A5=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../opendata/mq/RocketMQConsumerRegister.java | 14 ++++++++++---- .../listener/OpenDataOrgChangeEventListener.java | 9 +++++++-- .../OpenDataPatrolChangeEventListener.java | 8 ++++++-- .../OpenDataProjectChangeEventListener.java | 8 ++++++-- .../listener/OpenDataStaffChangeEventListener.java | 8 ++++++-- 5 files changed, 35 insertions(+), 12 deletions(-) diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java index 3fa339bc0b..dc02f0498a 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java @@ -4,11 +4,13 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQConsumerProperties; +import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataPatrolChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataProjectChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -19,35 +21,39 @@ import org.springframework.stereotype.Component; @Component public class RocketMQConsumerRegister extends MQAbstractRegister { + @Autowired + private DistributedLock distributedLock; + @Override public void registerAllListeners(String env, MQConsumerProperties consumerProperties) { + // 客户初始化监听器注册 register(consumerProperties, ConsomerGroupConstants.OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP, MessageModel.CLUSTERING, TopicConstants.ORG, "*", - new OpenDataOrgChangeEventListener()); + new OpenDataOrgChangeEventListener(distributedLock)); register(consumerProperties, ConsomerGroupConstants.OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP, MessageModel.CLUSTERING, TopicConstants.STAFF, "*", - new OpenDataStaffChangeEventListener()); + new OpenDataStaffChangeEventListener(distributedLock)); register(consumerProperties, ConsomerGroupConstants.OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP, MessageModel.CLUSTERING, TopicConstants.PATROL, "*", - new OpenDataPatrolChangeEventListener()); + new OpenDataPatrolChangeEventListener(distributedLock)); register(consumerProperties, ConsomerGroupConstants.OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT, "*", - new OpenDataProjectChangeEventListener()); + new OpenDataProjectChangeEventListener(distributedLock)); // ...其他监听器类似 } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java index 04e95a893b..8f123f23c5 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java @@ -35,6 +35,13 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent private RedisUtils redisUtils; + private DistributedLock distributedLock; + + public OpenDataOrgChangeEventListener(DistributedLock distributedLock) { + this.distributedLock = distributedLock; + } + + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -62,10 +69,8 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags); OrgOrStaffMQMsg obj = JSON.parseObject(msg, OrgOrStaffMQMsg.class); - DistributedLock distributedLock = null; RLock lock = null; try { - distributedLock = SpringContextUtils.getBean(DistributedLock.class); lock = distributedLock.getLock(String.format("lock:open_data_org:%s", obj.getOrgId()), 30L, 30L, TimeUnit.SECONDS); diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java index 6207c6fa74..45acb46283 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java @@ -37,6 +37,12 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr private RedisUtils redisUtils; + private DistributedLock distributedLock; + + public OpenDataPatrolChangeEventListener(DistributedLock distributedLock) { + this.distributedLock = distributedLock; + } + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -67,10 +73,8 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr log.warn("consumeMessage msg body is blank"); return; } - DistributedLock distributedLock = null; RLock lock = null; try { - distributedLock = SpringContextUtils.getBean(DistributedLock.class); lock = distributedLock.getLock(String.format("lock:open_data_patrol:%s", msgObj.getPatrolId()), 30L, 30L, TimeUnit.SECONDS); UpsertPatrolRecordForm patrolRecordForm = new UpsertPatrolRecordForm(); diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java index c2af1cffe7..6c5f127cb9 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java @@ -38,6 +38,12 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur private RedisUtils redisUtils; + private DistributedLock distributedLock; + + public OpenDataProjectChangeEventListener(DistributedLock distributedLock) { + this.distributedLock = distributedLock; + } + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -68,10 +74,8 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur DisputeProcessMQMsg obj = JSON.parseObject(msg, DisputeProcessMQMsg.class); EventInfoFormDTO formDTO = ConvertUtils.sourceToTarget(obj, EventInfoFormDTO.class); - DistributedLock distributedLock = null; RLock lock = null; try { - distributedLock = SpringContextUtils.getBean(DistributedLock.class); lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()), 30L, 30L, TimeUnit.SECONDS); SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(formDTO); diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java index 48d0ece671..658a0982ac 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java @@ -38,6 +38,12 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre private RedisUtils redisUtils; + private DistributedLock distributedLock; + + public OpenDataStaffChangeEventListener(DistributedLock distributedLock) { + this.distributedLock = distributedLock; + } + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -67,10 +73,8 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, blockedMsgLabel:{}", msg, tags, pendingMsgLabel); OrgOrStaffMQMsg obj = JSON.parseObject(msg, OrgOrStaffMQMsg.class); - DistributedLock distributedLock = null; RLock lock = null; try { - distributedLock = SpringContextUtils.getBean(DistributedLock.class); lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", obj.getOrgId()), 30L, 30L, TimeUnit.SECONDS);