From 3ef982b5d4a42f362b670a2d924e960b3a5f926f Mon Sep 17 00:00:00 2001 From: wxz Date: Wed, 14 Jul 2021 14:46:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=EF=BC=9A=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E5=8F=98=E5=8A=A8rocketmq=E6=B6=88=E8=B4=B9=E8=80=85=EF=BC=8C?= =?UTF-8?q?=E9=99=90=E5=88=B6=E6=B6=88=E8=B4=B9=E7=8E=87=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=BE=AE=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/epmet/mq/ProjectChangedCustomListener.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index 6991c2e9be..852a5aec52 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -78,10 +78,16 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId()); if (redisUtils.get(redisKey) == null) { - consumeMessage(msgObj); - log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); - // 有效期30秒 + // 该线程启动消费之后,其他线程再收到该客户id的消费请求,则不消费 redisUtils.set(redisKey, msg, 30); + try { + consumeMessage(msgObj); + } catch (Exception e) { + // 如果消费失败了,清楚该key,收到消息之后可以继续消费 + redisUtils.delete(redisKey); + throw e; + } + log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); } else { log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); }