Browse Source

控制项目变动的消费频率 最小间隔 为30s

dev
jianjun 4 years ago
parent
commit
70183f041a
  1. 17
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

17
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -2,6 +2,7 @@ package com.epmet.mq;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.commons.tools.utils.SpringContextUtils;
@ -9,6 +10,8 @@ import com.epmet.dto.extract.form.ExtractOriginFormDTO;
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService;
import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService; import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService;
import com.epmet.util.DimIdGenerator; import com.epmet.util.DimIdGenerator;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@ -38,13 +41,25 @@ import java.util.stream.Collectors;
public class ProjectChangedCustomListener implements MessageListenerConcurrently { public class ProjectChangedCustomListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass()); private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 控制通知类型消息的消费频率
*/
private static final Cache<String, String> customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED)
.expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build();
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
try { try {
List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
customerIds.forEach(this::consumeMessage); for (String customerId : customerIds) {
//获取缓存 如果不存在缓存中 则执行消费 并放入缓存中
String ifPresent = customerIdCache.getIfPresent(customerId);
if (StringUtils.isBlank(ifPresent)){
consumeMessage(customerId);
customerIdCache.put(customerId,customerId);
}
}
} catch (Exception e) { } catch (Exception e) {
//失败不重发 //失败不重发
logger.error("consumeMessage fail,msg:{}",e.getMessage()); logger.error("consumeMessage fail,msg:{}",e.getMessage());

Loading…
Cancel
Save