From 80f82908c5dcc352eaa48a47269bf145aec62847 Mon Sep 17 00:00:00 2001 From: wxz Date: Fri, 9 Jul 2021 17:38:12 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E5=8F=98=E5=8A=A8=E6=B6=88=E8=B4=B9=E9=A2=91=E7=8E=87=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=EF=BC=8C=E7=94=A8redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/ProjectChangedCustomListener.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index a9f266edf4..d1b789dc7e 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -5,6 +5,8 @@ import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.extract.form.ExtractOriginFormDTO; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; @@ -21,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.PreDestroy; import java.util.Date; @@ -41,24 +44,16 @@ import java.util.stream.Collectors; public class ProjectChangedCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); - /** - * 控制通知类型消息的消费频率 - */ - private static final Cache customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED) - .expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build(); + + private RedisUtils redisUtils; @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { long start = System.currentTimeMillis(); try { - List customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); - for (String customerId : customerIds) { - //获取缓存 如果不存在缓存中 则执行消费 并放入缓存中 - String ifPresent = customerIdCache.getIfPresent(customerId); - if (StringUtils.isBlank(ifPresent)){ - consumeMessage(customerId); - customerIdCache.put(customerId,customerId); - } + List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); + for (String msgStr : msgStrs) { + consumeMessage(msgStr); } } catch (Exception e) { //失败不重发 @@ -76,6 +71,23 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently log.warn("consumeMessage msg body is blank"); return; } + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId()); + if (redisUtils.get(redisKey) == null) { + consumeMessage(msgObj); + log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); + // 有效期30秒 + redisUtils.set(redisKey, msg, 30); + } else { + log.info("该项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); + } + } + + public void consumeMessage(ProjectChangedMQMsg msgObj) { DistributedLock distributedLock = null; RLock lock = null; try { @@ -128,6 +140,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } } } + @PreDestroy public void saveCalStatus() { //todo From fa862c51ec93e96428362508510570a6eaf68a52 Mon Sep 17 00:00:00 2001 From: wxz Date: Fri, 9 Jul 2021 17:42:37 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E8=A1=A5=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/epmet/commons/tools/redis/RedisKeys.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 68c417c580..6f5e7bf1f3 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -469,4 +469,8 @@ public class RedisKeys { public static String getCorsConfigKey() { return rootPrefix.concat("sys:cors"); } + + public static String getProjectChangedMsgDistinceKey(String customerId) { + return rootPrefix.concat("project_changed:consume:").concat(customerId); + } } From 75311c7fe932f7b1d4ae0c513745c9ef9d633fd0 Mon Sep 17 00:00:00 2001 From: zhaoqifeng Date: Fri, 9 Jul 2021 17:44:22 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E5=88=A0=E9=99=A4=E6=94=B9=E4=B8=BA=E7=89=A9?= =?UTF-8?q?=E7=90=86=E5=88=A0=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../screen/ScreenProjectDataDao.java | 18 ++++++++++++++++++ .../screen/ScreenProjectImgDataDao.java | 2 ++ .../impl/ScreenProjectDataServiceImpl.java | 10 ++-------- .../screen/ScreenProjectDataDao.xml | 13 +++++++++++++ .../screen/ScreenProjectImgDataDao.xml | 4 ++++ 5 files changed, 39 insertions(+), 8 deletions(-) diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java index 1cdb4a1e8d..5410417202 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java @@ -46,4 +46,22 @@ public interface ScreenProjectDataDao extends BaseDao { void insertBatch(@Param("list") List list); void updateBatch(@Param("list") List list,@Param("dateId") String dateId); + + /** + * 根据项目ID删除数据 + * @author zhaoqifeng + * @date 2021/7/9 17:33 + * @param projectId + * @return int + */ + int deleteByProjectId(@Param("projectId") String projectId); + + /** + * 根据项目ID删除数据 + * @author zhaoqifeng + * @date 2021/7/9 17:33 + * @param list + * @return int + */ + void deleteByProjectIds(@Param("list") List list); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java index 4904f32724..9335e8aa61 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java @@ -37,4 +37,6 @@ public interface ScreenProjectImgDataDao extends BaseDao list); void insertBatch(@Param("list") List list); + + int deleteByProjectId(@Param("projectId") String projectId); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java index bf8c212f6b..15deb2bb57 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java @@ -177,14 +177,8 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl deleteWrapper = new QueryWrapper<>(); - deleteWrapper.eq(StringUtils.isNotBlank(item.getProjectId()), "project_id", item.getProjectId()) - .eq(StringUtils.isNotBlank(param.getCustomerId()), "customer_id", param.getCustomerId()); - baseDao.delete(deleteWrapper); - QueryWrapper screenProjectImgDataEntityQueryWrapper = new QueryWrapper<>(); - screenProjectImgDataEntityQueryWrapper.eq(StringUtils.isNotBlank(item.getProjectId()), "project_id", item.getProjectId()) - .eq(StringUtils.isNotBlank(param.getCustomerId()), "customer_id", param.getCustomerId()); - screenProjectImgDataDao.delete(screenProjectImgDataEntityQueryWrapper); + baseDao.deleteByProjectId(item.getProjectId()); + screenProjectImgDataDao.deleteByProjectId(item.getProjectId()); //如果orgType未知,获取一下 // if ("unknown".equals(item.getOrgType())){ diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml index 51c75a1c17..2a123547f4 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml +++ b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml @@ -46,6 +46,19 @@ and DATE_FORMAT(PROJECT_CREATE_TIME,'%Y%m%d') = #{dateId} limit 1000 + + delete from screen_project_data + where PROJECT_ID = #{projectId} + + + + delete from screen_project_data + + + PROJECT_ID = #{projectId} + + +