From 1d0cf239e301701f5171ba790631b56e6d578049 Mon Sep 17 00:00:00 2001 From: jianjun Date: Wed, 28 Apr 2021 09:42:38 +0800 Subject: [PATCH] =?UTF-8?q?stats=20=E9=A1=B9=E7=9B=AE=E5=8F=98=E5=8A=A8?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...java => ProjectChangedCustomListener.java} | 20 ++++++++++++--- .../epmet/mq/RocketMQConsumerRegister.java | 4 ++- .../impl/FactOriginExtractServiceImpl.java | 1 + .../impl/ScreenExtractServiceImpl.java | 25 ++++++++++--------- 4 files changed, 33 insertions(+), 17 deletions(-) rename epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/{GroupAchievementCustomListener.java => ProjectChangedCustomListener.java} (85%) diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java similarity index 85% rename from epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java rename to epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index 2e7b7a9772..d8712f7162 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -1,6 +1,7 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.utils.SpringContextUtils; @@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.PreDestroy; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -32,7 +34,7 @@ import java.util.stream.Collectors; * @date 2021.03.03 16:10 */ @Slf4j -public class GroupAchievementCustomListener implements MessageListenerConcurrently { +public class ProjectChangedCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -51,11 +53,14 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } - private void consumeMessage(String customerId) { - logger.info("receive customerId:{}", JSON.toJSONString(customerId)); + private void consumeMessage(String msg) { + logger.info("receive customerId:{}", JSON.toJSONString(msg)); + ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); + DistributedLock distributedLock = null; RLock lock = null; try { + String customerId = msgObj.getCustomerId(); distributedLock = SpringContextUtils.getBean(DistributedLock.class); lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId) ,30L, 30L, TimeUnit.SECONDS); @@ -76,6 +81,13 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent String dateId = DimIdGenerator.getDateDimId(new Date()); extractOriginFormDTO.setDateId(dateId); Future aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null); + try { + aBoolean.get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } if (aBoolean.isDone()){ SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId); } @@ -85,7 +97,7 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent logger.error("【RocketMQ】消费项目变动消息失败:",e); } catch (Exception e) { // 不是我们自己抛出的异常,可以让MQ重试 - logger.error("【RocketMQ】消费项目变动消息失败:",e); + logger.error("【RocketMQ】消费项目变动消息异常:",e); throw e; } finally { if (distributedLock != null){ diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java index bc9b5c4a04..bcec9ddd31 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -9,12 +9,14 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component +@Order(value = 111) public class RocketMQConsumerRegister { @Value("${rocketmq.name-server}") @@ -29,7 +31,7 @@ public class RocketMQConsumerRegister { @PostConstruct public void registerAllListeners() { try { - register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new GroupAchievementCustomListener()); + register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new ProjectChangedCustomListener()); } catch (MQClientException e) { log.error("registerAllListeners exception", e); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java index 6cd7a47371..1cd62fd24a 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java @@ -242,6 +242,7 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService { } } + return true; }); return submit; } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java index d13a17d2ea..ffa1936236 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java @@ -170,6 +170,19 @@ public class ScreenExtractServiceImpl implements ScreenExtractService { log.error("公众参与排行抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); } + //基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data + try { + ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO(); + param2.setCustomerId(customerId); + param2.setDateId(null); + log.info("【难点赌点数据上报开始------】 当前客户Id{}",param2.getCustomerId()); + //screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param); + + screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2); + log.info("【难点赌点数据上报结束------】 当前客户Id{}",param2.getCustomerId()); + }catch (Exception e){ + log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); + } extractPartData(customerId, dateId); log.info("===== extractDaily method end ======"); } @@ -185,19 +198,7 @@ public class ScreenExtractServiceImpl implements ScreenExtractService { }catch (Exception e){ log.error("中央区抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); } - //基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data - try { - log.info("【难点赌点数据上报开始------】 当前客户Id{}",param.getCustomerId()); - //screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param); - ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO(); - param2.setCustomerId(customerId); - param2.setDateId(null); - screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2); - log.info("【难点赌点数据上报结束------】 当前客户Id{}",param.getCustomerId()); - }catch (Exception e){ - log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); - } try { // 项目(事件)分析按网格_按天统计 screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId);