Browse Source

修改:项目变动rocketmq消费者,限制消费率逻辑微调

dev_shibei_match
wxz 4 years ago
parent
commit
3ef982b5d4
  1. 12
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

12
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -78,10 +78,16 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId());
if (redisUtils.get(redisKey) == null) {
consumeMessage(msgObj);
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId());
// 有效期30秒
// 该线程启动消费之后,其他线程再收到该客户id的消费请求,则不消费
redisUtils.set(redisKey, msg, 30);
try {
consumeMessage(msgObj);
} catch (Exception e) {
// 如果消费失败了,清楚该key,收到消息之后可以继续消费
redisUtils.delete(redisKey);
throw e;
}
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId());
} else {
log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId());
}

Loading…
Cancel
Save