From 0cc8b9b3a9b1c7ed6d397917510a77eebf75fd70 Mon Sep 17 00:00:00 2001 From: jianjun Date: Tue, 27 Apr 2021 17:37:13 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=8F=98=E5=8A=A8?= =?UTF-8?q?=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- epmet-commons/epmet-commons-rocketmq/pom.xml | 6 +- .../messages/ProjectChangedMQMsg.java | 17 +++++ .../epmet-message-client/pom.xml | 6 ++ .../com/epmet/constant/SystemMessageType.java | 5 ++ .../java/com/epmet/send/SendMqMsgUtil.java | 69 +++++++++++++++++++ 5 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/ProjectChangedMQMsg.java create mode 100644 epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java diff --git a/epmet-commons/epmet-commons-rocketmq/pom.xml b/epmet-commons/epmet-commons-rocketmq/pom.xml index a3761c000f..274ec09b58 100644 --- a/epmet-commons/epmet-commons-rocketmq/pom.xml +++ b/epmet-commons/epmet-commons-rocketmq/pom.xml @@ -17,5 +17,9 @@ rocketmq-spring-boot-starter 2.0.1 + + org.projectlombok + lombok + - \ No newline at end of file + diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/ProjectChangedMQMsg.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/ProjectChangedMQMsg.java new file mode 100644 index 0000000000..075b3c8ce7 --- /dev/null +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/ProjectChangedMQMsg.java @@ -0,0 +1,17 @@ +package com.epmet.commons.rocketmq.messages; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +/** + * desc:项目变动通知消息实体类 + * @author liujianjun + */ +@Data +@AllArgsConstructor +public class ProjectChangedMQMsg implements Serializable { + + private String customerId; +} diff --git a/epmet-module/epmet-message/epmet-message-client/pom.xml b/epmet-module/epmet-message/epmet-message-client/pom.xml index 4c7ff05c15..6adcaf4a98 100644 --- a/epmet-module/epmet-message/epmet-message-client/pom.xml +++ b/epmet-module/epmet-message/epmet-message-client/pom.xml @@ -26,6 +26,12 @@ io.springfox springfox-swagger-ui + + com.epmet + epmet-commons-rocketmq + 2.0.0 + compile + diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java index 382b96e275..f3f98bac35 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java @@ -10,4 +10,9 @@ public interface SystemMessageType { */ String INIT_CUSTOMER = "init_customer"; + /** + * 项目变动 + */ + String PROJECT_CHANGED = "project_changed"; + } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java new file mode 100644 index 0000000000..2a382127b9 --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java @@ -0,0 +1,69 @@ +package com.epmet.send; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; +import com.epmet.commons.tools.constant.NumConstant; +import com.epmet.commons.tools.utils.Result; +import com.epmet.constant.SystemMessageType; +import com.epmet.dto.form.SystemMsgFormDTO; +import com.epmet.feign.EpmetMessageOpenFeignClient; +import lombok.extern.slf4j.Slf4j; + +/** + * desc: 发送mq消息直接到rocketMq 系统 + * + * @author: LiuJanJun + * @date: 2021/4/23 2:39 下午 + * @versio: 1.0 + */ +@Slf4j +public class SendMqMsgUtil { + private static final SendMqMsgUtil INSTANCE = new SendMqMsgUtil(); + + private SendMqMsgUtil() { + + } + + private EpmetMessageOpenFeignClient epmetMessageOpenFeignClient; + + public static SendMqMsgUtil build() { + return INSTANCE; + } + + public SendMqMsgUtil openFeignClient(EpmetMessageOpenFeignClient epmetMessageOpenFeignClient) { + this.epmetMessageOpenFeignClient = epmetMessageOpenFeignClient; + return this; + } + + /** + * desc: 发送小组成就消息,计算小组成就 + * + * @param msgContent + * @return boolean + * @author LiuJanJun + * @date 2021/4/23 3:01 下午 + * @remark 失败重试1次,调用端自行判断如果失败是否要继续执行 + */ + public boolean sendProjectChangedMqMsg(ProjectChangedMQMsg msgContent) { + try { + SystemMsgFormDTO systemMsgFormDTO = new SystemMsgFormDTO(); + systemMsgFormDTO.setMessageType(SystemMessageType.PROJECT_CHANGED); + systemMsgFormDTO.setContent(msgContent); + Result sendMsgResult = null; + log.info("sendProjectChangedMqMsg param:{}",msgContent); + int retryTime = 0; + do { + sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(systemMsgFormDTO); + } while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO); + + if (sendMsgResult != null && sendMsgResult.success()) { + return true; + } + log.error("发送(项目变动)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(systemMsgFormDTO)); + } catch (Exception e) { + log.error("sendMqMsg exception", e); + } + return false; + } + +} From ffcd3d47eb55f49a06dceeec1ec340882e0a8710 Mon Sep 17 00:00:00 2001 From: jianjun Date: Tue, 27 Apr 2021 18:40:31 +0800 Subject: [PATCH 2/4] =?UTF-8?q?stats=20=E9=A1=B9=E7=9B=AE=E5=8F=98?= =?UTF-8?q?=E5=8A=A8=E8=AE=A2=E9=98=85=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/ConsomerGroupConstants.java | 4 + .../rocketmq/constants/TopicConstants.java | 1 + .../data-statistical-server/pom.xml | 18 ++++ .../mq/GroupAchievementCustomListener.java | 102 ++++++++++++++++++ .../epmet/mq/RocketMQConsumerRegister.java | 66 ++++++++++++ .../todata/FactOriginExtractService.java | 10 ++ .../impl/FactOriginExtractServiceImpl.java | 15 ++- .../toscreen/ScreenExtractService.java | 11 ++ .../impl/ScreenExtractServiceImpl.java | 7 +- .../src/main/resources/bootstrap.yml | 3 + 10 files changed, 232 insertions(+), 5 deletions(-) create mode 100644 epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java create mode 100644 epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index ded5a9aa8d..de07ea3106 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -21,5 +21,9 @@ public interface ConsomerGroupConstants { * 客户初始化议题、项目分类、标签数据 */ String ISSUE_PROJECT_CATEGORY_TAG = "issue_project_category_tag"; + /** + * 项目变动通知消费者组 + */ + String PROJECT_CHANGED_COMPONENTS_GROUP = "project_changed_components_group"; } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index 7687136498..bbf28a5884 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -2,4 +2,5 @@ package com.epmet.commons.rocketmq.constants; public interface TopicConstants { String INIT_CUSTOMER = "init_customer"; + String PROJECT_CHANGED = "project_changed"; } diff --git a/epmet-module/data-statistical/data-statistical-server/pom.xml b/epmet-module/data-statistical/data-statistical-server/pom.xml index 5917c6bda9..0eb1191613 100644 --- a/epmet-module/data-statistical/data-statistical-server/pom.xml +++ b/epmet-module/data-statistical/data-statistical-server/pom.xml @@ -110,6 +110,12 @@ 4.3 compile + + com.epmet + epmet-commons-rocketmq + 2.0.0 + compile + @@ -248,6 +254,9 @@ https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19 + + + 192.168.1.130:9876;192.168.1.132:9876 @@ -364,6 +373,9 @@ https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19 + + + 192.168.1.130:9876;192.168.1.132:9876 @@ -479,6 +491,9 @@ https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19 + + + 192.168.10.161:9876 @@ -592,6 +607,9 @@ https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 + + + 192.168.11.187:9876;192.168.11.184:9876 diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java new file mode 100644 index 0000000000..2e7b7a9772 --- /dev/null +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java @@ -0,0 +1,102 @@ +package com.epmet.mq; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.dto.extract.form.ExtractOriginFormDTO; +import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; +import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService; +import com.epmet.util.DimIdGenerator; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PreDestroy; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * @Description 项目变动-监听器 + * @return + * @author wxz + * @date 2021.03.03 16:10 +*/ +@Slf4j +public class GroupAchievementCustomListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + long start = System.currentTimeMillis(); + try { + List customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); + customerIds.forEach(this::consumeMessage); + } catch (Exception e) { + //失败不重发 + logger.error("consumeMessage fail,msg:{}",e.getMessage()); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(String customerId) { + logger.info("receive customerId:{}", JSON.toJSONString(customerId)); + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId) + ,30L, 30L, TimeUnit.SECONDS); + + if (StringUtils.isBlank(customerId)){ + logger.error("consumer project_changed fail,msg:{}",customerId); + return; + } + //消息被消费太快 业务数据还没有完成 歇一会先 + try { + Thread.sleep(60L); + } catch (InterruptedException e) { + logger.error("consumeMessage sleep exception",e); + } + ExtractOriginFormDTO extractOriginFormDTO = new ExtractOriginFormDTO(); + extractOriginFormDTO.setCustomerId(customerId); + + String dateId = DimIdGenerator.getDateDimId(new Date()); + extractOriginFormDTO.setDateId(dateId); + Future aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null); + if (aBoolean.isDone()){ + SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId); + } + logger.info("consumer projectChanged msg success,{}",aBoolean); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【RocketMQ】消费项目变动消息失败:",e); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【RocketMQ】消费项目变动消息失败:",e); + throw e; + } finally { + if (distributedLock != null){ + distributedLock.unLock(lock); + } + } + } + @PreDestroy + public void saveCalStatus() { + //todo + log.info("data-statical-server服务被关闭,执行未执行完的动作"); + + } +} diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java new file mode 100644 index 0000000000..bc9b5c4a04 --- /dev/null +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -0,0 +1,66 @@ +package com.epmet.mq; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import com.epmet.commons.tools.constant.NumConstant; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +@Slf4j +@Component +public class RocketMQConsumerRegister { + + @Value("${rocketmq.name-server}") + private String nameServer; + + /** + * @return + * @Description 注册监听器 + * @author wxz + * @date 2021.03.03 16:09 + */ + @PostConstruct + public void registerAllListeners() { + try { + register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new GroupAchievementCustomListener()); + } catch (MQClientException e) { + log.error("registerAllListeners exception", e); + } + } + + public void register(String group, MessageModel messageModel, String topic, String subException, MessageListenerConcurrently listener) throws MQClientException { + // 实例化消费者 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + + // 设置NameServer的地址 + consumer.setNamesrvAddr(nameServer); + consumer.setMessageModel(messageModel); + consumer.setInstanceName(buildInstanceName()); + // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 + consumer.subscribe(topic, subException); + // 注册回调实现类来处理从broker拉取回来的消息 + consumer.registerMessageListener(listener); + //一次批量拉去10条消息 + consumer.setConsumeMessageBatchMaxSize(NumConstant.TEN); + // 启动消费者实例 + consumer.start(); + } + + private String buildInstanceName() { + String instanceName = ""; + for (int i = 0; i < 4; i++) { + int t = (int) (Math.random() * 10); + instanceName = instanceName.concat(t + ""); + } + + return instanceName; + } + +} diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java index 807fb5067f..8ea0ed4216 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java @@ -2,6 +2,9 @@ package com.epmet.service.evaluationindex.extract.todata; import com.epmet.dto.extract.form.ExtractOriginFormDTO; +import java.util.List; +import java.util.concurrent.Future; + /** * @author zhaoqifeng * @dscription @@ -15,4 +18,11 @@ public interface FactOriginExtractService { * @param extractOriginFormDTO */ void extractAll(ExtractOriginFormDTO extractOriginFormDTO); + + /** + * desc:抽取项目相关业务数据到统计库 + * + * @param param + */ + Future submitProjectRelationData(ExtractOriginFormDTO param, List finalDaysBetween); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java index 1d6e4dc9fd..6cd7a47371 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java @@ -83,7 +83,7 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService { } private void submitJob(ExtractOriginFormDTO param) { - boolean isRange = StringUtils.isBlank(param.getDateId()) ? true : false; + boolean isRange = StringUtils.isBlank(param.getDateId()); List daysBetween = null; if (isRange) { daysBetween = DateUtils.getDaysBetween(param.getStartDate(), param.getEndDate()); @@ -188,9 +188,15 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService { } } }); - threadPool.submit(() -> { - ExtractOriginFormDTO paramNew = ConvertUtils.sourceToTarget(param, ExtractOriginFormDTO.class); - if (!isRange) { + submitProjectRelationData(param, finalDaysBetween); + } + + @Override + public Future submitProjectRelationData(ExtractOriginFormDTO param, List finalDaysBetween) { + Future submit = threadPool.submit(() -> { + ExtractOriginFormDTO paramNew = null; + if (CollectionUtils.isEmpty(finalDaysBetween)) { + paramNew = ConvertUtils.sourceToTarget(param, ExtractOriginFormDTO.class); try { projectExtractService.saveOriginProjectDaily(paramNew); } catch (Exception e) { @@ -237,5 +243,6 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService { } }); + return submit; } } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java index eefe383408..0877c53129 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java @@ -25,4 +25,15 @@ public interface ScreenExtractService { */ void extractMonthlyAll(ExtractScreenFormDTO formDTO); + /** + * desc: 实时抽取相关的数据 + * + * @param customerId + * @param dateId + * @return void + * @author LiuJanJun + * @date 2021/4/27 6:12 下午 + */ + void extractPartData(String customerId, String dateId); + } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java index 0210ea7b30..d13a17d2ea 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java @@ -170,6 +170,12 @@ public class ScreenExtractServiceImpl implements ScreenExtractService { log.error("公众参与排行抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); } + extractPartData(customerId, dateId); + log.info("===== extractDaily method end ======"); + } + + @Override + public void extractPartData(String customerId, String dateId) { ScreenCentralZoneDataFormDTO param = new ScreenCentralZoneDataFormDTO(); param.setCustomerId(customerId); param.setDateId(dateId); @@ -223,7 +229,6 @@ public class ScreenExtractServiceImpl implements ScreenExtractService { }catch(Exception e){ log.error("按天统计:组织内各个分类下的项目总数,customerId为:"+customerId+"dateId为:"+dateId, e); } - log.info("===== extractDaily method end ======"); } /** diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml b/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml index f94ee99777..c0479d72a3 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml +++ b/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml @@ -205,3 +205,6 @@ shutdown: graceful: enable: true #是否开启优雅停机 waitTimeSecs: 30 # 优雅停机等待时间,超过30秒,发出告警 + +rocketmq: + name-server: @rocketmq.nameserver@ From 1d0cf239e301701f5171ba790631b56e6d578049 Mon Sep 17 00:00:00 2001 From: jianjun Date: Wed, 28 Apr 2021 09:42:38 +0800 Subject: [PATCH 3/4] =?UTF-8?q?stats=20=E9=A1=B9=E7=9B=AE=E5=8F=98?= =?UTF-8?q?=E5=8A=A8=E8=AE=A2=E9=98=85=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...java => ProjectChangedCustomListener.java} | 20 ++++++++++++--- .../epmet/mq/RocketMQConsumerRegister.java | 4 ++- .../impl/FactOriginExtractServiceImpl.java | 1 + .../impl/ScreenExtractServiceImpl.java | 25 ++++++++++--------- 4 files changed, 33 insertions(+), 17 deletions(-) rename epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/{GroupAchievementCustomListener.java => ProjectChangedCustomListener.java} (85%) diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java similarity index 85% rename from epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java rename to epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index 2e7b7a9772..d8712f7162 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -1,6 +1,7 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.utils.SpringContextUtils; @@ -21,6 +22,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.PreDestroy; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -32,7 +34,7 @@ import java.util.stream.Collectors; * @date 2021.03.03 16:10 */ @Slf4j -public class GroupAchievementCustomListener implements MessageListenerConcurrently { +public class ProjectChangedCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); @@ -51,11 +53,14 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } - private void consumeMessage(String customerId) { - logger.info("receive customerId:{}", JSON.toJSONString(customerId)); + private void consumeMessage(String msg) { + logger.info("receive customerId:{}", JSON.toJSONString(msg)); + ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); + DistributedLock distributedLock = null; RLock lock = null; try { + String customerId = msgObj.getCustomerId(); distributedLock = SpringContextUtils.getBean(DistributedLock.class); lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId) ,30L, 30L, TimeUnit.SECONDS); @@ -76,6 +81,13 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent String dateId = DimIdGenerator.getDateDimId(new Date()); extractOriginFormDTO.setDateId(dateId); Future aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null); + try { + aBoolean.get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } if (aBoolean.isDone()){ SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId); } @@ -85,7 +97,7 @@ public class GroupAchievementCustomListener implements MessageListenerConcurrent logger.error("【RocketMQ】消费项目变动消息失败:",e); } catch (Exception e) { // 不是我们自己抛出的异常,可以让MQ重试 - logger.error("【RocketMQ】消费项目变动消息失败:",e); + logger.error("【RocketMQ】消费项目变动消息异常:",e); throw e; } finally { if (distributedLock != null){ diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java index bc9b5c4a04..bcec9ddd31 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -9,12 +9,14 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component +@Order(value = 111) public class RocketMQConsumerRegister { @Value("${rocketmq.name-server}") @@ -29,7 +31,7 @@ public class RocketMQConsumerRegister { @PostConstruct public void registerAllListeners() { try { - register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new GroupAchievementCustomListener()); + register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new ProjectChangedCustomListener()); } catch (MQClientException e) { log.error("registerAllListeners exception", e); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java index 6cd7a47371..1cd62fd24a 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java @@ -242,6 +242,7 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService { } } + return true; }); return submit; } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java index d13a17d2ea..ffa1936236 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java @@ -170,6 +170,19 @@ public class ScreenExtractServiceImpl implements ScreenExtractService { log.error("公众参与排行抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); } + //基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data + try { + ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO(); + param2.setCustomerId(customerId); + param2.setDateId(null); + log.info("【难点赌点数据上报开始------】 当前客户Id{}",param2.getCustomerId()); + //screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param); + + screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2); + log.info("【难点赌点数据上报结束------】 当前客户Id{}",param2.getCustomerId()); + }catch (Exception e){ + log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); + } extractPartData(customerId, dateId); log.info("===== extractDaily method end ======"); } @@ -185,19 +198,7 @@ public class ScreenExtractServiceImpl implements ScreenExtractService { }catch (Exception e){ log.error("中央区抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); } - //基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data - try { - log.info("【难点赌点数据上报开始------】 当前客户Id{}",param.getCustomerId()); - //screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param); - ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO(); - param2.setCustomerId(customerId); - param2.setDateId(null); - screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2); - log.info("【难点赌点数据上报结束------】 当前客户Id{}",param.getCustomerId()); - }catch (Exception e){ - log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e); - } try { // 项目(事件)分析按网格_按天统计 screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId); From 08c121c0801ea15d762c0ea45c9e71144579528f Mon Sep 17 00:00:00 2001 From: zhaoqifeng Date: Wed, 28 Apr 2021 09:57:19 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/ProjectProcessServiceImpl.java | 16 ++++++ .../service/impl/ProjectServiceImpl.java | 52 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java b/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java index 455e085999..2046de6106 100644 --- a/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java +++ b/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java @@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.constant.AppClientConstant; import com.epmet.commons.tools.constant.FieldConstant; import com.epmet.commons.tools.constant.NumConstant; @@ -57,6 +58,7 @@ import com.epmet.feign.EpmetUserFeignClient; import com.epmet.feign.EpmetUserOpenFeignClient; import com.epmet.feign.GovOrgFeignClient; import com.epmet.redis.ProjectProcessRedis; +import com.epmet.send.SendMqMsgUtil; import com.epmet.service.ProjectProcessService; import com.epmet.service.ProjectService; import com.epmet.service.ProjectStaffService; @@ -384,6 +386,13 @@ public class ProjectProcessServiceImpl extends BaseServiceImpl