diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 68c417c580..6f5e7bf1f3 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -469,4 +469,8 @@ public class RedisKeys { public static String getCorsConfigKey() { return rootPrefix.concat("sys:cors"); } + + public static String getProjectChangedMsgDistinceKey(String customerId) { + return rootPrefix.concat("project_changed:consume:").concat(customerId); + } } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java index 1cdb4a1e8d..5410417202 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java @@ -46,4 +46,22 @@ public interface ScreenProjectDataDao extends BaseDao { void insertBatch(@Param("list") List list); void updateBatch(@Param("list") List list,@Param("dateId") String dateId); + + /** + * 根据项目ID删除数据 + * @author zhaoqifeng + * @date 2021/7/9 17:33 + * @param projectId + * @return int + */ + int deleteByProjectId(@Param("projectId") String projectId); + + /** + * 根据项目ID删除数据 + * @author zhaoqifeng + * @date 2021/7/9 17:33 + * @param list + * @return int + */ + void deleteByProjectIds(@Param("list") List list); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java index 4904f32724..9335e8aa61 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java @@ -37,4 +37,6 @@ public interface ScreenProjectImgDataDao extends BaseDao list); void insertBatch(@Param("list") List list); + + int deleteByProjectId(@Param("projectId") String projectId); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index a9f266edf4..6991c2e9be 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -5,6 +5,8 @@ import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.extract.form.ExtractOriginFormDTO; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; @@ -21,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.PreDestroy; import java.util.Date; @@ -41,24 +44,16 @@ import java.util.stream.Collectors; public class ProjectChangedCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); - /** - * 控制通知类型消息的消费频率 - */ - private static final Cache customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED) - .expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build(); + + private RedisUtils redisUtils; @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { long start = System.currentTimeMillis(); try { - List customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); - for (String customerId : customerIds) { - //获取缓存 如果不存在缓存中 则执行消费 并放入缓存中 - String ifPresent = customerIdCache.getIfPresent(customerId); - if (StringUtils.isBlank(ifPresent)){ - consumeMessage(customerId); - customerIdCache.put(customerId,customerId); - } + List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); + for (String msgStr : msgStrs) { + consumeMessage(msgStr); } } catch (Exception e) { //失败不重发 @@ -76,6 +71,23 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently log.warn("consumeMessage msg body is blank"); return; } + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId()); + if (redisUtils.get(redisKey) == null) { + consumeMessage(msgObj); + log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); + // 有效期30秒 + redisUtils.set(redisKey, msg, 30); + } else { + log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); + } + } + + public void consumeMessage(ProjectChangedMQMsg msgObj) { DistributedLock distributedLock = null; RLock lock = null; try { @@ -128,6 +140,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } } } + @PreDestroy public void saveCalStatus() { //todo diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java index 40888aced8..8ac534876c 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java @@ -173,14 +173,8 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl deleteWrapper = new QueryWrapper<>(); - deleteWrapper.eq(StringUtils.isNotBlank(item.getProjectId()), "project_id", item.getProjectId()) - .eq(StringUtils.isNotBlank(param.getCustomerId()), "customer_id", param.getCustomerId()); - baseDao.delete(deleteWrapper); - QueryWrapper screenProjectImgDataEntityQueryWrapper = new QueryWrapper<>(); - screenProjectImgDataEntityQueryWrapper.eq(StringUtils.isNotBlank(item.getProjectId()), "project_id", item.getProjectId()) - .eq(StringUtils.isNotBlank(param.getCustomerId()), "customer_id", param.getCustomerId()); - screenProjectImgDataDao.delete(screenProjectImgDataEntityQueryWrapper); + baseDao.deleteByProjectId(item.getProjectId()); + screenProjectImgDataDao.deleteByProjectId(item.getProjectId()); //如果orgType未知,获取一下 // if ("unknown".equals(item.getOrgType())){ diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml index 51c75a1c17..2a123547f4 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml +++ b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml @@ -46,6 +46,19 @@ and DATE_FORMAT(PROJECT_CREATE_TIME,'%Y%m%d') = #{dateId} limit 1000 + + delete from screen_project_data + where PROJECT_ID = #{projectId} + + + + delete from screen_project_data + + + PROJECT_ID = #{projectId} + + +