Browse Source

Merge branch 'wxz_project_changed_stats_consum' into dev

dev_shibei_match
wxz 4 years ago
parent
commit
f0f774c2ba
  1. 4
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  2. 39
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

4
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -469,4 +469,8 @@ public class RedisKeys {
public static String getCorsConfigKey() {
return rootPrefix.concat("sys:cors");
}
public static String getProjectChangedMsgDistinceKey(String customerId) {
return rootPrefix.concat("project_changed:consume:").concat(customerId);
}
}

39
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -5,6 +5,8 @@ import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.extract.form.ExtractOriginFormDTO;
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService;
@ -21,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.PreDestroy;
import java.util.Date;
@ -41,24 +44,16 @@ import java.util.stream.Collectors;
public class ProjectChangedCustomListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 控制通知类型消息的消费频率
*/
private static final Cache<String, String> customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED)
.expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build();
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long start = System.currentTimeMillis();
try {
List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
for (String customerId : customerIds) {
//获取缓存 如果不存在缓存中 则执行消费 并放入缓存中
String ifPresent = customerIdCache.getIfPresent(customerId);
if (StringUtils.isBlank(ifPresent)){
consumeMessage(customerId);
customerIdCache.put(customerId,customerId);
}
List<String> msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
for (String msgStr : msgStrs) {
consumeMessage(msgStr);
}
} catch (Exception e) {
//失败不重发
@ -76,6 +71,23 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
log.warn("consumeMessage msg body is blank");
return;
}
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId());
if (redisUtils.get(redisKey) == null) {
consumeMessage(msgObj);
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId());
// 有效期30秒
redisUtils.set(redisKey, msg, 30);
} else {
log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId());
}
}
public void consumeMessage(ProjectChangedMQMsg msgObj) {
DistributedLock distributedLock = null;
RLock lock = null;
try {
@ -128,6 +140,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
}
}
}
@PreDestroy
public void saveCalStatus() {
//todo

Loading…
Cancel
Save