diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index a09e63861c..c4c0138ad0 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -10,6 +10,7 @@ import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.constant.PingYinConstant; import com.epmet.constant.SystemMessageType; import com.epmet.dto.extract.form.ExtractOriginFormDTO; import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; @@ -66,7 +67,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } catch (Exception e) { //失败不重发 logger.error("consumeMessage fail,msg:{}",e.getMessage()); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + return ConsumeConcurrentlyStatus.RECONSUME_LATER; } log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; @@ -94,7 +95,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); - if (org.apache.commons.lang.StringUtils.isNotBlank(pendingMsgLabel)) { + if (StringUtils.isNotBlank(pendingMsgLabel)) { try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { @@ -109,19 +110,14 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently try { String customerId = msgObj.getCustomerId(); distributedLock = SpringContextUtils.getBean(DistributedLock.class); - lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId) + lock = distributedLock.getLock(String.format("lock:project_changed:%s:%s", customerId, msgObj.getProjectId()) ,30L, 30L, TimeUnit.SECONDS); if (StringUtils.isBlank(customerId)){ logger.error("consumer project_changed fail,msg:{}",customerId); return; } - //消息被消费太快 业务数据还没有完成 歇一会先 - try { - Thread.sleep(60L); - } catch (InterruptedException e) { - logger.error("consumeMessage sleep exception",e); - } + ExtractOriginFormDTO extractOriginFormDTO = new ExtractOriginFormDTO(); extractOriginFormDTO.setCustomerId(customerId); @@ -146,21 +142,8 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } logger.info("consumer projectChanged msg success,{}",aBoolean); - //发送项目数据上报的mq消息 - if ("6f203e30de1a65aab7e69c058826cd80".equals(customerId)) { - if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation()) || "close".equals(msgObj.getOperation())) { - String type; - if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation())) { - type = SystemMessageType.PROJECT_ADD; - } else { - type = SystemMessageType.PROJECT_EDIT; - } - List projectList = new ArrayList<>(); - projectList.add(msgObj.getProjectId()); - DisputeProcessMQMsg msg = new DisputeProcessMQMsg(customerId, projectList, type); - SpringContextUtils.getBean(ScreenProjectDataService.class).sendProjectChangeMq(msg); - } - } + sendProject2OpenData(msgObj, customerId); + } catch (RenException e) { // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 logger.error("【RocketMQ】消费项目变动消息失败:",e); @@ -175,6 +158,29 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } } + /** + * desc:发型项目数据到 opendata + * @param msgObj + * @param customerId + */ + private void sendProject2OpenData(ProjectChangedMQMsg msgObj, String customerId) { + //发送项目数据上报的mq消息 + if (PingYinConstant.PROD_PING_YIN_CUSTOMER_ID.equals(customerId)) { + if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation()) || "close".equals(msgObj.getOperation())) { + String type; + if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation())) { + type = SystemMessageType.PROJECT_ADD; + } else { + type = SystemMessageType.PROJECT_EDIT; + } + List projectList = new ArrayList<>(); + projectList.add(msgObj.getProjectId()); + DisputeProcessMQMsg msg = new DisputeProcessMQMsg(customerId, projectList, type); + SpringContextUtils.getBean(ScreenProjectDataService.class).sendProjectChangeMq(msg); + } + } + } + @PreDestroy public void saveCalStatus() { //todo diff --git a/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectTraceServiceImpl.java b/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectTraceServiceImpl.java index 2a56eb3dfc..5bef06a0a2 100644 --- a/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectTraceServiceImpl.java +++ b/epmet-module/gov-project/gov-project-server/src/main/java/com/epmet/service/impl/ProjectTraceServiceImpl.java @@ -312,7 +312,10 @@ public class ProjectTraceServiceImpl implements ProjectTraceS List staffList = formDTO.getStaffList(); //1.文字内容安全校验 List list = new ArrayList<>(); - list.add(formDTO.getTitle());list.add(formDTO.getBackGround());list.add(formDTO.getPublicReply());list.add(formDTO.getInternalRemark()); + list.add(formDTO.getTitle()); + list.add(formDTO.getBackGround()); + list.add(formDTO.getPublicReply()); + list.add(formDTO.getInternalRemark()); safetyCheck(list); //2.数据准备,查询需要用到的数据