Browse Source

Merge branches 'dev_grid_platform' and 'develop' of http://git.elinkit.com.cn:7070/r/epmet-cloud into develop

 Conflicts:
	epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java
dev_shibei_match
wxz 4 years ago
parent
commit
5d57db69eb
  1. 8
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java
  2. 8
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java
  3. 8
      epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java
  4. 4
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java
  5. 28
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/OrgOrStaffMQMsg.java
  6. 5
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java
  7. 8
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  8. 4
      epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java
  9. 1
      epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java
  10. 3
      epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/form/ScreenProjectDataInfoFormDTO.java
  11. 9
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java
  12. 6
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
  13. 27
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java
  14. 1
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java
  15. 24
      epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml
  16. 32
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java
  17. 24
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java
  18. 35
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java
  19. 61
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java
  20. 16
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java
  21. 97
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  22. 26
      epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml
  23. 8
      epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java
  24. 8
      epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java
  25. 56
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java
  26. 7
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
  27. 8
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
  28. 7
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java
  29. 108
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java
  30. 8
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
  31. 69
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java
  32. 87
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java
  33. 2
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java
  34. 8
      epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java

8
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java

@ -58,7 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) {
String tags = messageExt.getTags();
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("认证操作日志监听器-收到消息内容:{}", msg);
LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class);
@ -101,7 +101,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【登录操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -115,8 +115,8 @@ public class AuthOperationLogListener implements MessageListenerConcurrently {
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel);
//logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel);
}
}

8
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java

@ -58,7 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
private void consumeMessage(MessageExt messageExt) {
String opeType = messageExt.getTags();
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("积分操作日志监听器-收到消息内容:{}", msg);
PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class);
@ -103,7 +103,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【积分操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -117,8 +117,8 @@ public class PointOperationLogListener implements MessageListenerConcurrently {
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
//logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
}
}

8
epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java

@ -58,7 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) {
//String tags = messageExt.getTags();
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("项目变动操作日志监听器-收到消息内容:{}", msg);
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
@ -103,7 +103,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【项目操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -136,8 +136,8 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
//logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel);
}
}

4
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java

@ -7,7 +7,7 @@ package com.epmet.commons.rocketmq.constants;
*/
public interface MQUserPropertys {
//堆积消息label
String PENDING_MSG_LABEL = "pendingMsgLabel";
//阻塞消息label
String BLOCKED_MSG_LABEL = "blockedMsgLabel";
}

28
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/OrgOrStaffMQMsg.java

@ -0,0 +1,28 @@
package com.epmet.commons.rocketmq.messages;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* 组织网格人员中间库数据上报MQ
* @author sun
*/
@Data
@AllArgsConstructor
public class OrgOrStaffMQMsg implements Serializable {
//客户Id
private String customerId;
//组织、网格、人员Id
private String orgId;
//数据类型【组织:agency 网格:grid 人员:staff】
private String orgType;
//操作类型【新增:add 修改删除:edit】
private String type;
}

5
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java

@ -30,6 +30,11 @@ public class PageFormDTO {
*/
private Integer offset;
/**
* 是否分页 默认分页
*/
private boolean isPage = true;
public Integer getOffset() {
return (pageNo-1)*pageSize;
}

8
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -550,14 +550,14 @@ public class RedisKeys {
}
/**
* @description 检查message MQ滞留消息
* @description 检查message MQ阻塞消息
*
* @param pendingMsgLabel 滞留消息的label
* @param blockedMsgLabel 滞留消息的label
* @return
* @author wxz
* @date 2021.10.14 14:33:53
*/
public static String pendingMqMsgKey(String pendingMsgLabel) {
return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel);
public static String blockedMqMsgKey(String blockedMsgLabel) {
return rootPrefix.concat("message:mq:blocked:").concat(blockedMsgLabel);
}
}

4
epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java

@ -14,4 +14,8 @@ public class EventInfoFormDTO implements Serializable {
private static final long serialVersionUID = 8479649048108914555L;
private String customerId;
private String projectId;
/**
* 操作类型新增:add 修改删除:edit
*/
private String type;
}

1
epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java

