diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index a1c61abf6b..fb5ae07412 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -60,4 +60,9 @@ public interface ConsomerGroupConstants { */ String OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP = "open_data_patrol_change_event_listener_group"; + /** + * 开放的对接数据(中间库) 项目变更事件监听器分组 + */ + String OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP = "open_data_project_change_event_listener_group"; + } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index 4dfeaa3500..3f0c3066ec 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -43,4 +43,9 @@ public interface TopicConstants { * 巡查记录 */ String PATROL = "patrol"; + + /** + * 项目 + */ + String PROJECT = "project"; } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/DisputeProcessMQMsg.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/DisputeProcessMQMsg.java new file mode 100644 index 0000000000..30a1700054 --- /dev/null +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/DisputeProcessMQMsg.java @@ -0,0 +1,22 @@ +package com.epmet.commons.rocketmq.messages; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +/** + * @Description + * @Author zhaoqifeng + * @Date 2021/10/18 16:24 + */ +@Data +@AllArgsConstructor +public class DisputeProcessMQMsg implements Serializable { + private String customerId; + private String projectId; + /** + * 操作类型【新增:add 修改删除:edit】 + */ + private String type; +} diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java index dbbbe4bbf0..c8f9c78e71 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java @@ -1,5 +1,7 @@ package com.epmet.dto.basereport.form; +import com.epmet.commons.tools.dto.form.PageFormDTO; +import lombok.AllArgsConstructor; import lombok.Data; import java.io.Serializable; @@ -10,7 +12,8 @@ import java.io.Serializable; * @Date 2021/10/15 10:55 */ @Data -public class EventInfoFormDTO implements Serializable { +@AllArgsConstructor +public class EventInfoFormDTO extends PageFormDTO implements Serializable { private static final long serialVersionUID = 8479649048108914555L; private String customerId; private String projectId; diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java index 180ac24fe5..d94c781d71 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java @@ -197,4 +197,8 @@ public class ScreenProjectDataDTO implements Serializable { */ private String tempGridName; + private String finishOrg; + + private String finishOrgLevel; + } diff --git a/epmet-module/data-statistical/data-statistical-server/pom.xml b/epmet-module/data-statistical/data-statistical-server/pom.xml index 4e19d8988d..cf2f0f200b 100644 --- a/epmet-module/data-statistical/data-statistical-server/pom.xml +++ b/epmet-module/data-statistical/data-statistical-server/pom.xml @@ -122,6 +122,12 @@ 2.0.0 compile + + com.epmet + epmet-message-client + 2.0.0 + compile + diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index 1792654765..72e12ff5dc 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -2,6 +2,7 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg; import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; @@ -9,9 +10,11 @@ import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.constant.SystemMessageType; import com.epmet.dto.extract.form.ExtractOriginFormDTO; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService; +import com.epmet.service.evaluationindex.screen.ScreenProjectDataService; import com.epmet.util.DimIdGenerator; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -29,7 +32,6 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * @Description 项目变动-监听器 @@ -45,6 +47,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -149,6 +152,16 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId); } logger.info("consumer projectChanged msg success,{}",aBoolean); + + //发送项目数据上报的mq消息 + String type; + if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation())) { + type = SystemMessageType.PROJECT_ADD; + } else { + type = SystemMessageType.PROJECT_EDIT; + } + DisputeProcessMQMsg msg = new DisputeProcessMQMsg(customerId, msgObj.getProjectId(), type); + SpringContextUtils.getBean(ScreenProjectDataService.class).sendProjectChangeMq(msg); } catch (RenException e) { // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 logger.error("【RocketMQ】消费项目变动消息失败:",e); diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/ScreenProjectDataService.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/ScreenProjectDataService.java index 2cf68852db..a29640a75b 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/ScreenProjectDataService.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/ScreenProjectDataService.java @@ -18,6 +18,7 @@ package com.epmet.service.evaluationindex.screen; import com.epmet.commons.mybatis.service.BaseService; +import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg; import com.epmet.commons.tools.page.PageData; import com.epmet.dto.screen.ScreenProjectDataDTO; import com.epmet.dto.screen.form.ScreenProjectDataInfoFormDTO; @@ -136,6 +137,15 @@ public interface ScreenProjectDataService extends BaseService getProjectList(String customerId, String projectId); + List getProjectList(String customerId, String projectId, Integer pageNo, Integer pageSize); + + /** + * 项目变更MQ + * @Param msg + * @Return + * @Author zhaoqifeng + * @Date 2021/10/18 15:55 + */ + void sendProjectChangeMq(DisputeProcessMQMsg msg); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java index 10678a2fbc..3d9d3db088 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java @@ -22,6 +22,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.epmet.commons.dynamic.datasource.annotation.DataSource; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg; import com.epmet.commons.tools.constant.FieldConstant; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.page.PageData; @@ -34,7 +35,10 @@ import com.epmet.dto.screen.form.ScreenProjectDataInfoFormDTO; import com.epmet.dto.screencoll.ScreenCollFormDTO; import com.epmet.entity.evaluationindex.screen.ScreenProjectDataEntity; import com.epmet.entity.evaluationindex.screen.ScreenProjectImgDataEntity; +import com.epmet.feign.EpmetMessageOpenFeignClient; +import com.epmet.send.SendMqMsgUtil; import com.epmet.service.evaluationindex.screen.ScreenProjectDataService; +import com.github.pagehelper.PageHelper; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; @@ -62,6 +66,8 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl page(Map params) { @@ -284,7 +290,8 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl getProjectList(String customerId, String projectId) { + public List getProjectList(String customerId, String projectId, Integer pageNo, Integer pageSize) { + PageHelper.startPage(pageNo, pageSize); LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(StringUtils.isNotBlank(customerId), ScreenProjectDataEntity::getCustomerId, customerId); wrapper.eq(StringUtils.isNotBlank(projectId), ScreenProjectDataEntity::getProjectId, projectId); @@ -294,4 +301,16 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl getEventInfo(EventInfoFormDTO formDTO) { List list; //根据入参,获取项目 - List projectList = screenProjectDataService.getProjectList(formDTO.getCustomerId(), formDTO.getProjectId()); + List projectList = screenProjectDataService.getProjectList(formDTO.getCustomerId(), formDTO.getProjectId(), formDTO.getPageNo(), formDTO.getPageSize()); //项目列表为空,返回空数组 if(CollectionUtils.isEmpty(projectList)) { return Collections.emptyList(); @@ -206,6 +206,7 @@ public class DataReportingServiceImpl implements DataReportingService { categoryCode = null; } } + dto.setEventCategory(categoryCode); dto.setReportTime(project.getProjectCreateTime()); dto.setHappenDate(DateUtils.parseDate(DateUtils.format(project.getProjectCreateTime()), DateUtils.DATE_PATTERN)); dto.setEventDescription(project.getProjectContent()); diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java index 9e502bd6ca..84580a3b4a 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java @@ -75,4 +75,14 @@ public interface SystemMessageType { */ String USER_PATROL_STOP = "user_patrol_stop"; + /** + * 项目变动 + */ + String PROJECT_ADD = "project_add"; + + /** + * 项目变动 + */ + String PROJECT_EDIT = "project_edit"; + } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java index 551bb9d4ec..e232f5f635 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java @@ -182,4 +182,35 @@ public class SendMqMsgUtil { } + /** + * desc: 发送项目变动事件消息 + * + * @param msgContent + * @return boolean + * @author LiuJanJun + * @date 2021/4/23 3:01 下午 + * @remark 失败重试1次,调用端自行判断如果失败是否要继续执行 + */ + public boolean sendProjectMqMsg(DisputeProcessMQMsg msgContent) { + try { + SystemMsgFormDTO systemMsgFormDTO = new SystemMsgFormDTO(); + systemMsgFormDTO.setMessageType(msgContent.getType()); + systemMsgFormDTO.setContent(msgContent); + Result sendMsgResult; + log.info("sendProjectMqMsg param:{}",msgContent); + int retryTime = 0; + do { + sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(systemMsgFormDTO); + } while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO); + + if (sendMsgResult != null && sendMsgResult.success()) { + return true; + } + log.error("发送(项目变动)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(systemMsgFormDTO)); + } catch (Exception e) { + log.error("sendMqMsg exception", e); + } + return false; + } + } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index dfd6737b57..0196a0aea7 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -183,6 +183,10 @@ public class SystemMessageServiceImpl implements SystemMessageService { case SystemMessageType.USER_PATROL_STOP: topic = TopicConstants.PATROL; break; + case SystemMessageType.PROJECT_ADD: + case SystemMessageType.PROJECT_EDIT: + topic = TopicConstants.PROJECT; + break; } return topic; } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java index 2ffed8c3dd..3fa339bc0b 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java @@ -43,9 +43,9 @@ public class RocketMQConsumerRegister extends MQAbstractRegister { "*", new OpenDataPatrolChangeEventListener()); register(consumerProperties, - ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, + ConsomerGroupConstants.OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP, MessageModel.CLUSTERING, - TopicConstants.PROJECT_CHANGED, + TopicConstants.PROJECT, "*", new OpenDataProjectChangeEventListener()); diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java index e82e2cd8c9..c2af1cffe7 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java @@ -2,11 +2,13 @@ package com.epmet.opendata.mq.listener; import com.alibaba.fastjson.JSON; import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.ConvertUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.basereport.form.EventInfoFormDTO; import com.epmet.feign.EpmetMessageOpenFeignClient; @@ -63,7 +65,8 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur //messageExt.propert logger.info("【开放数据事件监听器】-项目信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); - EventInfoFormDTO obj = JSON.parseObject(msg, EventInfoFormDTO.class); + DisputeProcessMQMsg obj = JSON.parseObject(msg, DisputeProcessMQMsg.class); + EventInfoFormDTO formDTO = ConvertUtils.sourceToTarget(obj, EventInfoFormDTO.class); DistributedLock distributedLock = null; RLock lock = null; @@ -71,7 +74,7 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur distributedLock = SpringContextUtils.getBean(DistributedLock.class); lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()), 30L, 30L, TimeUnit.SECONDS); - SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(obj); + SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(formDTO); } catch (RenException e) { // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java index 0a0d18a9f0..d716762780 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java @@ -18,9 +18,11 @@ package com.epmet.opendata.service.impl; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.utils.ConvertUtils; import com.epmet.commons.tools.utils.Result; +import com.epmet.constant.SystemMessageType; import com.epmet.dto.basereport.form.EventInfoFormDTO; import com.epmet.feign.DataStatisticalOpenFeignClient; import com.epmet.opendata.dao.BaseDisputeProcessDao; @@ -62,12 +64,27 @@ public class BaseDisputeProcessServiceImpl extends BaseServiceImpl list = result.getData(); if (CollectionUtils.isNotEmpty(list)) { List entityList = ConvertUtils.sourceToTarget(list, BaseDisputeProcessEntity.class); - if("add".equals(formDTO.getType())){ + if(SystemMessageType.PROJECT_ADD.equals(formDTO.getType())){ insertBatch(entityList); }else { updateBatchById(entityList); } } + //分批次循环 + while (CollectionUtils.isNotEmpty(list)) { + formDTO.setPageNo(formDTO.getPageNo() + NumConstant.ONE); + result = dataStatisticalOpenFeignClient.getEventInfo(formDTO); + list = result.getData(); + if (CollectionUtils.isNotEmpty(list)) { + List entityList = ConvertUtils.sourceToTarget(list, BaseDisputeProcessEntity.class); + if("add".equals(formDTO.getType())){ + insertBatch(entityList); + }else { + updateBatchById(entityList); + } + } + } + }