From 80f82908c5dcc352eaa48a47269bf145aec62847 Mon Sep 17 00:00:00 2001 From: wxz Date: Fri, 9 Jul 2021 17:38:12 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E5=8F=98=E5=8A=A8=E6=B6=88=E8=B4=B9=E9=A2=91=E7=8E=87=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=EF=BC=8C=E7=94=A8redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/ProjectChangedCustomListener.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index a9f266edf4..d1b789dc7e 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -5,6 +5,8 @@ import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.extract.form.ExtractOriginFormDTO; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; @@ -21,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.PreDestroy; import java.util.Date; @@ -41,24 +44,16 @@ import java.util.stream.Collectors; public class ProjectChangedCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); - /** - * 控制通知类型消息的消费频率 - */ - private static final Cache customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED) - .expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build(); + + private RedisUtils redisUtils; @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { long start = System.currentTimeMillis(); try { - List customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); - for (String customerId : customerIds) { - //获取缓存 如果不存在缓存中 则执行消费 并放入缓存中 - String ifPresent = customerIdCache.getIfPresent(customerId); - if (StringUtils.isBlank(ifPresent)){ - consumeMessage(customerId); - customerIdCache.put(customerId,customerId); - } + List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); + for (String msgStr : msgStrs) { + consumeMessage(msgStr); } } catch (Exception e) { //失败不重发 @@ -76,6 +71,23 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently log.warn("consumeMessage msg body is blank"); return; } + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId()); + if (redisUtils.get(redisKey) == null) { + consumeMessage(msgObj); + log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); + // 有效期30秒 + redisUtils.set(redisKey, msg, 30); + } else { + log.info("该项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); + } + } + + public void consumeMessage(ProjectChangedMQMsg msgObj) { DistributedLock distributedLock = null; RLock lock = null; try { @@ -128,6 +140,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } } } + @PreDestroy public void saveCalStatus() { //todo From fa862c51ec93e96428362508510570a6eaf68a52 Mon Sep 17 00:00:00 2001 From: wxz Date: Fri, 9 Jul 2021 17:42:37 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E8=A1=A5=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/epmet/commons/tools/redis/RedisKeys.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 68c417c580..6f5e7bf1f3 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -469,4 +469,8 @@ public class RedisKeys { public static String getCorsConfigKey() { return rootPrefix.concat("sys:cors"); } + + public static String getProjectChangedMsgDistinceKey(String customerId) { + return rootPrefix.concat("project_changed:consume:").concat(customerId); + } } From bb32e62993e1a9dfd12d1d1e45b5de4d2eab3f31 Mon Sep 17 00:00:00 2001 From: wxz Date: Fri, 9 Jul 2021 17:55:58 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=96=87=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/epmet/mq/ProjectChangedCustomListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index d1b789dc7e..6991c2e9be 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -83,7 +83,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently // 有效期30秒 redisUtils.set(redisKey, msg, 30); } else { - log.info("该项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); + log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); } }