From 2212986a131232dd01d4ec803286cb91a3cce1ca Mon Sep 17 00:00:00 2001 From: wxz Date: Wed, 3 Feb 2021 16:40:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A5=E4=B8=8B=E6=93=8D=E4=BD=9C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81=201.mq=E7=9B=91?= =?UTF-8?q?=E5=90=AC=E5=99=A8=E5=88=9D=E5=A7=8B=E5=8C=96=E5=AE=A2=E6=88=B7?= =?UTF-8?q?Home=202.mq=E7=9B=91=E5=90=AC=E5=99=A8=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E5=AE=A2=E6=88=B7org=203.mq=E7=9B=91=E5=90=AC?= =?UTF-8?q?=E5=99=A8=E5=88=9D=E5=A7=8B=E5=8C=96=E5=AE=A2=E6=88=B7=E8=A7=92?= =?UTF-8?q?=E8=89=B2=E5=88=97=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../distributedlock/DistributedLock.java | 28 +++++++++---------- .../mq/listener/InitCustomerOrgListener.java | 13 +++++++++ .../InitCustomerCustomizeListener.java | 15 +++++++++- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/distributedlock/DistributedLock.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/distributedlock/DistributedLock.java index 6bfb25fe48..85a2e61c84 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/distributedlock/DistributedLock.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/distributedlock/DistributedLock.java @@ -37,21 +37,21 @@ public class DistributedLock { } /** - * @Description - * @Param name 锁名 - * @Param leaseTime 持锁时间 单位:min - * @Param waitTime 获取锁最长等待时间 单位:min - * @author zxc - * @date 2020/10/29 9:04 上午 + * 取锁 + * @param name 锁key + * @param leaseTime 持有锁的时长 + * @param waitTime 取锁等待时长 + * @param timeUnit 持锁时长的时间单位 + * @return */ - public RLock getLock(String name,Long leaseTime,Long waitTime){ + public RLock getLock(String name, Long leaseTime, Long waitTime, TimeUnit timeUnit) { RLock lock = null; - if (StringUtils.isNotBlank(name) && leaseTime > 0 && waitTime > 0 ){ + if (StringUtils.isNotBlank(name) && leaseTime > 0 && waitTime > 0) { lock = redissonClient.getLock(name); Boolean lockStatus; try { - lockStatus = lock.tryLock(waitTime,leaseTime,TimeUnit.MINUTES); - if (!lockStatus){ + lockStatus = lock.tryLock(waitTime, leaseTime, timeUnit); + if (!lockStatus) { throw new RenException("获取锁🔒失败了......"); } } catch (InterruptedException e) { @@ -62,14 +62,14 @@ public class DistributedLock { } /** - * @Description 释放锁🔒 + * @Description 释放锁🔒 * @Param rLock * @author zxc * @date 2020/10/28 2:52 下午 */ - public void unLock(RLock rLock){ - if (null != rLock){ - if (rLock.isHeldByCurrentThread()){ + public void unLock(RLock rLock) { + if (null != rLock) { + if (rLock.isHeldByCurrentThread()) { rLock.unlock(); } } diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgListener.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgListener.java index 1308d09a36..a327e23579 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgListener.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgListener.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; +import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.constant.UserWorkType; @@ -15,11 +16,14 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; +import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * 监听初始化客户动作,为客户初始化角色列表 */ @@ -35,12 +39,19 @@ public class InitCustomerOrgListener implements RocketMQListener { @Autowired private AgencyService agencyService; + @Autowired + private DistributedLock distributedLock; + @Override public void onMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); + + RLock lock = null; try { + lock = distributedLock.getLock(String.format("lock:init_customer_org:%s", msgObj.getCustomerId()), + 30l, 30l, TimeUnit.SECONDS); agencyService.saveRootAgency(constructRootAndAgencyDTO(msgObj)); } catch (RenException e) { // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 @@ -49,6 +60,8 @@ public class InitCustomerOrgListener implements RocketMQListener { // 不是我们自己抛出的异常,可以让MQ重试 logger.error("【RocketMQ】初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); throw e; + } finally { + distributedLock.unLock(lock); } } diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java index df2fea0902..381091738c 100644 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java +++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerCustomizeListener.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; +import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.dto.CustomerHomeDTO; @@ -12,11 +13,14 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; +import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * 监听初始化客户动作,为客户初始化角色列表 */ @@ -32,6 +36,9 @@ public class InitCustomerCustomizeListener implements RocketMQListener