Browse Source

事件上报发送MQ消息

master
zhaoqifeng 4 years ago
parent
commit
eb0a132da2
  1. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
  2. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
  3. 22
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/DisputeProcessMQMsg.java
  4. 5
      epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java
  5. 4
      epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java
  6. 6
      epmet-module/data-statistical/data-statistical-server/pom.xml
  7. 15
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
  8. 12
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/ScreenProjectDataService.java
  9. 21
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java
  10. 3
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java
  11. 10
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
  12. 31
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java
  13. 4
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  14. 4
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
  15. 7
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java
  16. 19
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java

@ -60,4 +60,9 @@ public interface ConsomerGroupConstants {
*/
String OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP = "open_data_patrol_change_event_listener_group";
/**
* 开放的对接数据(中间库) 项目变更事件监听器分组
*/
String OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP = "open_data_project_change_event_listener_group";
}

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java

@ -43,4 +43,9 @@ public interface TopicConstants {
* 巡查记录
*/
String PATROL = "patrol";
/**
* 项目
*/
String PROJECT = "project";
}

22
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/DisputeProcessMQMsg.java

@ -0,0 +1,22 @@
package com.epmet.commons.rocketmq.messages;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
/**
* @Description
* @Author zhaoqifeng
* @Date 2021/10/18 16:24
*/
@Data
@AllArgsConstructor
public class DisputeProcessMQMsg implements Serializable {
private String customerId;
private String projectId;
/**
* 操作类型新增:add 修改删除:edit
*/
private String type;
}

5
epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java

@ -1,5 +1,7 @@
package com.epmet.dto.basereport.form;
import com.epmet.commons.tools.dto.form.PageFormDTO;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@ -10,7 +12,8 @@ import java.io.Serializable;
* @Date 2021/10/15 10:55
*/
@Data
public class EventInfoFormDTO implements Serializable {
@AllArgsConstructor
public class EventInfoFormDTO extends PageFormDTO implements Serializable {
private static final long serialVersionUID = 8479649048108914555L;
private String customerId;
private String projectId;

4
epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java

@ -197,4 +197,8 @@ public class ScreenProjectDataDTO implements Serializable {
*/
private String tempGridName;
private String finishOrg;
private String finishOrgLevel;
}

6
epmet-module/data-statistical/data-statistical-server/pom.xml

@ -122,6 +122,12 @@
<version>2.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.epmet</groupId>
<artifactId>epmet-message-client</artifactId>
<version>2.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

15
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -2,6 +2,7 @@ package com.epmet.mq;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg;
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
@ -9,9 +10,11 @@ import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.SystemMessageType;
import com.epmet.dto.extract.form.ExtractOriginFormDTO;
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService;
import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService;
import com.epmet.service.evaluationindex.screen.ScreenProjectDataService;
import com.epmet.util.DimIdGenerator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -29,7 +32,6 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @Description 项目变动-监听器
@ -45,6 +47,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@ -149,6 +152,16 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId);
}
logger.info("consumer projectChanged msg success,{}",aBoolean);
//发送项目数据上报的mq消息
String type;
if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation())) {
type = SystemMessageType.PROJECT_ADD;
} else {
type = SystemMessageType.PROJECT_EDIT;
}
DisputeProcessMQMsg msg = new DisputeProcessMQMsg(customerId, msgObj.getProjectId(), type);
SpringContextUtils.getBean(ScreenProjectDataService.class).sendProjectChangeMq(msg);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【RocketMQ】消费项目变动消息失败:",e);

12
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/ScreenProjectDataService.java

@ -18,6 +18,7 @@
package com.epmet.service.evaluationindex.screen;
import com.epmet.commons.mybatis.service.BaseService;
import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg;
import com.epmet.commons.tools.page.PageData;
import com.epmet.dto.screen.ScreenProjectDataDTO;
import com.epmet.dto.screen.form.ScreenProjectDataInfoFormDTO;
@ -136,6 +137,15 @@ public interface ScreenProjectDataService extends BaseService<ScreenProjectDataE
* @Author zhaoqifeng
* @Date 2021/10/15 14:22
*/
List<ScreenProjectDataDTO> getProjectList(String customerId, String projectId);
List<ScreenProjectDataDTO> getProjectList(String customerId, String projectId, Integer pageNo, Integer pageSize);
/**
* 项目变更MQ
* @Param msg
* @Return
* @Author zhaoqifeng
* @Date 2021/10/18 15:55
*/
void sendProjectChangeMq(DisputeProcessMQMsg msg);
}