@ -196,4 +196,5 @@ public class ScreenProjectDataDTO implements Serializable {
* fact_origin_project_main_daily.grid_id对应的网格名称
*/
private String tempGridName;
}

3
epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/form/ScreenProjectDataInfoFormDTO.java

@ -115,7 +115,8 @@ public class ScreenProjectDataInfoFormDTO implements Serializable {
/**
* 结案日期
*/
private String closeCaseTime;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date closeCaseTime;
/**
* 所有上级ID用英文逗号分开

9
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java

@ -160,13 +160,4 @@ public class ScreenProjectDataEntity extends BaseEpmetEntity {
*/
private BigDecimal satisfactionScore;
/**
* 来源议题issue 项目立项:agency 事件:resi_event
*/
private String origin;
/**
* 分类编码存在多个以英文逗号隔开注意与all_cagtegory_name顺序一致;
*/
private String categoryCode;
}

6
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -70,7 +70,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt msgExt) {
String msg = new String(msgExt.getBody());
String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("receive customerId:{}", JSON.toJSONString(msg));
ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class);
@ -179,9 +179,9 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【项目变动事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
//logger.info("【项目变动事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
}
/*@Override

27
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java

@ -44,7 +44,6 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
@ -136,22 +135,18 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl<ScreenProjectD
//todo 动态获取超时时间
//如果结案 结案时间立项时间;
if ("closed_case".equals(projectStatusCode)) {
try {
Date createTime = item.getProjectCreateTime();
Date closeCaseTime = sdf.parse(item.getCloseCaseTime());
Date createTime = item.getProjectCreateTime();
Date closeCaseTime = item.getCloseCaseTime();
if (closeCaseTime.getTime() - createTime.getTime() <= 1000 * 60 * 60 * 24 * 4) {
//如果结案时间-立项时间小于等于4天 未超期 level3
item.setProjectLevel(3);
} else if (closeCaseTime.getTime() - createTime.getTime() <= 1000 * 60 * 60 * 24 * 5) {
//如果结案时间-立项时间大于4天小于5天 即将超期 level2
item.setProjectLevel(2);
} else {
//大于5天 已超期 level1
item.setProjectLevel(1);
}
} catch (ParseException e) {
e.printStackTrace();
if (closeCaseTime.getTime() - createTime.getTime() <= 1000 * 60 * 60 * 24 * 4) {
//如果结案时间-立项时间小于等于4天 未超期 level3
item.setProjectLevel(3);
} else if (closeCaseTime.getTime() - createTime.getTime() <= 1000 * 60 * 60 * 24 * 5) {
//如果结案时间-立项时间大于4天小于5天 即将超期 level2
item.setProjectLevel(2);
} else {
//大于5天 已超期 level1
item.setProjectLevel(1);
}
} else if ("pending".equals(projectStatusCode)) {
//如果处理中 当前时间-立项时间;

1
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java

@ -186,6 +186,7 @@ public class DataReportingServiceImpl implements DataReportingService {
private BaseDisputeProcessDTO getBaseDisputeProcessDTO(ScreenProjectDataDTO project) {
BaseDisputeProcessDTO dto = new BaseDisputeProcessDTO();
dto.setId(project.getProjectId());
dto.setCustomerId(project.getCustomerId());
dto.setEventName(project.getProjectTitle());
String categoryCode = project.getCategoryCode().split(StrConstant.COMMA)[0];

24
epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml

@ -1011,12 +1011,15 @@
FROM staff_patrol_record
WHERE
1=1
<if test="patrolId != null and patrolId != ''">
AND ID = #{patrolId}
</if>
<if test="patrolId != null and patrolId != ''">
AND ID = #{patrolId}
</if>
and CUSTOMER_ID = #{customerId}
AND DEL_FLAG = '0'
LIMIT #{offset},#{pageSize}
<if test="isPage != null and isPage">
LIMIT #{offset},#{pageSize}
</if>
order by created_time
</select>
<select id="getPatrolDetailList" resultType="com.epmet.dto.user.result.MidPatrolDetailResult">
select
@ -1024,12 +1027,17 @@
from staff_patrol_detail
WHERE
1=1
<if test="patrolId != null and patrolId != ''">
AND staff_patrol_rec_id = #{patrolId}
</if>
<if test="patrolId != null and patrolId != ''">
AND staff_patrol_rec_id = #{patrolId}
</if>
and CUSTOMER_ID = #{customerId}
AND DEL_FLAG = '0'
LIMIT #{offset},#{pageSize}
<if test="isPage != null and isPage">
LIMIT #{offset},#{pageSize}
</if>
order by created_time
</select>
</mapper>

32
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java

@ -1,10 +1,7 @@
package com.epmet.send;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg;
import com.epmet.commons.rocketmq.messages.PointRuleChangedMQMsg;
import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.rocketmq.messages.StaffPatrolMQMsg;
import com.epmet.commons.rocketmq.messages.*;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.utils.Result;
import com.epmet.constant.SystemMessageType;
@ -158,4 +155,31 @@ public class SendMqMsgUtil {
}
/**
* @Description 组织网格人员中间库信息同步Mq
* @author sun
*/
public boolean sendOrgStaffMqMsg(OrgOrStaffMQMsg msg) {
try {
SystemMsgFormDTO msgForm = new SystemMsgFormDTO();
msgForm.setMessageType(msg.getType());
msgForm.setContent(msg);
Result sendMsgResult;
log.info("sendOrgStaffMqMsg param:{}",msgForm);
int retryTime = 0;
do {
sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(msgForm);
} while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO);
if (sendMsgResult != null && sendMsgResult.success()) {
return true;
}
log.error("发送(组织、网格、人员同步中间库)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(msgForm));
} catch (Exception e) {
log.error("sendOrgStaffMqMsg exception", e);
}
return false;
}
}

