diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index ecc2d38a9b..a9f266edf4 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -2,6 +2,7 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; +import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.utils.SpringContextUtils; @@ -9,6 +10,8 @@ import com.epmet.dto.extract.form.ExtractOriginFormDTO; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService; import com.epmet.util.DimIdGenerator; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -38,13 +41,25 @@ import java.util.stream.Collectors; public class ProjectChangedCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); + /** + * 控制通知类型消息的消费频率 + */ + private static final Cache customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED) + .expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build(); @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { long start = System.currentTimeMillis(); try { List customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); - customerIds.forEach(this::consumeMessage); + for (String customerId : customerIds) { + //获取缓存 如果不存在缓存中 则执行消费 并放入缓存中 + String ifPresent = customerIdCache.getIfPresent(customerId); + if (StringUtils.isBlank(ifPresent)){ + consumeMessage(customerId); + customerIdCache.put(customerId,customerId); + } + } } catch (Exception e) { //失败不重发 logger.error("consumeMessage fail,msg:{}",e.getMessage());