diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index 6991c2e9be..852a5aec52 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -78,10 +78,16 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId()); if (redisUtils.get(redisKey) == null) { - consumeMessage(msgObj); - log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); - // 有效期30秒 + // 该线程启动消费之后,其他线程再收到该客户id的消费请求,则不消费 redisUtils.set(redisKey, msg, 30); + try { + consumeMessage(msgObj); + } catch (Exception e) { + // 如果消费失败了,清楚该key,收到消息之后可以继续消费 + redisUtils.delete(redisKey); + throw e; + } + log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); } else { log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); }