diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 68c417c580..6f5e7bf1f3 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -469,4 +469,8 @@ public class RedisKeys { public static String getCorsConfigKey() { return rootPrefix.concat("sys:cors"); } + + public static String getProjectChangedMsgDistinceKey(String customerId) { + return rootPrefix.concat("project_changed:consume:").concat(customerId); + } } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index a9f266edf4..6991c2e9be 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -5,6 +5,8 @@ import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.extract.form.ExtractOriginFormDTO; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; @@ -21,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.PreDestroy; import java.util.Date; @@ -41,24 +44,16 @@ import java.util.stream.Collectors; public class ProjectChangedCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); - /** - * 控制通知类型消息的消费频率 - */ - private static final Cache customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED) - .expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build(); + + private RedisUtils redisUtils; @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { long start = System.currentTimeMillis(); try { - List customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); - for (String customerId : customerIds) { - //获取缓存 如果不存在缓存中 则执行消费 并放入缓存中 - String ifPresent = customerIdCache.getIfPresent(customerId); - if (StringUtils.isBlank(ifPresent)){ - consumeMessage(customerId); - customerIdCache.put(customerId,customerId); - } + List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); + for (String msgStr : msgStrs) { + consumeMessage(msgStr); } } catch (Exception e) { //失败不重发 @@ -76,6 +71,23 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently log.warn("consumeMessage msg body is blank"); return; } + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId()); + if (redisUtils.get(redisKey) == null) { + consumeMessage(msgObj); + log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); + // 有效期30秒 + redisUtils.set(redisKey, msg, 30); + } else { + log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); + } + } + + public void consumeMessage(ProjectChangedMQMsg msgObj) { DistributedLock distributedLock = null; RLock lock = null; try { @@ -128,6 +140,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } } } + @PreDestroy public void saveCalStatus() { //todo