24
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java

@ -28,10 +28,32 @@ public class SystemMessageController {
@PostMapping("send-by-mq")
public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) {
ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class);
systemMessageService.sendMQMessage(form.getMessageType(), form.getContent());
systemMessageService.persistAndSendMQMessage(form.getMessageType(), form.getContent());
return new Result();
}
/**
* @description 检查MQ阻塞消息(MQ收到消息但是往监听者发送消息或者监听者处理逻辑问题导致无法消费消息)
*
* @param
* @return
* @author wxz
* @date 2021.10.16 22:07:38
*/
@PostMapping("blocked-mq-msg-scan")
public Result blockedMqMsgScan() {
systemMessageService.blockedMqMsgScan();
return new Result();
}
/**
* @description 检查MQ滞留消息(往MQ发送消息失败MQ未收到消息)
*
* @param
* @return
* @author wxz
* @date 2021.10.16 22:08:32
*/
@PostMapping("pendding-mq-msg-scan")
public Result penddingMqMsgScan() {
systemMessageService.penddingMqMsgScan();

35
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java

@ -0,0 +1,35 @@
/**
* Copyright 2018 人人开源 https://www.renren.io
* <p>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.epmet.dao;
import com.epmet.commons.mybatis.dao.BaseDao;
import com.epmet.entity.SystemMessagePenddingEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* 系统消息表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2021-10-16
*/
@Mapper
public interface SystemMessagePenddingDao extends BaseDao<SystemMessagePenddingEntity> {
void physicalDeleteById(@Param("penddingId") String penddingId);
}

61
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java

@ -0,0 +1,61 @@
/**
* Copyright 2018 人人开源 https://www.renren.io
* <p>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.epmet.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.epmet.commons.mybatis.entity.BaseEpmetEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* 系统消息表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2021-10-16
*/
@Data
@EqualsAndHashCode(callSuper=false)
@TableName("system_message_pendding")
public class SystemMessagePenddingEntity extends BaseEpmetEntity {
private static final long serialVersionUID = 1L;
/**
* 消息类型init_customer:客户初始化,login登录logout退出
*/
private String msgType;
/**
* 消息主表id
*/
private String msgId;
/**
* 消息发送途径
*/
private String sendApproach;
/**
* 消息内容
*/
private String content;
}

16
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java

@ -3,7 +3,7 @@ package com.epmet.service;
public interface SystemMessageService {
/**
* @description 发送mq消息
* @description 持久化和发送mq消息
*
* @param messageType
* @param content
@ -11,15 +11,25 @@ public interface SystemMessageService {
* @author wxz
* @date 2021.10.14 15:07:02
*/
void sendMQMessage(String messageType, Object content);
void persistAndSendMQMessage(String messageType, Object content);
/**
* @description 扫描滞留消息
* @description 扫描阻塞的消息
*
* @param
* @return
* @author wxz
* @date 2021.10.15 10:13:37
*/
void blockedMqMsgScan();
/**
* @description 扫描待办的消息(因为发送到MQ失败而堆积)
*
* @param
* @return
* @author wxz
* @date 2021.10.16 12:11:39
*/
void penddingMqMsgScan();
}

97
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -12,7 +12,9 @@ import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.SystemMessageSendApproach;
import com.epmet.constant.SystemMessageType;
import com.epmet.dao.SystemMessageDao;
import com.epmet.dao.SystemMessagePenddingDao;
import com.epmet.entity.SystemMessageEntity;
import com.epmet.entity.SystemMessagePenddingEntity;
import com.epmet.service.SystemMessageService;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -27,6 +29,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@ -53,51 +56,90 @@ public class SystemMessageServiceImpl implements SystemMessageService {
@Autowired
private SystemMessageDao systemMessageDao;
@Autowired
private SystemMessagePenddingDao systemMessagePenddingDao;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private RedisUtils redisUtils;
@Transactional(rollbackFor = Exception.class)
@Override
public void sendMQMessage(String messageType, Object content) {
public void persistAndSendMQMessage(String messageType, Object content) {
String contentStr = JSON.toJSONString(content);
// 1.存储消息到表
SystemMessageEntity systemMessageEntity = new SystemMessageEntity();
systemMessageEntity.setMsgType(messageType);
systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ);
systemMessageEntity.setContent(contentStr);
systemMessageDao.insert(systemMessageEntity);
logger.info("【发送MQ系统消息】-落盘并发送-messageType:{}, contentStr:{}", messageType, contentStr);
// 1.消息落盘
String penddingMsgId = persistMessage(messageType, contentStr);
// 2.发送消息
sendMQMessage(messageType, contentStr, penddingMsgId);
}
/**
* @description 消息落盘有事务
*
* @param messageType
* @param contentStr
* @return
* @author wxz
* @date 2021.10.16 11:04:53
*/
@Transactional(rollbackFor = Exception.class)
public String persistMessage(String messageType, String contentStr) {
SystemMessageEntity message = new SystemMessageEntity();
message.setMsgType(messageType);
message.setSendApproach(SystemMessageSendApproach.MQ);
message.setContent(contentStr);
systemMessageDao.insert(message);
SystemMessagePenddingEntity pendding = new SystemMessagePenddingEntity();
pendding.setMsgType(messageType);
pendding.setSendApproach(SystemMessageSendApproach.MQ);
pendding.setContent(contentStr);
pendding.setMsgId(message.getId());
systemMessagePenddingDao.insert(pendding);
logger.info("【发送MQ系统消息】-落盘完成");
return pendding.getId();
}
private void sendMQMessage(String messageType, String contentStr, String penddingId) {
String topic = getTopicByMsgType(messageType);
// 2.缓存下来,供滞留消息扫描。TTL -1,永不过期
// 缓存下来,供滞留消息扫描。TTL -1,永不过期
String pendingMsgLabel = null;
String pendingMsgKey = null;
try {
MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr);
pendingMsgLabel = UUID.randomUUID().toString().replace("-", "");
pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
pendingMsgKey = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.set(pendingMsgKey, mcb, -1);
//logger.info("【发送MQ系统消息】-存入redis堆积列表成功-{}-{}-{}", topic, messageType, pendingMsgLabel);
} catch (Exception e) {
logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e));
logger.error("【发送MQ系统消息】将系统MQ消息存储到Redis堆积列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e));
}
// 3.发送mq消息
try {
Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel);
meMessage.putUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL, pendingMsgLabel);
rocketMQTemplate.getProducer().send(meMessage);
logger.info("【发送MQ系统消息】-发送到MQ成功");
} catch (Exception e) {
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace);
logger.error("【发送MQ系统消息】发送系统MQ消息失败,堆栈信息:{}", errorStackTrace);
// 清理消息缓存
// 清理阻塞中的消息缓存
redisUtils.delete(pendingMsgKey);
throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode());
}
// 删除消息堆积
systemMessagePenddingDao.physicalDeleteById(penddingId);
}
/**
@ -146,8 +188,8 @@ public class SystemMessageServiceImpl implements SystemMessageService {
}
@Override
public void penddingMqMsgScan() {
String scanKey = RedisKeys.pendingMqMsgKey("*");
public void blockedMqMsgScan() {
String scanKey = RedisKeys.blockedMqMsgKey("*");
Set<String> keys = redisUtils.keys(scanKey);
//System.out.println(keys);
for (String key : keys) {
@ -173,7 +215,7 @@ public class SystemMessageServiceImpl implements SystemMessageService {
&& l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) {
logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size());
logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now;
}
@ -185,7 +227,7 @@ public class SystemMessageServiceImpl implements SystemMessageService {
createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now)
&& l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now))
) {
logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size());
logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size());
l1LastAlertTime = now;
}
break;
@ -193,6 +235,25 @@ public class SystemMessageServiceImpl implements SystemMessageService {
}
}
@Override
public void penddingMqMsgScan() {
Integer count = systemMessagePenddingDao.selectCount(null);
if (count == 0) {
return;
}
// 扫描并且重新投递
List<SystemMessagePenddingEntity> penddingMsgs = systemMessagePenddingDao.selectList(null);
for (SystemMessagePenddingEntity penddingMsg : penddingMsgs) {
try {
sendMQMessage(penddingMsg.getMsgType(), penddingMsg.getContent(), penddingMsg.getId());
} catch (Exception e) {
// 投递失败不应影响后续消息的投递
logger.error("【重新投递MQ消息】失败, msgType:{}, penddingMsgId:{}, 错误:{}", penddingMsg.getMsgType(), penddingMsg.getId() , ExceptionUtils.getErrorStackTrace(e));
}
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor

26
epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.epmet.dao.SystemMessagePenddingDao">
<resultMap type="com.epmet.entity.SystemMessagePenddingEntity" id="systemMessagePenddingMap">
<result property="id" column="ID"/>
<result property="msgId" column="MSG_ID"/>
<result property="msgType" column="MSG_TYPE"/>
<result property="sendApproach" column="SEND_APPROACH"/>
<result property="content" column="CONTENT"/>
<result property="revision" column="REVISION"/>
<result property="createdBy" column="CREATED_BY"/>
<result property="createdTime" column="CREATED_TIME"/>
<result property="updatedBy" column="UPDATED_BY"/>
<result property="updatedTime" column="UPDATED_TIME"/>
<result property="delFlag" column="DEL_FLAG"/>
</resultMap>
<!--物理删除消息堆积-->
<delete id="physicalDeleteById">
delete from system_message_pendding where ID = #{penddingId}
</delete>
</mapper>

8
epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java

@ -57,7 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
public void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -84,7 +84,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【议题/项目分类标签初始化事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -98,8 +98,8 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
//logger.info("【议题/项目分类标签初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
}
}

8
epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java

@ -56,7 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -82,7 +82,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【客户初始化事件监听器】-orgRole-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -129,9 +129,9 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【客户初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
//logger.info("【客户初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
}
/* @Override

56
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java

@ -0,0 +1,56 @@
/**
* Copyright 2018 人人开源 https://www.renren.io
* <p>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.epmet.opendata.controller;
import com.epmet.commons.tools.utils.Result;
import com.epmet.dto.basereport.form.EventInfoFormDTO;
import com.epmet.opendata.service.BaseDisputeProcessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 事件信息表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2021-10-15
*/
@RestController
@RequestMapping("basedisputeprocess")
public class BaseDisputeProcessController {
@Autowired
private BaseDisputeProcessService baseDisputeProcessService;
/**
* 获取上报事件
* @Param formDTO
* @Return {@link Result}
* @Author zhaoqifeng
* @Date 2021/10/15 16:59
*/
@PostMapping("eventinfo")
public Result getEventinfo(@RequestBody(required = false) EventInfoFormDTO formDTO) {
baseDisputeProcessService.getEventinfo(formDTO);
return new Result();
}
}

