|  | @ -2,6 +2,7 @@ package com.epmet.mq; | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  | import com.alibaba.fastjson.JSON; |  |  | import com.alibaba.fastjson.JSON; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; |  |  | import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | import com.epmet.commons.tools.constant.NumConstant; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.distributedlock.DistributedLock; |  |  | import com.epmet.commons.tools.distributedlock.DistributedLock; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.exception.RenException; |  |  | import com.epmet.commons.tools.exception.RenException; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.commons.tools.utils.SpringContextUtils; |  |  | import com.epmet.commons.tools.utils.SpringContextUtils; | 
			
		
	
	
		
		
			
				
					|  | @ -9,6 +10,8 @@ import com.epmet.dto.extract.form.ExtractOriginFormDTO; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; |  |  | import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService; |  |  | import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService; | 
			
		
	
		
		
			
				
					|  |  | import com.epmet.util.DimIdGenerator; |  |  | import com.epmet.util.DimIdGenerator; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | import com.google.common.cache.Cache; | 
			
		
	
		
		
			
				
					|  |  |  |  |  | import com.google.common.cache.CacheBuilder; | 
			
		
	
		
		
			
				
					|  |  | import lombok.extern.slf4j.Slf4j; |  |  | import lombok.extern.slf4j.Slf4j; | 
			
		
	
		
		
			
				
					|  |  | import org.apache.commons.lang3.StringUtils; |  |  | import org.apache.commons.lang3.StringUtils; | 
			
		
	
		
		
			
				
					|  |  | import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |  |  | import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; | 
			
		
	
	
		
		
			
				
					|  | @ -38,13 +41,25 @@ import java.util.stream.Collectors; | 
			
		
	
		
		
			
				
					|  |  | public class ProjectChangedCustomListener implements MessageListenerConcurrently { |  |  | public class ProjectChangedCustomListener implements MessageListenerConcurrently { | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |     private Logger logger = LoggerFactory.getLogger(getClass()); |  |  |     private Logger logger = LoggerFactory.getLogger(getClass()); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     /** | 
			
		
	
		
		
			
				
					|  |  |  |  |  |      * 控制通知类型消息的消费频率 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |      */ | 
			
		
	
		
		
			
				
					|  |  |  |  |  |     private static final Cache<String, String> customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED) | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             .expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build(); | 
			
		
	
		
		
			
				
					|  |  | 
 |  |  | 
 | 
			
		
	
		
		
			
				
					|  |  |     @Override |  |  |     @Override | 
			
		
	
		
		
			
				
					|  |  |     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { |  |  |     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { | 
			
		
	
		
		
			
				
					|  |  |         long start = System.currentTimeMillis(); |  |  |         long start = System.currentTimeMillis(); | 
			
		
	
		
		
			
				
					|  |  |         try { |  |  |         try { | 
			
		
	
		
		
			
				
					|  |  |             List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); |  |  |             List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); | 
			
		
	
		
		
			
				
					
					|  |  |             customerIds.forEach(this::consumeMessage); |  |  |             for (String customerId : customerIds) { | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					|  |  |  |  |  |                 //获取缓存 如果不存在缓存中 则执行消费 并放入缓存中
 | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 String ifPresent = customerIdCache.getIfPresent(customerId); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 if (StringUtils.isBlank(ifPresent)){ | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     consumeMessage(customerId); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                     customerIdCache.put(customerId,customerId); | 
			
		
	
		
		
			
				
					|  |  |  |  |  |                 } | 
			
		
	
		
		
			
				
					|  |  |  |  |  |             } | 
			
		
	
		
		
			
				
					|  |  |         } catch (Exception e) { |  |  |         } catch (Exception e) { | 
			
		
	
		
		
			
				
					|  |  |             //失败不重发
 |  |  |             //失败不重发
 | 
			
		
	
		
		
			
				
					|  |  |             logger.error("consumeMessage fail,msg:{}",e.getMessage()); |  |  |             logger.error("consumeMessage fail,msg:{}",e.getMessage()); | 
			
		
	
	
		
		
			
				
					|  | 
 |