21
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java

@ -22,6 +22,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.epmet.commons.dynamic.datasource.annotation.DataSource;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg;
import com.epmet.commons.tools.constant.FieldConstant;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.page.PageData;
@ -34,7 +35,10 @@ import com.epmet.dto.screen.form.ScreenProjectDataInfoFormDTO;
import com.epmet.dto.screencoll.ScreenCollFormDTO;
import com.epmet.entity.evaluationindex.screen.ScreenProjectDataEntity;
import com.epmet.entity.evaluationindex.screen.ScreenProjectImgDataEntity;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import com.epmet.send.SendMqMsgUtil;
import com.epmet.service.evaluationindex.screen.ScreenProjectDataService;
import com.github.pagehelper.PageHelper;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@ -62,6 +66,8 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl<ScreenProjectD
@Resource
private ScreenProjectImgDataDao screenProjectImgDataDao;
@Resource
private EpmetMessageOpenFeignClient epmetMessageOpenFeignClient;
@Override
public PageData<ScreenProjectDataDTO> page(Map<String, Object> params) {
@ -284,7 +290,8 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl<ScreenProjectD
* @Date 2021/10/15 14:22
*/
@Override
public List<ScreenProjectDataDTO> getProjectList(String customerId, String projectId) {
public List<ScreenProjectDataDTO> getProjectList(String customerId, String projectId, Integer pageNo, Integer pageSize) {
PageHelper.startPage(pageNo, pageSize);
LambdaQueryWrapper<ScreenProjectDataEntity> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(StringUtils.isNotBlank(customerId), ScreenProjectDataEntity::getCustomerId, customerId);
wrapper.eq(StringUtils.isNotBlank(projectId), ScreenProjectDataEntity::getProjectId, projectId);
@ -294,4 +301,16 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl<ScreenProjectD
return ConvertUtils.sourceToTarget(list, ScreenProjectDataDTO.class);
}
/**
* @Description 项目变更MQ
* @Param msg
* @Return
* @Author zhaoqifeng
* @Date 2021/10/18 14:00
*/
@Override
public void sendProjectChangeMq(DisputeProcessMQMsg msg) {
SendMqMsgUtil.build().openFeignClient(epmetMessageOpenFeignClient).sendProjectMqMsg(msg);
}
}

3
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java

@ -148,7 +148,7 @@ public class DataReportingServiceImpl implements DataReportingService {
public List<BaseDisputeProcessDTO> getEventInfo(EventInfoFormDTO formDTO) {
List<BaseDisputeProcessDTO> list;
//根据入参,获取项目
List<ScreenProjectDataDTO> projectList = screenProjectDataService.getProjectList(formDTO.getCustomerId(), formDTO.getProjectId());
List<ScreenProjectDataDTO> projectList = screenProjectDataService.getProjectList(formDTO.getCustomerId(), formDTO.getProjectId(), formDTO.getPageNo(), formDTO.getPageSize());
//项目列表为空,返回空数组
if(CollectionUtils.isEmpty(projectList)) {
return Collections.emptyList();
@ -206,6 +206,7 @@ public class DataReportingServiceImpl implements DataReportingService {
categoryCode = null;
}
}
dto.setEventCategory(categoryCode);
dto.setReportTime(project.getProjectCreateTime());
dto.setHappenDate(DateUtils.parseDate(DateUtils.format(project.getProjectCreateTime()), DateUtils.DATE_PATTERN));
dto.setEventDescription(project.getProjectContent());

10
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java

@ -75,4 +75,14 @@ public interface SystemMessageType {
*/
String USER_PATROL_STOP = "user_patrol_stop";
/**
* 项目变动
*/
String PROJECT_ADD = "project_add";
/**
* 项目变动
*/
String PROJECT_EDIT = "project_edit";
}

31
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java

@ -182,4 +182,35 @@ public class SendMqMsgUtil {
}
/**
* desc: 发送项目变动事件消息
*
* @param msgContent
* @return boolean
* @author LiuJanJun
* @date 2021/4/23 3:01 下午
* @remark 失败重试1次,调用端自行判断如果失败是否要继续执行
*/
public boolean sendProjectMqMsg(DisputeProcessMQMsg msgContent) {
try {
SystemMsgFormDTO systemMsgFormDTO = new SystemMsgFormDTO();
systemMsgFormDTO.setMessageType(msgContent.getType());
systemMsgFormDTO.setContent(msgContent);
Result sendMsgResult;
log.info("sendProjectMqMsg param:{}",msgContent);
int retryTime = 0;
do {
sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(systemMsgFormDTO);
} while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO);
if (sendMsgResult != null && sendMsgResult.success()) {
return true;
}
log.error("发送(项目变动)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(systemMsgFormDTO));
} catch (Exception e) {
log.error("sendMqMsg exception", e);
}
return false;
}
}

4
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -183,6 +183,10 @@ public class SystemMessageServiceImpl implements SystemMessageService {
case SystemMessageType.USER_PATROL_STOP:
topic = TopicConstants.PATROL;
break;
case SystemMessageType.PROJECT_ADD:
case SystemMessageType.PROJECT_EDIT:
topic = TopicConstants.PROJECT;
break;
}
return topic;
}

4
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java

@ -43,9 +43,9 @@ public class RocketMQConsumerRegister extends MQAbstractRegister {
"*",
new OpenDataPatrolChangeEventListener());
register(consumerProperties,
ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP,
ConsomerGroupConstants.OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.PROJECT_CHANGED,
TopicConstants.PROJECT,
"*",
new OpenDataProjectChangeEventListener());

7
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java

@ -2,11 +2,13 @@ package com.epmet.opendata.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.DisputeProcessMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.basereport.form.EventInfoFormDTO;
import com.epmet.feign.EpmetMessageOpenFeignClient;
@ -63,7 +65,8 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur
//messageExt.propert
logger.info("【开放数据事件监听器】-项目信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel);
EventInfoFormDTO obj = JSON.parseObject(msg, EventInfoFormDTO.class);
DisputeProcessMQMsg obj = JSON.parseObject(msg, DisputeProcessMQMsg.class);
EventInfoFormDTO formDTO = ConvertUtils.sourceToTarget(obj, EventInfoFormDTO.class);
DistributedLock distributedLock = null;
RLock lock = null;
@ -71,7 +74,7 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()),
30L, 30L, TimeUnit.SECONDS);
SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(obj);
SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(formDTO);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e)));