7
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java

@ -6,6 +6,7 @@ import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataPatrolChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataProjectChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
@ -41,6 +42,12 @@ public class RocketMQConsumerRegister extends MQAbstractRegister {
TopicConstants.PATROL,
"*",
new OpenDataPatrolChangeEventListener());
register(consumerProperties,
ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP,
MessageModel.CLUSTERING,
TopicConstants.PROJECT_CHANGED,
"*",
new OpenDataProjectChangeEventListener());
// ...其他监听器类似
}

8
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java

@ -55,7 +55,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags);
GridBaseInfoFormDTO obj = JSON.parseObject(msg, GridBaseInfoFormDTO.class);
@ -87,7 +87,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -101,8 +101,8 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,pendingMsgLabel:{}", pendingMsgLabel);
//logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
}
}

7
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java

@ -59,7 +59,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags);
StaffPatrolMQMsg msgObj = JSON.parseObject(msg, StaffPatrolMQMsg.class);
@ -105,7 +105,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【开放数据事件监听器】-巡查-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -118,7 +118,8 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
//logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
}
}

108
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java

@ -0,0 +1,108 @@
package com.epmet.opendata.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.basereport.form.EventInfoFormDTO;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import com.epmet.opendata.service.BaseDisputeProcessService;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author wxz
* @Description 系统对接中间库项目信息变更监听器
* @date 2021.10.13 15:21:48
*/
public class OpenDataProjectChangeEventListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
private EpmetMessageOpenFeignClient messageOpenFeignClient;
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
logger.error(ExceptionUtils.getErrorStackTrace(e));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(MessageExt messageExt) {
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
//messageExt.propert
logger.info("【开放数据事件监听器】-项目信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel);
EventInfoFormDTO obj = JSON.parseObject(msg, EventInfoFormDTO.class);
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()),
30L, 30L, TimeUnit.SECONDS);
SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(obj);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-project-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @description 应答mq消息
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
}
}

