diff --git a/epmet-commons/epmet-commons-rocketmq/pom.xml b/epmet-commons/epmet-commons-rocketmq/pom.xml
index e61ac46c9e..346f055ccc 100644
--- a/epmet-commons/epmet-commons-rocketmq/pom.xml
+++ b/epmet-commons/epmet-commons-rocketmq/pom.xml
@@ -22,5 +22,9 @@
org.projectlombok
lombok
+
+ org.projectlombok
+ lombok
+
diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
index 38ec75d055..5acfebb90e 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
@@ -21,6 +21,10 @@ public interface ConsomerGroupConstants {
* 客户初始化议题、项目分类、标签数据
*/
String ISSUE_PROJECT_CATEGORY_TAG = "issue_project_category_tag";
+ /**
+ * 项目变动通知消费者组
+ */
+ String PROJECT_CHANGED_COMPONENTS_GROUP = "project_changed_components_group";
/**
* 小组成就消费者组
diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
index d0f7b7f829..69c85f4216 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
@@ -9,4 +9,5 @@ public interface TopicConstants {
* 小组成就
*/
String GROUP_ACHIEVEMENT = "group_achievement";
+ String PROJECT_CHANGED = "project_changed";
}
diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/ProjectChangedMQMsg.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/ProjectChangedMQMsg.java
new file mode 100644
index 0000000000..075b3c8ce7
--- /dev/null
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/ProjectChangedMQMsg.java
@@ -0,0 +1,17 @@
+package com.epmet.commons.rocketmq.messages;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * desc:项目变动通知消息实体类
+ * @author liujianjun
+ */
+@Data
+@AllArgsConstructor
+public class ProjectChangedMQMsg implements Serializable {
+
+ private String customerId;
+}
diff --git a/epmet-module/data-statistical/data-statistical-server/pom.xml b/epmet-module/data-statistical/data-statistical-server/pom.xml
index 5917c6bda9..0eb1191613 100644
--- a/epmet-module/data-statistical/data-statistical-server/pom.xml
+++ b/epmet-module/data-statistical/data-statistical-server/pom.xml
@@ -110,6 +110,12 @@
4.3
compile
+
+ com.epmet
+ epmet-commons-rocketmq
+ 2.0.0
+ compile
+
@@ -248,6 +254,9 @@
https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c
SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19
+
+
+ 192.168.1.130:9876;192.168.1.132:9876
@@ -364,6 +373,9 @@
https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c
SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19
+
+
+ 192.168.1.130:9876;192.168.1.132:9876
@@ -479,6 +491,9 @@
https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c
SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19
+
+
+ 192.168.10.161:9876
@@ -592,6 +607,9 @@
https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c
SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1
+
+
+ 192.168.11.187:9876;192.168.11.184:9876
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
new file mode 100644
index 0000000000..d8712f7162
--- /dev/null
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
@@ -0,0 +1,114 @@
+package com.epmet.mq;
+
+import com.alibaba.fastjson.JSON;
+import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
+import com.epmet.commons.tools.distributedlock.DistributedLock;
+import com.epmet.commons.tools.exception.RenException;
+import com.epmet.commons.tools.utils.SpringContextUtils;
+import com.epmet.dto.extract.form.ExtractOriginFormDTO;
+import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService;
+import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService;
+import com.epmet.util.DimIdGenerator;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.redisson.api.RLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PreDestroy;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @Description 项目变动-监听器
+ * @return
+ * @author wxz
+ * @date 2021.03.03 16:10
+*/
+@Slf4j
+public class ProjectChangedCustomListener implements MessageListenerConcurrently {
+
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ long start = System.currentTimeMillis();
+ try {
+ List customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
+ customerIds.forEach(this::consumeMessage);
+ } catch (Exception e) {
+ //失败不重发
+ logger.error("consumeMessage fail,msg:{}",e.getMessage());
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+
+ private void consumeMessage(String msg) {
+ logger.info("receive customerId:{}", JSON.toJSONString(msg));
+ ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
+
+ DistributedLock distributedLock = null;
+ RLock lock = null;
+ try {
+ String customerId = msgObj.getCustomerId();
+ distributedLock = SpringContextUtils.getBean(DistributedLock.class);
+ lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId)
+ ,30L, 30L, TimeUnit.SECONDS);
+
+ if (StringUtils.isBlank(customerId)){
+ logger.error("consumer project_changed fail,msg:{}",customerId);
+ return;
+ }
+ //消息被消费太快 业务数据还没有完成 歇一会先
+ try {
+ Thread.sleep(60L);
+ } catch (InterruptedException e) {
+ logger.error("consumeMessage sleep exception",e);
+ }
+ ExtractOriginFormDTO extractOriginFormDTO = new ExtractOriginFormDTO();
+ extractOriginFormDTO.setCustomerId(customerId);
+
+ String dateId = DimIdGenerator.getDateDimId(new Date());
+ extractOriginFormDTO.setDateId(dateId);
+ Future> aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null);
+ try {
+ aBoolean.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ if (aBoolean.isDone()){
+ SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId);
+ }
+ logger.info("consumer projectChanged msg success,{}",aBoolean);
+ } catch (RenException e) {
+ // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
+ logger.error("【RocketMQ】消费项目变动消息失败:",e);
+ } catch (Exception e) {
+ // 不是我们自己抛出的异常,可以让MQ重试
+ logger.error("【RocketMQ】消费项目变动消息异常:",e);
+ throw e;
+ } finally {
+ if (distributedLock != null){
+ distributedLock.unLock(lock);
+ }
+ }
+ }
+ @PreDestroy
+ public void saveCalStatus() {
+ //todo
+ log.info("data-statical-server服务被关闭,执行未执行完的动作");
+
+ }
+}
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
new file mode 100644
index 0000000000..bcec9ddd31
--- /dev/null
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
@@ -0,0 +1,68 @@
+package com.epmet.mq;
+
+import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
+import com.epmet.commons.rocketmq.constants.TopicConstants;
+import com.epmet.commons.tools.constant.NumConstant;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+@Slf4j
+@Component
+@Order(value = 111)
+public class RocketMQConsumerRegister {
+
+ @Value("${rocketmq.name-server}")
+ private String nameServer;
+
+ /**
+ * @return
+ * @Description 注册监听器
+ * @author wxz
+ * @date 2021.03.03 16:09
+ */
+ @PostConstruct
+ public void registerAllListeners() {
+ try {
+ register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new ProjectChangedCustomListener());
+ } catch (MQClientException e) {
+ log.error("registerAllListeners exception", e);
+ }
+ }
+
+ public void register(String group, MessageModel messageModel, String topic, String subException, MessageListenerConcurrently listener) throws MQClientException {
+ // 实例化消费者
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+
+ // 设置NameServer的地址
+ consumer.setNamesrvAddr(nameServer);
+ consumer.setMessageModel(messageModel);
+ consumer.setInstanceName(buildInstanceName());
+ // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
+ consumer.subscribe(topic, subException);
+ // 注册回调实现类来处理从broker拉取回来的消息
+ consumer.registerMessageListener(listener);
+ //一次批量拉去10条消息
+ consumer.setConsumeMessageBatchMaxSize(NumConstant.TEN);
+ // 启动消费者实例
+ consumer.start();
+ }
+
+ private String buildInstanceName() {
+ String instanceName = "";
+ for (int i = 0; i < 4; i++) {
+ int t = (int) (Math.random() * 10);
+ instanceName = instanceName.concat(t + "");
+ }
+
+ return instanceName;
+ }
+
+}
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java
index 807fb5067f..8ea0ed4216 100644
--- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java
@@ -2,6 +2,9 @@ package com.epmet.service.evaluationindex.extract.todata;
import com.epmet.dto.extract.form.ExtractOriginFormDTO;
+import java.util.List;
+import java.util.concurrent.Future;
+
/**
* @author zhaoqifeng
* @dscription
@@ -15,4 +18,11 @@ public interface FactOriginExtractService {
* @param extractOriginFormDTO
*/
void extractAll(ExtractOriginFormDTO extractOriginFormDTO);
+
+ /**
+ * desc:抽取项目相关业务数据到统计库
+ *
+ * @param param
+ */
+ Future> submitProjectRelationData(ExtractOriginFormDTO param, List finalDaysBetween);
}
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java
index 1d6e4dc9fd..1cd62fd24a 100644
--- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java
@@ -83,7 +83,7 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService {
}
private void submitJob(ExtractOriginFormDTO param) {
- boolean isRange = StringUtils.isBlank(param.getDateId()) ? true : false;
+ boolean isRange = StringUtils.isBlank(param.getDateId());
List daysBetween = null;
if (isRange) {
daysBetween = DateUtils.getDaysBetween(param.getStartDate(), param.getEndDate());
@@ -188,9 +188,15 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService {
}
}
});
- threadPool.submit(() -> {
- ExtractOriginFormDTO paramNew = ConvertUtils.sourceToTarget(param, ExtractOriginFormDTO.class);
- if (!isRange) {
+ submitProjectRelationData(param, finalDaysBetween);
+ }
+
+ @Override
+ public Future> submitProjectRelationData(ExtractOriginFormDTO param, List finalDaysBetween) {
+ Future> submit = threadPool.submit(() -> {
+ ExtractOriginFormDTO paramNew = null;
+ if (CollectionUtils.isEmpty(finalDaysBetween)) {
+ paramNew = ConvertUtils.sourceToTarget(param, ExtractOriginFormDTO.class);
try {
projectExtractService.saveOriginProjectDaily(paramNew);
} catch (Exception e) {
@@ -236,6 +242,8 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService {
}
}
+ return true;
});
+ return submit;
}
}
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java
index eefe383408..0877c53129 100644
--- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java
@@ -25,4 +25,15 @@ public interface ScreenExtractService {
*/
void extractMonthlyAll(ExtractScreenFormDTO formDTO);
+ /**
+ * desc: 实时抽取相关的数据
+ *
+ * @param customerId
+ * @param dateId
+ * @return void
+ * @author LiuJanJun
+ * @date 2021/4/27 6:12 下午
+ */
+ void extractPartData(String customerId, String dateId);
+
}
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java
index 0210ea7b30..ffa1936236 100644
--- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java
@@ -170,6 +170,25 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
log.error("公众参与排行抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
+ //基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
+ try {
+ ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO();
+ param2.setCustomerId(customerId);
+ param2.setDateId(null);
+ log.info("【难点赌点数据上报开始------】 当前客户Id{}",param2.getCustomerId());
+ //screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
+
+ screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
+ log.info("【难点赌点数据上报结束------】 当前客户Id{}",param2.getCustomerId());
+ }catch (Exception e){
+ log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
+ }
+ extractPartData(customerId, dateId);
+ log.info("===== extractDaily method end ======");
+ }
+
+ @Override
+ public void extractPartData(String customerId, String dateId) {
ScreenCentralZoneDataFormDTO param = new ScreenCentralZoneDataFormDTO();
param.setCustomerId(customerId);
param.setDateId(dateId);
@@ -179,19 +198,7 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
}catch (Exception e){
log.error("中央区抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
- //基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
- try {
- log.info("【难点赌点数据上报开始------】 当前客户Id{}",param.getCustomerId());
- //screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
- ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO();
- param2.setCustomerId(customerId);
- param2.setDateId(null);
- screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
- log.info("【难点赌点数据上报结束------】 当前客户Id{}",param.getCustomerId());
- }catch (Exception e){
- log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
- }
try {
// 项目(事件)分析按网格_按天统计
screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId);
@@ -223,7 +230,6 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
}catch(Exception e){
log.error("按天统计:组织内各个分类下的项目总数,customerId为:"+customerId+"dateId为:"+dateId, e);
}
- log.info("===== extractDaily method end ======");
}
/**
diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml b/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml
index f94ee99777..c0479d72a3 100644
--- a/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml
+++ b/epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml
@@ -205,3 +205,6 @@ shutdown:
graceful:
enable: true #是否开启优雅停机
waitTimeSecs: 30 # 优雅停机等待时间,超过30秒,发出告警
+
+rocketmq:
+ name-server: @rocketmq.nameserver@
diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
index 545940ecea..d40e114e46 100644
--- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
+++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
@@ -15,4 +15,9 @@ public interface SystemMessageType {
*/
String GROUP_ACHIEVEMENT = "group_achievement";
+ /**
+ * 项目变动
+ */
+ String PROJECT_CHANGED = "project_changed";
+
}
diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java
index b0614dc818..6287c09b5e 100644
--- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java
+++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java
@@ -1,6 +1,7 @@
package com.epmet.send;
import com.alibaba.fastjson.JSON;
+import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.utils.Result;
@@ -66,4 +67,35 @@ public class SendMqMsgUtil {
return false;
}
+ /**
+ * desc: 发送小组成就消息,计算小组成就
+ *
+ * @param msgContent
+ * @return boolean
+ * @author LiuJanJun
+ * @date 2021/4/23 3:01 下午
+ * @remark 失败重试1次,调用端自行判断如果失败是否要继续执行
+ */
+ public boolean sendProjectChangedMqMsg(ProjectChangedMQMsg msgContent) {
+ try {
+ SystemMsgFormDTO systemMsgFormDTO = new SystemMsgFormDTO();
+ systemMsgFormDTO.setMessageType(SystemMessageType.PROJECT_CHANGED);
+ systemMsgFormDTO.setContent(msgContent);
+ Result sendMsgResult = null;
+ log.info("sendProjectChangedMqMsg param:{}",msgContent);
+ int retryTime = 0;
+ do {
+ sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(systemMsgFormDTO);
+ } while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO);
+
+ if (sendMsgResult != null && sendMsgResult.success()) {
+ return true;
+ }
+ log.error("发送(项目变动)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(systemMsgFormDTO));
+ } catch (Exception e) {
+ log.error("sendMqMsg exception", e);
+ }
+ return false;
+ }
+
}
diff --git a/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java b/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java
index ce39b3b985..9ce65ccb2a 100644
--- a/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java
+++ b/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectProcessServiceImpl.java
@@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
+import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.constant.AppClientConstant;
import com.epmet.commons.tools.constant.FieldConstant;
import com.epmet.commons.tools.constant.NumConstant;
@@ -59,6 +60,7 @@ import com.epmet.feign.EpmetUserFeignClient;
import com.epmet.feign.EpmetUserOpenFeignClient;
import com.epmet.feign.GovOrgFeignClient;
import com.epmet.redis.ProjectProcessRedis;
+import com.epmet.send.SendMqMsgUtil;
import com.epmet.service.ProjectProcessService;
import com.epmet.service.ProjectService;
import com.epmet.service.ProjectStaffService;
@@ -386,6 +388,13 @@ public class ProjectProcessServiceImpl extends BaseServiceImpl