19
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java

@ -18,9 +18,11 @@
package com.epmet.opendata.service.impl;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.Result;
import com.epmet.constant.SystemMessageType;
import com.epmet.dto.basereport.form.EventInfoFormDTO;
import com.epmet.feign.DataStatisticalOpenFeignClient;
import com.epmet.opendata.dao.BaseDisputeProcessDao;
@ -62,12 +64,27 @@ public class BaseDisputeProcessServiceImpl extends BaseServiceImpl<BaseDisputePr
List<BaseDisputeProcessDTO> list = result.getData();
if (CollectionUtils.isNotEmpty(list)) {
List<BaseDisputeProcessEntity> entityList = ConvertUtils.sourceToTarget(list, BaseDisputeProcessEntity.class);
if("add".equals(formDTO.getType())){
if(SystemMessageType.PROJECT_ADD.equals(formDTO.getType())){
insertBatch(entityList);
}else {
updateBatchById(entityList);
}
}
//分批次循环
while (CollectionUtils.isNotEmpty(list)) {
formDTO.setPageNo(formDTO.getPageNo() + NumConstant.ONE);
result = dataStatisticalOpenFeignClient.getEventInfo(formDTO);
list = result.getData();
if (CollectionUtils.isNotEmpty(list)) {
List<BaseDisputeProcessEntity> entityList = ConvertUtils.sourceToTarget(list, BaseDisputeProcessEntity.class);
if("add".equals(formDTO.getType())){
insertBatch(entityList);
}else {
updateBatchById(entityList);
}
}
}
}

Loading…
Cancel
Save