8
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

@ -58,11 +58,11 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
//messageExt.propert
logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel);
logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, blockedMsgLabel:{}", msg, tags, pendingMsgLabel);
StaffBaseInfoFormDTO obj = JSON.parseObject(msg, StaffBaseInfoFormDTO.class);
DistributedLock distributedLock = null;
@ -87,7 +87,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【开放数据事件监听器】-staff-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -102,7 +102,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
}
}

69
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java

@ -18,13 +18,9 @@
package com.epmet.opendata.service;
import com.epmet.commons.mybatis.service.BaseService;
import com.epmet.commons.tools.page.PageData;
import com.epmet.opendata.dto.BaseDisputeProcessDTO;
import com.epmet.dto.basereport.form.EventInfoFormDTO;
import com.epmet.opendata.entity.BaseDisputeProcessEntity;
import java.util.List;
import java.util.Map;
/**
* 事件信息表
*
@ -34,62 +30,11 @@ import java.util.Map;
public interface BaseDisputeProcessService extends BaseService<BaseDisputeProcessEntity> {
/**
* 默认分页
*
* @param params
* @return PageData<BaseDisputeProcessDTO>
* @author generator
* @date 2021-10-15
*/
PageData<BaseDisputeProcessDTO> page(Map<String, Object> params);
/**
* 默认查询
*
* @param params
* @return java.util.List<BaseDisputeProcessDTO>
* @author generator
* @date 2021-10-15
*/
List<BaseDisputeProcessDTO> list(Map<String, Object> params);
/**
* 单条查询
*
* @param id
* @return BaseDisputeProcessDTO
* @author generator
* @date 2021-10-15
*/
BaseDisputeProcessDTO get(String id);
/**
* 默认保存
*
* @param dto
* @return void
* @author generator
* @date 2021-10-15
*/
void save(BaseDisputeProcessDTO dto);
/**
* 默认更新
*
* @param dto
* @return void
* @author generator
* @date 2021-10-15
*/
void update(BaseDisputeProcessDTO dto);
/**
* 批量删除
*
* @param ids
* @return void
* @author generator
* @date 2021-10-15
* 获取上报事件
* @Param formDTO
* @Return
* @Author zhaoqifeng
* @Date 2021/10/15 16:59
*/
void delete(String[] ids);
void getEventinfo(EventInfoFormDTO formDTO);
}

