Browse Source

stats 项目变动订阅消息

dev_shibei_match
jianjun 4 years ago
parent
commit
1d0cf239e3
  1. 20
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
  2. 4
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
  3. 1
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java
  4. 25
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java

20
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java → epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -1,6 +1,7 @@
package com.epmet.mq;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.SpringContextUtils;
@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.PreDestroy;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -32,7 +34,7 @@ import java.util.stream.Collectors;
* @date 2021.03.03 16:10
*/
@Slf4j
public class GroupAchievementCustomListener implements MessageListenerConcurrently {
public class ProjectChangedCustomListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
@ -51,11 +53,14 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(String customerId) {
logger.info("receive customerId:{}", JSON.toJSONString(customerId));
private void consumeMessage(String msg) {
logger.info("receive customerId:{}", JSON.toJSONString(msg));
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
DistributedLock distributedLock = null;
RLock lock = null;
try {
String customerId = msgObj.getCustomerId();
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId)
,30L, 30L, TimeUnit.SECONDS);
@ -76,6 +81,13 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent
String dateId = DimIdGenerator.getDateDimId(new Date());
extractOriginFormDTO.setDateId(dateId);
Future<?> aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null);
try {
aBoolean.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
if (aBoolean.isDone()){
SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId);
}
@ -85,7 +97,7 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent
logger.error("【RocketMQ】消费项目变动消息失败:",e);
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【RocketMQ】消费项目变动消息失败:",e);
logger.error("【RocketMQ】消费项目变动消息异常:",e);
throw e;
} finally {
if (distributedLock != null){

4
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java

@ -9,12 +9,14 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
@Order(value = 111)
public class RocketMQConsumerRegister {
@Value("${rocketmq.name-server}")
@ -29,7 +31,7 @@ public class RocketMQConsumerRegister {
@PostConstruct
public void registerAllListeners() {
try {
register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new GroupAchievementCustomListener());
register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new ProjectChangedCustomListener());
} catch (MQClientException e) {
log.error("registerAllListeners exception", e);
}

1
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java

@ -242,6 +242,7 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService {
}
}
return true;
});
return submit;
}

25
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java

@ -170,6 +170,19 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
log.error("公众参与排行抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
//基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
try {
ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO();
param2.setCustomerId(customerId);
param2.setDateId(null);
log.info("【难点赌点数据上报开始------】 当前客户Id{}",param2.getCustomerId());
//screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
log.info("【难点赌点数据上报结束------】 当前客户Id{}",param2.getCustomerId());
}catch (Exception e){
log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
extractPartData(customerId, dateId);
log.info("===== extractDaily method end ======");
}
@ -185,19 +198,7 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
}catch (Exception e){
log.error("中央区抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
//基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
try {
log.info("【难点赌点数据上报开始------】 当前客户Id{}",param.getCustomerId());
//screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO();
param2.setCustomerId(customerId);
param2.setDateId(null);
screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
log.info("【难点赌点数据上报结束------】 当前客户Id{}",param.getCustomerId());
}catch (Exception e){
log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
try {
// 项目(事件)分析按网格_按天统计
screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId);

Loading…
Cancel
Save