87
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java

@ -17,23 +17,21 @@
package com.epmet.opendata.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.tools.constant.FieldConstant;
import com.epmet.commons.tools.page.PageData;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.Result;
import com.epmet.dto.basereport.form.EventInfoFormDTO;
import com.epmet.feign.DataStatisticalOpenFeignClient;
import com.epmet.opendata.dao.BaseDisputeProcessDao;
import com.epmet.opendata.dto.BaseDisputeProcessDTO;
import com.epmet.opendata.entity.BaseDisputeProcessEntity;
import com.epmet.opendata.service.BaseDisputeProcessService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
/**
* 事件信息表
@ -43,57 +41,34 @@ import java.util.Map;
*/
@Service
public class BaseDisputeProcessServiceImpl extends BaseServiceImpl<BaseDisputeProcessDao, BaseDisputeProcessEntity> implements BaseDisputeProcessService {
@Resource
private DataStatisticalOpenFeignClient dataStatisticalOpenFeignClient;
/**
* 获取上报事件
*
* @param formDTO
* @Param formDTO
* @Return
* @Author zhaoqifeng
* @Date 2021/10/15 16:59
*/
@Override
public PageData<BaseDisputeProcessDTO> page(Map<String, Object> params) {
IPage<BaseDisputeProcessEntity> page = baseDao.selectPage(
getPage(params, FieldConstant.CREATED_TIME, false),
getWrapper(params)
);
return getPageData(page, BaseDisputeProcessDTO.class);
}
@Override
public List<BaseDisputeProcessDTO> list(Map<String, Object> params) {
List<BaseDisputeProcessEntity> entityList = baseDao.selectList(getWrapper(params));
return ConvertUtils.sourceToTarget(entityList, BaseDisputeProcessDTO.class);
}
public void getEventinfo(EventInfoFormDTO formDTO) {
Result<List<BaseDisputeProcessDTO>> result = dataStatisticalOpenFeignClient.getEventInfo(formDTO);
if (!result.success()) {
throw new RenException(result.getInternalMsg());
}
List<BaseDisputeProcessDTO> list = result.getData();
if (CollectionUtils.isNotEmpty(list)) {
List<BaseDisputeProcessEntity> entityList = ConvertUtils.sourceToTarget(list, BaseDisputeProcessEntity.class);
if("add".equals(formDTO.getType())){
insertBatch(entityList);
}else {
updateBatchById(entityList);
}
}
private QueryWrapper<BaseDisputeProcessEntity> getWrapper(Map<String, Object> params){
String id = (String)params.get(FieldConstant.ID_HUMP);
QueryWrapper<BaseDisputeProcessEntity> wrapper = new QueryWrapper<>();
wrapper.eq(StringUtils.isNotBlank(id), FieldConstant.ID, id);
return wrapper;
}
@Override
public BaseDisputeProcessDTO get(String id) {
BaseDisputeProcessEntity entity = baseDao.selectById(id);
return ConvertUtils.sourceToTarget(entity, BaseDisputeProcessDTO.class);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void save(BaseDisputeProcessDTO dto) {
BaseDisputeProcessEntity entity = ConvertUtils.sourceToTarget(dto, BaseDisputeProcessEntity.class);
insert(entity);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void update(BaseDisputeProcessDTO dto) {
BaseDisputeProcessEntity entity = ConvertUtils.sourceToTarget(dto, BaseDisputeProcessEntity.class);
updateById(entity);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void delete(String[] ids) {
// 逻辑删除(@TableLogic 注解)
baseDao.deleteBatchIds(Arrays.asList(ids));
}
}

2
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java

@ -118,6 +118,8 @@ public class UserPatrolRecordServiceImpl extends BaseServiceImpl<UserPatrolRecor
if (effectRow == 0) {
baseDao.insert(recordEntity);
}
midPatrolFormDTO.setPage(false);
//todo 过滤掉巡查结束时间之后的轨迹
Result<List<MidPatrolDetailResult>> detailResult = dataStatisticalOpenFeignClient.getPatrolDetailList(midPatrolFormDTO);
if (detailResult == null || !detailResult.success()) {
log.error("获取巡查记录明细失败,param:{}", JSON.toJSONString(midPatrolFormDTO));

8
epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java

@ -51,7 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
private void consumeMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg);
InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
@ -82,7 +82,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
@ -96,9 +96,9 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel);
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel);
//logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
}
/* @Override

Loading…
Cancel
Save