diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java index 4f9e361fc5..ee243d88dd 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java @@ -2,22 +2,19 @@ package com.epmet.mq.listener.listener; import com.alibaba.fastjson.JSON; import com.epmet.auth.constants.AuthOperationEnum; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.LoginMQMsg; -import com.epmet.commons.tools.constant.ServiceConstant; import com.epmet.commons.tools.distributedlock.DistributedLock; -import com.epmet.commons.tools.exception.EpmetErrorCode; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; -import com.epmet.commons.tools.feign.ResultDataResolver; -import com.epmet.commons.tools.utils.IpUtils; -import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; -import com.epmet.dto.CustomerStaffDTO; import com.epmet.entity.LogOperationEntity; -import com.epmet.feign.EpmetUserOpenFeignClient; import com.epmet.mq.listener.bean.log.LogOperationHelper; import com.epmet.mq.listener.bean.log.OperatorInfo; import com.epmet.service.LogOperationService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -40,8 +37,15 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -54,6 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("认证操作日志监听器-收到消息内容:{}", msg); LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class); @@ -91,5 +96,27 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java index f953c35edc..d883b7b1f6 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java @@ -1,10 +1,13 @@ package com.epmet.mq.listener.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.PointRuleChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.entity.LogOperationEntity; import com.epmet.enums.SystemMessageTypeEnum; @@ -34,8 +37,15 @@ public class PointOperationLogListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -48,6 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String opeType = messageExt.getTags(); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("积分操作日志监听器-收到消息内容:{}", msg); PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class); @@ -87,5 +98,27 @@ public class PointOperationLogListener implements MessageListenerConcurrently { } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java index 13384ea62e..d968d95448 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java @@ -1,11 +1,14 @@ package com.epmet.mq.listener.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.LoginMQMsg; import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.entity.LogOperationEntity; import com.epmet.mq.listener.bean.log.LogOperationHelper; @@ -34,8 +37,15 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -48,6 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { //String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("项目变动操作日志监听器-收到消息内容:{}", msg); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); @@ -87,6 +98,14 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } } private String getOperationTypeDisplay(String type) { @@ -107,4 +126,18 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently return null; } } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); + } } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java similarity index 81% rename from epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java rename to epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java index 593ab0cc57..a69de64cca 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java @@ -1,4 +1,4 @@ -package com.epmet.constant; +package com.epmet.commons.rocketmq.constants; /** * @Description MQ用户自定义属性 diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/constant/StrConstant.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/constant/StrConstant.java index c601d2fd71..fd4a97d11e 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/constant/StrConstant.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/constant/StrConstant.java @@ -44,6 +44,11 @@ public interface StrConstant { */ String COLON = ":"; + /** + * 英文分号 + */ + String SEMICOLON = ";"; + /** * 中文顿号 */ diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java index 134124f512..4820f4d50b 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java @@ -20,8 +20,17 @@ public class PageFormDTO { } @NotNull(message = "页码不能为空", groups = AddUserInternalGroup.class) - private Integer pageNo; + private Integer pageNo = 1; @NotNull(message = "每页数量不能为空", groups = AddUserInternalGroup.class) - private Integer pageSize; + private Integer pageSize = 20; + + /** + * 偏移量 从多少条开始 + */ + private Integer offset; + + public Integer getOffset() { + return (pageNo-1)*pageSize; + } } diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 9d2497f24e..ad66110c51 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -556,7 +556,7 @@ public class RedisKeys { * @author wxz * @date 2021.10.14 14:33:53 */ - public static String checkPendingMqMsgKey(String pendingMsgLabel) { + public static String pendingMqMsgKey(String pendingMsgLabel) { return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel); } } diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/param/MidPatrolFormDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/param/MidPatrolFormDTO.java new file mode 100644 index 0000000000..b0810e54cc --- /dev/null +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/param/MidPatrolFormDTO.java @@ -0,0 +1,29 @@ +package com.epmet.dto.user.param; + +import com.epmet.commons.tools.dto.form.PageFormDTO; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * desc:查询巡查 参数 + * + * @author zhaoqifeng + * @dscription + * @date 2021/6/7 16:23 + */ +@NoArgsConstructor +@Data +public class MidPatrolFormDTO extends PageFormDTO implements Serializable { + private static final long serialVersionUID = 4411051728689886810L; + /** + * 客户Id + */ + private String customerId; + /** + * 巡查记录id 没有则查询全部 + */ + private String patrolId; + +} diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/result/MidPatrolDetailResult.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/result/MidPatrolDetailResult.java new file mode 100644 index 0000000000..434d2c8740 --- /dev/null +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/result/MidPatrolDetailResult.java @@ -0,0 +1,56 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.dto.user.result; + +import lombok.Data; + +import java.io.Serializable; + + +/** + * 工作人员巡查明细记录 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-06-07 + */ +@Data +public class MidPatrolDetailResult implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 主键 + */ + private String id; + + /** + * 客户Id + */ + private String customerId; + + /** + * 维度 + */ + private String latitude; + + /** + * 经度 + */ + private String longitude; + +} diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/result/MidPatrolRecordResult.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/result/MidPatrolRecordResult.java new file mode 100644 index 0000000000..a08d7cba17 --- /dev/null +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/user/result/MidPatrolRecordResult.java @@ -0,0 +1,132 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.dto.user.result; + +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + + +/** + * 工作人员巡查主记录 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-06-07 + */ +@Data +public class MidPatrolRecordResult implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 主键 + */ + private String id; + + /** + * 客户Id + */ + private String customerId; + + /** + * 网格id + */ + private String grid; + + /** + * 网格所有上级id + */ + private String gridPids; + + /** + * 工作人员用户id + */ + private String staffId; + + /** + * 工作人员所属组织id=网格所属的组织id + */ + private String agencyId; + + /** + * 巡查开始时间 + */ + private Date patrolStartTime; + + /** + * 巡查结束时间,前端传入 + */ + private Date patrolEndTime; + + /** + * 实际结束时间=操作结束巡查的时间 + */ + private Date actrualEndTime; + + /** + * 本次巡查总耗时,单位秒;结束巡查时写入 + */ + private Integer totalTime; + + /** + * 正在巡查中:patrolling;结束:end + */ + private String status; + + /** + * 删除标识 0.未删除 1.已删除 + */ + private Integer delFlag; + + /** + * 乐观锁 + */ + private Integer revision; + + /** + * 创建人 + */ + private String createdBy; + + /** + * 创建时间 + */ + private Date createdTime; + + /** + * 更新人 + */ + private String updatedBy; + + /** + * 更新时间 + */ + private Date updatedTime; + + /** + * 维度 + */ + private String latitude; + + /** + * 精度 + */ + private String longitude; + +} diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/DataStatisticalOpenFeignClient.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/DataStatisticalOpenFeignClient.java index 2edd4f663f..4c5e1f9025 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/DataStatisticalOpenFeignClient.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/DataStatisticalOpenFeignClient.java @@ -13,11 +13,16 @@ import com.epmet.dto.org.result.CustomerAgencyDTO; import com.epmet.dto.org.result.CustomerGridDTO; import com.epmet.dto.screen.form.InitCustomerIndexForm; import com.epmet.dto.stats.form.CustomerIdAndDateIdFormDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; +import com.epmet.dto.user.result.CustomerStaffDTO; +import com.epmet.dto.user.result.MidPatrolDetailResult; +import com.epmet.dto.user.result.MidPatrolRecordResult; import com.epmet.dto.user.result.GridUserInfoDTO; import com.epmet.feign.impl.DataStatisticalOpenFeignClientFallBackFactory; import com.epmet.opendata.dto.form.GridBaseInfoFormDTO; import com.epmet.opendata.dto.form.StaffBaseInfoFormDTO; import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -304,4 +309,27 @@ public interface DataStatisticalOpenFeignClient { */ @PostMapping("/data/stats/datareporting/staffbaseinfo") Result> getStaffBaseInfo(@RequestBody StaffBaseInfoFormDTO formDTO); + Result> getStaffBaseInfo(@RequestBody StaffBaseInfoFormDTO formDTO); + + /** + * 根据巡查记录id 获取巡查主记录信息 + * + * @param midPatrolFormDTO + * @return + * @author yinzuomei + * @date 2021/9/10 8:56 上午 + */ + @PostMapping(value = "/data/stats/datareporting/getPatrolRecordList") + Result> getPatrolRecordList(@RequestBody MidPatrolFormDTO midPatrolFormDTO); + + /** + * 根据巡查记录id 获取巡查轨迹(明细)信息 + * + * @param midPatrolFormDTO + * @return + * @author yinzuomei + * @date 2021/9/10 8:56 上午 + */ + @PostMapping(value = "/data/stats/datareporting/getPatrolDetailList") + Result> getPatrolDetailList(@RequestBody MidPatrolFormDTO midPatrolFormDTO); } diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/impl/DataStatisticalOpenFeignClientFallBack.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/impl/DataStatisticalOpenFeignClientFallBack.java index 4bcd51dc62..fd66b4a73f 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/impl/DataStatisticalOpenFeignClientFallBack.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/feign/impl/DataStatisticalOpenFeignClientFallBack.java @@ -14,6 +14,10 @@ import com.epmet.dto.org.result.CustomerAgencyDTO; import com.epmet.dto.org.result.CustomerGridDTO; import com.epmet.dto.screen.form.InitCustomerIndexForm; import com.epmet.dto.stats.form.CustomerIdAndDateIdFormDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; +import com.epmet.dto.user.result.CustomerStaffDTO; +import com.epmet.dto.user.result.MidPatrolDetailResult; +import com.epmet.dto.user.result.MidPatrolRecordResult; import com.epmet.dto.user.result.GridUserInfoDTO; import com.epmet.feign.DataStatisticalOpenFeignClient; import com.epmet.opendata.dto.form.GridBaseInfoFormDTO; @@ -289,4 +293,30 @@ public class DataStatisticalOpenFeignClientFallBack implements DataStatisticalOp public Result> getStaffBaseInfo(StaffBaseInfoFormDTO formDTO) { return ModuleUtils.feignConError(ServiceConstant.DATA_STATISTICAL_SERVER, "getStaffBaseInfo", formDTO); } + + /** + * 根据巡查记录id 获取巡查主记录信息 + * + * @param midPatrolFormDTO + * @return + * @author yinzuomei + * @date 2021/9/10 8:56 上午 + */ + @Override + public Result> getPatrolRecordList(MidPatrolFormDTO midPatrolFormDTO) { + return ModuleUtils.feignConError(ServiceConstant.DATA_STATISTICAL_SERVER, "getPatrolRecordList", midPatrolFormDTO); + } + + /** + * 根据巡查记录id 获取巡查轨迹(明细)信息 + * + * @param midPatrolFormDTO + * @return + * @author yinzuomei + * @date 2021/9/10 8:56 上午 + */ + @Override + public Result> getPatrolDetailList(MidPatrolFormDTO midPatrolFormDTO) { + return ModuleUtils.feignConError(ServiceConstant.DATA_STATISTICAL_SERVER, "getPatrolDetailList", midPatrolFormDTO); + } } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/DataReportingController.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/DataReportingController.java index 394f5218a5..23ad509167 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/DataReportingController.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/DataReportingController.java @@ -4,6 +4,10 @@ import com.epmet.commons.tools.utils.Result; import com.epmet.commons.tools.validator.ValidatorUtils; import com.epmet.dto.org.result.CustomerAgencyDTO; import com.epmet.dto.org.result.CustomerGridDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; +import com.epmet.dto.user.result.CustomerStaffDTO; +import com.epmet.dto.user.result.MidPatrolDetailResult; +import com.epmet.dto.user.result.MidPatrolRecordResult; import com.epmet.dto.user.result.GridUserInfoDTO; import com.epmet.opendata.dto.form.GridBaseInfoFormDTO; import com.epmet.opendata.dto.form.StaffBaseInfoFormDTO; @@ -57,4 +61,32 @@ public class DataReportingController { } + /** + * desc: 条件获取巡查主表信息 + * + * @param formDTO + * @return com.epmet.commons.tools.utils.Result> + * @author LiuJanJun + * @date 2021/10/15 12:01 下午 + */ + @PostMapping("getPatrolRecordList") + public Result> getPatrolRecordList(@RequestBody(required = false) MidPatrolFormDTO formDTO) { + ValidatorUtils.validateEntity(formDTO, StaffBaseInfoFormDTO.Staff.class); + return new Result>().ok(dataReportingService.getPatrolRecordList(formDTO)); + } + + /** + * desc: 条件获取巡查明细信息 + * + * @param formDTO + * @return com.epmet.commons.tools.utils.Result> + * @author LiuJanJun + * @date 2021/10/15 12:01 下午 + */ + @PostMapping("getPatrolDetailList") + public Result> getPatrolDetailList(@RequestBody(required = false) MidPatrolFormDTO formDTO) { + ValidatorUtils.validateEntity(formDTO, StaffBaseInfoFormDTO.Staff.class); + return new Result>().ok(dataReportingService.getPatrolDetailList(formDTO)); + } + } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/user/UserDao.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/user/UserDao.java index 73de31ec9f..fabf82d123 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/user/UserDao.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/user/UserDao.java @@ -5,6 +5,7 @@ import com.epmet.dto.extract.form.StaffPatrolStatsFormDTO; import com.epmet.dto.extract.result.UserPartyResultDTO; import com.epmet.dto.screen.ScreenProjectDataDTO; import com.epmet.dto.stats.form.GmUploadEventFormDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; import com.epmet.dto.user.result.*; import com.epmet.entity.evaluationindex.screen.ScreenPartyUserRankDataEntity; import com.epmet.opendata.dto.form.StaffBaseInfoFormDTO; @@ -259,5 +260,9 @@ public interface UserDao { * @Description 批量查询客户网格员基础信息 **/ List getStaffBaseInfo(StaffBaseInfoFormDTO formDTO); + + List getPatrolRecordList(MidPatrolFormDTO formDTO); + + List getPatrolDetailList(MidPatrolFormDTO formDTO); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index f62cfe09f8..932ce9e28f 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -1,8 +1,10 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; @@ -45,12 +47,18 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + long start = System.currentTimeMillis(); try { - List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); - for (String msgStr : msgStrs) { - consumeMessage(msgStr); + //List msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList()); + //for (String msgStr : msgStrs) { + // consumeMessage(msgStr); + //} + + for (MessageExt msgExt : msgs) { + consumeMessage(msgExt); } + } catch (Exception e) { //失败不重发 logger.error("consumeMessage fail,msg:{}",e.getMessage()); @@ -60,7 +68,10 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } - private void consumeMessage(String msg) { + private void consumeMessage(MessageExt msgExt) { + String msg = new String(msgExt.getBody()); + String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + logger.info("receive customerId:{}", JSON.toJSONString(msg)); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); if (msgObj == null){ @@ -87,6 +98,14 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } else { log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId()); } + + if (org.apache.commons.lang.StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【项目变动事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } } public void consumeMessage(ProjectChangedMQMsg msgObj) { @@ -151,6 +170,20 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently } + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【项目变动事件监听器】删除mq滞留消息缓存失败:{}", pendingMsgLabel); + } + /*@Override public ConsumerConfigProperties getConsumerProperty() { ConsumerConfigProperties configProperties = new ConsumerConfigProperties(); diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/DataReportingService.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/DataReportingService.java index 607a0ac83a..d1f051f8c6 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/DataReportingService.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/DataReportingService.java @@ -2,6 +2,10 @@ package com.epmet.service; import com.epmet.dto.org.result.CustomerAgencyDTO; import com.epmet.dto.org.result.CustomerGridDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; +import com.epmet.dto.user.result.CustomerStaffDTO; +import com.epmet.dto.user.result.MidPatrolDetailResult; +import com.epmet.dto.user.result.MidPatrolRecordResult; import com.epmet.dto.user.result.GridUserInfoDTO; import com.epmet.opendata.dto.form.GridBaseInfoFormDTO; import com.epmet.opendata.dto.form.StaffBaseInfoFormDTO; @@ -34,4 +38,24 @@ public interface DataReportingService { * @return*/ List getStaffBaseInfo(StaffBaseInfoFormDTO formDTO); + /** + * desc: 获取巡查记录列表 + * + * @param formDTO + * @return java.util.List + * @author LiuJanJun + * @date 2021/10/15 1:21 下午 + */ + List getPatrolRecordList(MidPatrolFormDTO formDTO); + + /** + * desc: 获取巡查明细列表 + * + * @param formDTO + * @return java.util.List + * @author LiuJanJun + * @date 2021/10/15 1:22 下午 + */ + List getPatrolDetailList(MidPatrolFormDTO formDTO); + } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java index 7a7e8f0c29..6e2396199d 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java @@ -2,13 +2,17 @@ package com.epmet.service.impl; import com.epmet.dto.org.result.CustomerAgencyDTO; import com.epmet.dto.org.result.CustomerGridDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; import com.epmet.dto.user.result.CustomerStaffDTO; +import com.epmet.dto.user.result.MidPatrolDetailResult; +import com.epmet.dto.user.result.MidPatrolRecordResult; import com.epmet.dto.user.result.GridUserInfoDTO; import com.epmet.opendata.dto.form.GridBaseInfoFormDTO; import com.epmet.opendata.dto.form.StaffBaseInfoFormDTO; import com.epmet.service.DataReportingService; import com.epmet.service.org.CustomerAgencyService; import com.epmet.service.org.CustomerGridService; +import com.epmet.service.user.StatsStaffPatrolService; import com.epmet.service.user.UserService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -30,6 +34,8 @@ public class DataReportingServiceImpl implements DataReportingService { private CustomerGridService customerGridService; @Autowired private UserService userService; + @Autowired + private StatsStaffPatrolService statsStaffPatrolService; /** * @Author sun @@ -99,4 +105,14 @@ public class DataReportingServiceImpl implements DataReportingService { return resultList; } + @Override + public List getPatrolRecordList(MidPatrolFormDTO formDTO) { + return userService.getPatrolRecordList(formDTO); + } + + @Override + public List getPatrolDetailList(MidPatrolFormDTO formDTO) { + return userService.getPatrolDetailList(formDTO); + } + } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/UserService.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/UserService.java index 1ea5ad88bf..e7ff857219 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/UserService.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/UserService.java @@ -7,6 +7,9 @@ import com.epmet.dto.org.result.OrgStaffDTO; import com.epmet.dto.screen.ScreenProjectDataDTO; import com.epmet.dto.stats.form.GmUploadEventFormDTO; import com.epmet.dto.stats.user.result.UserStatisticalData; +import com.epmet.dto.user.StaffBaseInfoFormDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; +import com.epmet.dto.user.result.*; import com.epmet.dto.user.result.CustomerStaffDTO; import com.epmet.dto.user.result.StaffPatrolRecordResult; import com.epmet.dto.user.result.StaffRoleInfoDTO; @@ -144,4 +147,8 @@ public interface UserService { * @Description 批量查询客户网格员基础信息 **/ List getStaffBaseInfo(StaffBaseInfoFormDTO formDTO); + + List getPatrolRecordList(MidPatrolFormDTO formDTO); + + List getPatrolDetailList(MidPatrolFormDTO formDTO); } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/impl/UserServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/impl/UserServiceImpl.java index eca7be700c..433b659b59 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/impl/UserServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/user/impl/UserServiceImpl.java @@ -17,6 +17,8 @@ import com.epmet.dto.screen.ScreenProjectDataDTO; import com.epmet.dto.stats.form.GmUploadEventFormDTO; import com.epmet.dto.stats.user.*; import com.epmet.dto.stats.user.result.UserStatisticalData; +import com.epmet.dto.user.StaffBaseInfoFormDTO; +import com.epmet.dto.user.param.MidPatrolFormDTO; import com.epmet.dto.user.result.*; import com.epmet.entity.evaluationindex.screen.ScreenPartyUserRankDataEntity; import com.epmet.opendata.dto.form.StaffBaseInfoFormDTO; @@ -1098,4 +1100,14 @@ public class UserServiceImpl implements UserService { return userDao.getStaffBaseInfo(formDTO); } + @Override + public List getPatrolRecordList(MidPatrolFormDTO formDTO) { + return userDao.getPatrolRecordList(formDTO); + } + + @Override + public List getPatrolDetailList(MidPatrolFormDTO formDTO) { + return userDao.getPatrolDetailList(formDTO); + } + } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml index ad8f9de530..6778c0305c 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml +++ b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml @@ -1004,5 +1004,32 @@ + + diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java index 002d9ea424..cc353dd449 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java @@ -32,18 +32,9 @@ public class SystemMessageController { return new Result(); } - /** - * @description 应答mq消息 - * - * @param form - * @return - * @author wxz - * @date 2021.10.14 16:07:17 - */ - @PostMapping("ack-mq-msg") - public Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { - ValidatorUtils.validateEntity(form, SystemMsgFormDTO.AckMsgByMQ.class); - systemMessageService.ackMqMessage(form.getPendingMsgLabel()); + @PostMapping("pendding-mq-msg-scan") + public Result penddingMqMsgScan() { + systemMessageService.penddingMqMsgScan(); return new Result(); } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java index 25189683f0..f48e56cf9f 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java @@ -14,13 +14,12 @@ public interface SystemMessageService { void sendMQMessage(String messageType, Object content); /** - * @description 消息应答 + * @description 扫描滞留消息 * - * @param pendingMsgLabel + * @param * @return * @author wxz - * @date 2021.10.14 15:07:09 + * @date 2021.10.15 10:13:37 */ - void ackMqMessage(String pendingMsgLabel); - + void penddingMqMsgScan(); } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index a0add17a03..7bbc426d9e 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -2,13 +2,13 @@ package com.epmet.service.impl; import com.alibaba.fastjson.JSON; import com.epmet.auth.constants.AuthOperationConstants; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.tools.exception.EpmetErrorCode; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; -import com.epmet.constant.MQUserPropertys; import com.epmet.constant.SystemMessageSendApproach; import com.epmet.constant.SystemMessageType; import com.epmet.dao.SystemMessageDao; @@ -26,7 +26,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; +import java.time.LocalDateTime; +import java.util.Set; import java.util.UUID; @Service @@ -34,6 +35,21 @@ public class SystemMessageServiceImpl implements SystemMessageService { private Logger logger = LoggerFactory.getLogger(getClass()); + // 消息堆积时间阈值,单位s + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1 = 1 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L2 = 2 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L3 = 5 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L4 = 10 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L5 = 30 * 60; + private static final long PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6 = 60 * 60; + + // 堆积消息告警间隔时长,单位s + public static final long PENDDING_MQ_MSG_ALERT_SECONDS_DELTA = 60 * 60; + + // 各个级别上次告警时间 + private LocalDateTime l1LastAlertTime; + private LocalDateTime l6LastAlertTime; + @Autowired private SystemMessageDao systemMessageDao; @@ -47,27 +63,35 @@ public class SystemMessageServiceImpl implements SystemMessageService { @Override public void sendMQMessage(String messageType, Object content) { String contentStr = JSON.toJSONString(content); - //存储消息到表 + // 1.存储消息到表 SystemMessageEntity systemMessageEntity = new SystemMessageEntity(); systemMessageEntity.setMsgType(messageType); systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ); systemMessageEntity.setContent(contentStr); systemMessageDao.insert(systemMessageEntity); - // 缓存下来,供滞留消息扫描。TTL -1,永不过期 - MessageCacheBean mcb = new MessageCacheBean(new Date(), messageType, contentStr); - String pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); - String pendingMsgKey = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); - redisUtils.set(pendingMsgKey, mcb, -1); + String topic = getTopicByMsgType(messageType); + + // 2.缓存下来,供滞留消息扫描。TTL -1,永不过期 + String pendingMsgLabel = null; + String pendingMsgKey = null; + try { + MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr); + pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); + pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.set(pendingMsgKey, mcb, -1); + } catch (Exception e) { + logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); + } - //发送mq消息 + // 3.发送mq消息 try { - Message meMessage = new Message(getTopicByMsgType(messageType), messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); + Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); rocketMQTemplate.getProducer().send(meMessage); } catch (Exception e) { String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); - logger.error("发送系统消息失败,堆栈信息:{}", errorStackTrace); + logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); // 清理消息缓存 redisUtils.delete(pendingMsgKey); @@ -122,18 +146,60 @@ public class SystemMessageServiceImpl implements SystemMessageService { } @Override - public void ackMqMessage(String pendingMsgLabel) { - String key = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel); - redisUtils.delete(key); - logger.info("【MQ消息应答】pendingMsgLabel{}", pendingMsgLabel); + public void penddingMqMsgScan() { + String scanKey = RedisKeys.pendingMqMsgKey("*"); + Set keys = redisUtils.keys(scanKey); + //System.out.println(keys); + for (String key : keys) { + MessageCacheBean mcb = (MessageCacheBean) redisUtils.get(key); + + LocalDateTime createTime = mcb.getCreateTime(); + LocalDateTime now = LocalDateTime.now(); + //long deltaSeconds = ChronoUnit.SECONDS.between(createTime, now); + + // 此处暂时使用粗粒度的Topic判断,耕细粒度的应该使用SystemMessageType + switch (mcb.getTopic()) { + case TopicConstants.AUTH: + case TopicConstants.GROUP_ACHIEVEMENT: + case TopicConstants.INIT_CUSTOMER: + case TopicConstants.ORG: + case TopicConstants.PATROL: + case TopicConstants.POINT: + case TopicConstants.STAFF: + + // 耗时较短。一个小时最多发送一次告警 + if (l1LastAlertTime == null || ( + createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L1).isBefore(now) + && l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) + ) { + + logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + l1LastAlertTime = now; + + } + break; + + case TopicConstants.PROJECT_CHANGED: + // 耗时较长,一个小时最多发送一次告警 + if (l1LastAlertTime == null || ( + createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now) + && l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) + ) { + logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + l1LastAlertTime = now; + } + break; + } + } } @Data @NoArgsConstructor @AllArgsConstructor - class MessageCacheBean { - private Date createTime; + static class MessageCacheBean { + private LocalDateTime createTime; private String messageType; + private String topic; private Object content; } } diff --git a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java index 19ca52066c..3554e961da 100644 --- a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java +++ b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java @@ -1,12 +1,17 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.form.CategoryTagInitFormDTO; import com.epmet.service.IssueProjectCategoryDictService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -32,8 +37,15 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu @Autowired private DistributedLock distributedLock; + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -45,6 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu public void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -66,5 +79,27 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存失败:{}", pendingMsgLabel); } } \ No newline at end of file diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java index 7224ea757a..e554e36f74 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java @@ -1,16 +1,20 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.constant.UserWorkType; import com.epmet.dto.CustomerAgencyDTO; import com.epmet.dto.form.AddAgencyAndStaffFormDTO; import com.epmet.dto.form.AdminStaffFromDTO; import com.epmet.service.AgencyService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -32,8 +36,15 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -45,6 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -65,6 +77,14 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } } /** @@ -100,6 +120,20 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently return agencyAndStaff; } + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【客户初始化事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); + } + /* @Override public ConsumerConfigProperties getConsumerProperty() { ConsumerConfigProperties configProperties = new ConsumerConfigProperties(); diff --git a/epmet-module/open-data-worker/open-data-worker-client/src/main/java/com/epmet/opendata/dto/form/UpsertPatrolRecordForm.java b/epmet-module/open-data-worker/open-data-worker-client/src/main/java/com/epmet/opendata/dto/form/UpsertPatrolRecordForm.java index ecd3fee58d..23d412318b 100644 --- a/epmet-module/open-data-worker/open-data-worker-client/src/main/java/com/epmet/opendata/dto/form/UpsertPatrolRecordForm.java +++ b/epmet-module/open-data-worker/open-data-worker-client/src/main/java/com/epmet/opendata/dto/form/UpsertPatrolRecordForm.java @@ -2,6 +2,8 @@ package com.epmet.opendata.dto.form; import lombok.Data; +import javax.validation.constraints.NotEmpty; + /** * 插入或更新巡查记录主表 * @author liujianjun @@ -11,11 +13,13 @@ public class UpsertPatrolRecordForm { /** * 客户Id */ + @NotEmpty(message = "customerId不能为空") private String customerId; /** * 巡查记录id */ + @NotEmpty(message = "customerId不能为空") private String patrolId; /** @@ -23,6 +27,7 @@ public class UpsertPatrolRecordForm { * SystemMessageType.USER_PATROL_START * SystemMessageTypSTOP */ + @NotEmpty(message = "actionType不能为空") private String actionType; } diff --git a/epmet-module/open-data-worker/open-data-worker-server/Dockerfile b/epmet-module/open-data-worker/open-data-worker-server/Dockerfile new file mode 100644 index 0000000000..bdbf6243c4 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/Dockerfile @@ -0,0 +1,11 @@ +FROM java:8 + +RUN export LANG="zh_CN.UTF-8" +RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime +RUN echo 'Asia/Shanghai' > /etc/timezone + +COPY ./target/*.jar ./open-data-worker.jar + +EXPOSE 8107 + +ENTRYPOINT ["sh", "-c", "exec $RUN_INSTRUCT"] \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-dev.yml b/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-dev.yml new file mode 100644 index 0000000000..5df8669717 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-dev.yml @@ -0,0 +1,18 @@ +version: "3.7" +services: + gov-voice-server: + container_name: open-data-worker-server-dev + image: 192.168.1.140:5000/epmet-cloud-dev/open-data-worker-server:version_placeholder + ports: + - "8107:8107" + network_mode: host # 使用现有网络 + volumes: + - "/opt/epmet-cloud-logs/dev:/logs" + environment: + RUN_INSTRUCT: "java -Xms32m -Xmx200m -jar ./open-data-worker.jar" + restart: "unless-stopped" + deploy: + resources: + limits: + cpus: '0.1' + memory: 250M \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-prod.yml b/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-prod.yml new file mode 100644 index 0000000000..feedbad307 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-prod.yml @@ -0,0 +1,18 @@ +version: "3.7" +services: + gov-voice-server: + container_name: open-data-worker-server-prod + image: registry-vpc.cn-qingdao.aliyuncs.com/epmet-cloud-master/open-data-worker-server:0.3.69 + ports: + - "8107:8107" + network_mode: host # 使用现有网络 + volumes: + - "/opt/epmet-cloud-logs/prod:/logs" + environment: + RUN_INSTRUCT: "java -Xms256m -Xmx512m -jar ./open-data-worker.jar" + restart: "unless-stopped" + deploy: + resources: + limits: + cpus: '0.1' + memory: 600M \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-test.yml b/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-test.yml new file mode 100644 index 0000000000..faa41dfeed --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/deploy/docker-compose-test.yml @@ -0,0 +1,18 @@ +version: "3.7" +services: + gov-voice-server: + container_name: open-data-worker-server-test + image: registry-vpc.cn-qingdao.aliyuncs.com/epmet-cloud-release/open-data-worker-server:version_placeholder + ports: + - "8107:8107" + network_mode: host # 使用现有网络 + volumes: + - "/opt/epmet-cloud-logs/test:/logs" + environment: + RUN_INSTRUCT: "java -Xms32m -Xmx300m -jar ./open-data-worker.jar" + restart: "unless-stopped" + deploy: + resources: + limits: + cpus: '0.1' + memory: 350M \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/pom.xml b/epmet-module/open-data-worker/open-data-worker-server/pom.xml index 544dd4bc94..6a40fad5b3 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/pom.xml +++ b/epmet-module/open-data-worker/open-data-worker-server/pom.xml @@ -72,6 +72,12 @@ 2.0.0 compile + + com.epmet + epmet-user-client + 2.0.0 + compile + com.epmet data-statistical-client @@ -184,7 +190,7 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd - local + false 192.168.1.140:9876;192.168.1.141:9876 diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/aspect/RequestLogAspect.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/aspect/RequestLogAspect.java new file mode 100644 index 0000000000..506c00da2d --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/aspect/RequestLogAspect.java @@ -0,0 +1,40 @@ +package com.epmet.opendata.aspect; + +import com.epmet.commons.tools.aspect.BaseRequestLogAspect; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.web.context.request.RequestAttributes; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +import javax.servlet.http.HttpServletRequest; + +/** + * 日志/异常处理切面实现,调用父类方法完成日志记录和异常处理。 + */ +@Aspect +@Component +@Order(0) +public class RequestLogAspect extends BaseRequestLogAspect { + + @Override + @Around(value = "execution(* com.epmet.opendata.controller.*Controller*.*(..)) ") + public Object proceed(ProceedingJoinPoint point) throws Throwable { + return super.proceed(point, getRequest()); + } + + /** + * 获取Request对象 + * + * @return + */ + private HttpServletRequest getRequest() { + RequestAttributes ra = RequestContextHolder.getRequestAttributes(); + ServletRequestAttributes sra = (ServletRequestAttributes) ra; + return sra.getRequest(); + } + +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/ModuleConfigImpl.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/ModuleConfigImpl.java new file mode 100644 index 0000000000..802c020918 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/ModuleConfigImpl.java @@ -0,0 +1,26 @@ +/** + * Copyright (c) 2018 人人开源 All rights reserved. + * + * https://www.renren.io + * + * 版权所有,侵权必究! + */ + +package com.epmet.opendata.config; + +import com.epmet.commons.tools.config.ModuleConfig; +import org.springframework.stereotype.Service; + +/** + * 模块配置信息 + * + * @author Mark sunlightcs@gmail.com + * @since 1.0.0 + */ +@Service +public class ModuleConfigImpl implements ModuleConfig { + @Override + public String getName() { + return "govvoice"; + } +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/dao/StaffPatrolDetailDao.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/dao/UserPatrolDetailDao.java similarity index 92% rename from epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/dao/StaffPatrolDetailDao.java rename to epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/dao/UserPatrolDetailDao.java index ecae380d72..57fb4edc6d 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/dao/StaffPatrolDetailDao.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/dao/UserPatrolDetailDao.java @@ -28,6 +28,6 @@ import org.apache.ibatis.annotations.Mapper; * @since v1.0.0 2021-10-14 */ @Mapper -public interface StaffPatrolDetailDao extends BaseDao { +public interface UserPatrolDetailDao extends BaseDao { } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/healthcheck/HealthCheckController.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/healthcheck/HealthCheckController.java new file mode 100644 index 0000000000..0a6b1d6264 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/healthcheck/HealthCheckController.java @@ -0,0 +1,21 @@ +package com.epmet.opendata.healthcheck; + +import com.epmet.commons.tools.utils.Result; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("healthcheck") +public class HealthCheckController { + + /** + * http健康检查 + * @return + */ + @PostMapping("http") + public Result httpHealthCheck() { + return new Result(); + } + +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java index be8849d216..a38e2cabd6 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java @@ -1,10 +1,12 @@ package com.epmet.opendata.mq.listener; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.alibaba.fastjson.JSON; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; -import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.constant.MQUserPropertys; import com.epmet.dto.form.SystemMsgFormDTO; @@ -32,17 +34,13 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent private Logger logger = LoggerFactory.getLogger(getClass()); - private EpmetMessageOpenFeignClient messageOpenFeignClient; + private RedisUtils redisUtils; @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - if (messageOpenFeignClient == null) { - try { - messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class); - } catch (Exception e) { - e.printStackTrace(); - } + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); } try { @@ -58,6 +56,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent // msg即为消息体 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); @@ -87,30 +86,26 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent distributedLock.unLock(lock); } - // 应答mq消息(调用message服务的应答api) if (StringUtils.isNotBlank(pendingMsgLabel)) { try { - ackMqMsg(pendingMsgLabel); + removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } /** - * @description 应答mq消息 + * @description * * @param pendingMsgLabel * @return * @author wxz * @date 2021.10.14 16:32:32 */ - private void ackMqMsg(String pendingMsgLabel) { - SystemMsgFormDTO form = new SystemMsgFormDTO(); - form.setPendingMsgLabel(pendingMsgLabel); - Result result = messageOpenFeignClient.ackSystemMsgByMQ(form); - if (!result.success()) { - logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg()); - } + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【开放数据事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java index a1598adfc6..99d35daac1 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java @@ -1,14 +1,18 @@ package com.epmet.opendata.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.StaffPatrolMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.opendata.dto.form.UpsertPatrolRecordForm; import com.epmet.opendata.service.UserPatrolRecordService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -30,8 +34,15 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -45,7 +56,9 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr // msg即为消息体 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags); StaffPatrolMQMsg msgObj = JSON.parseObject(msg, StaffPatrolMQMsg.class); @@ -75,5 +88,26 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java index 2580364741..270c7ff3e3 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java @@ -1,13 +1,13 @@ package com.epmet.opendata.mq.listener; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.alibaba.fastjson.JSON; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; -import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; -import com.epmet.constant.MQUserPropertys; -import com.epmet.dto.form.SystemMsgFormDTO; import com.epmet.feign.EpmetMessageOpenFeignClient; import com.epmet.opendata.dto.form.StaffBaseInfoFormDTO; import com.epmet.opendata.service.BaseGridUserService; @@ -34,11 +34,13 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre private EpmetMessageOpenFeignClient messageOpenFeignClient; + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - if (messageOpenFeignClient == null) { - messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class); + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); } try { @@ -54,6 +56,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre // msg即为消息体 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); @@ -80,29 +83,26 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre distributedLock.unLock(lock); } - // 应答mq消息(调用message服务的应答api) if (StringUtils.isNotBlank(pendingMsgLabel)) { try { - ackMqMsg(pendingMsgLabel); + removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } /** + * @description + * * @param pendingMsgLabel * @return * @description 应答mq消息 * @author wxz * @date 2021.10.14 16:32:32 */ - private void ackMqMsg(String pendingMsgLabel) { - SystemMsgFormDTO form = new SystemMsgFormDTO(); - form.setPendingMsgLabel(pendingMsgLabel); - Result result = messageOpenFeignClient.ackSystemMsgByMQ(form); - if (!result.success()) { - logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg()); - } + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolDetailServiceImpl.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolDetailServiceImpl.java index 508c8f7859..e5b0956b16 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolDetailServiceImpl.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolDetailServiceImpl.java @@ -18,7 +18,7 @@ package com.epmet.opendata.service.impl; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; -import com.epmet.opendata.dao.StaffPatrolDetailDao; +import com.epmet.opendata.dao.UserPatrolDetailDao; import com.epmet.opendata.entity.UserPatrolDetailEntity; import com.epmet.opendata.service.UserPatrolDetailService; import org.springframework.stereotype.Service; @@ -30,7 +30,7 @@ import org.springframework.stereotype.Service; * @since v1.0.0 2021-10-14 */ @Service -public class UserPatrolDetailServiceImpl extends BaseServiceImpl implements UserPatrolDetailService { +public class UserPatrolDetailServiceImpl extends BaseServiceImpl implements UserPatrolDetailService { diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java index ee6ebed979..ad2d5f04f7 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java @@ -17,12 +17,30 @@ package com.epmet.opendata.service.impl; +import com.alibaba.fastjson.JSON; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.tools.constant.NumConstant; +import com.epmet.commons.tools.constant.StrConstant; +import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.validator.ValidatorUtils; +import com.epmet.constant.SystemMessageType; +import com.epmet.dto.user.param.MidPatrolFormDTO; +import com.epmet.dto.user.result.MidPatrolDetailResult; +import com.epmet.dto.user.result.MidPatrolRecordResult; +import com.epmet.feign.DataStatisticalOpenFeignClient; +import com.epmet.opendata.dao.UserPatrolDetailDao; import com.epmet.opendata.dao.UserPatrolRecordDao; import com.epmet.opendata.dto.form.UpsertPatrolRecordForm; +import com.epmet.opendata.entity.UserPatrolDetailEntity; import com.epmet.opendata.entity.UserPatrolRecordEntity; import com.epmet.opendata.service.UserPatrolRecordService; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.List; /** @@ -31,12 +49,115 @@ import org.springframework.stereotype.Service; * @author generator generator@elink-cn.com * @since v1.0.0 2021-10-14 */ +@Slf4j @Service public class UserPatrolRecordServiceImpl extends BaseServiceImpl implements UserPatrolRecordService { + @Autowired + private DataStatisticalOpenFeignClient dataStatisticalOpenFeignClient; + @Autowired + private UserPatrolDetailDao userPatrolDetailDao; @Override public Boolean upsertPatrolRecord(UpsertPatrolRecordForm patrolRecordForm) { - return null; + ValidatorUtils.validateEntity(patrolRecordForm); + boolean delFlag = false; + MidPatrolFormDTO midPatrolFormDTO = buildParam(patrolRecordForm); + Result> record = dataStatisticalOpenFeignClient.getPatrolRecordList(midPatrolFormDTO); + if (record == null || !record.success()){ + log.error("获取巡查记录失败,param:{}", JSON.toJSONString(midPatrolFormDTO)); + return false; + } + List data = record.getData(); + if (CollectionUtils.isEmpty(data)){ + //数据已被删除了 + delFlag = true; + //暂时设置error 用于排错 + log.error("获取巡查记录返回为空,param:{}", JSON.toJSONString(midPatrolFormDTO)); + int effectRow = baseDao.deleteById(patrolRecordForm.getPatrolId()); + log.warn("del effectRow:{}",effectRow); + return true; + } + MidPatrolRecordResult recordResult = data.get(NumConstant.ZERO); + switch (patrolRecordForm.getActionType()){ + case SystemMessageType.USER_PATROL_START: + //insert + baseDao.insert(buildEntity(recordResult)); + break; + case SystemMessageType.USER_PATROL_STOP: + //update + baseDao.updateById(buildEntity(recordResult)); + Result> detailResult = dataStatisticalOpenFeignClient.getPatrolDetailList(midPatrolFormDTO); + if (detailResult == null || !detailResult.success()){ + log.error("获取巡查记录明细失败,param:{}", JSON.toJSONString(midPatrolFormDTO)); + return false; + } + UserPatrolDetailEntity detailEntity = buildDetailEntity(recordResult, detailResult); + + userPatrolDetailDao.insert(detailEntity); + break; + default: + log.info("======"); + + } + return true; + } + + @NotNull + private UserPatrolDetailEntity buildDetailEntity(MidPatrolRecordResult recordResult, Result> detailResult) { + UserPatrolDetailEntity detailEntity = new UserPatrolDetailEntity(); + detailEntity.setCustomerId(recordResult.getCustomerId()); + detailEntity.setStaffPatrolRecId(recordResult.getId()); + StringBuilder sb =new StringBuilder(); + detailResult.getData().forEach(o->{ + sb.append(o.getLongitude()) + .append(StrConstant.COMMA) + .append(o.getLatitude()) + .append(StrConstant.SEMICOLON); + }); + detailEntity.setRoute(sb.toString()); + detailEntity.setId(recordResult.getId()); + detailEntity.setRevision(recordResult.getRevision()); + detailEntity.setCreatedBy(recordResult.getCreatedBy()); + detailEntity.setCreatedTime(recordResult.getCreatedTime()); + detailEntity.setUpdatedBy(recordResult.getUpdatedBy()); + detailEntity.setUpdatedTime(recordResult.getUpdatedTime()); + detailEntity.setDelFlag(String.valueOf(recordResult.getDelFlag())); + return detailEntity; + } + + private UserPatrolRecordEntity buildEntity(MidPatrolRecordResult recordResult) { + UserPatrolRecordEntity entity = new UserPatrolRecordEntity(); + entity.setCustomerId(recordResult.getCustomerId()); + entity.setGrid(recordResult.getGrid()); + entity.setGridPids(recordResult.getGridPids()); + entity.setStaffId(recordResult.getStaffId()); + entity.setAgencyId(recordResult.getAgencyId()); + entity.setPatrolStartTime(recordResult.getPatrolStartTime()); + entity.setPatrolEndTime(recordResult.getPatrolEndTime()); + entity.setActrualEndTime(recordResult.getActrualEndTime()); + entity.setStartLocation(""); + entity.setEndLocation(""); + entity.setTotalTime(recordResult.getTotalTime()); + entity.setDistance(0); + entity.setStatus(recordResult.getStatus()); + entity.setId(recordResult.getId()); + entity.setRevision(recordResult.getRevision()); + entity.setCreatedBy(recordResult.getCreatedBy()); + entity.setCreatedTime(recordResult.getCreatedTime()); + entity.setUpdatedBy(recordResult.getUpdatedBy()); + entity.setUpdatedTime(recordResult.getUpdatedTime()); + entity.setDelFlag(String.valueOf(recordResult.getDelFlag())); + return entity; + } + + @NotNull + private MidPatrolFormDTO buildParam(UpsertPatrolRecordForm patrolRecordForm) { + MidPatrolFormDTO midPatrolFormDTO = new MidPatrolFormDTO(); + midPatrolFormDTO.setCustomerId(patrolRecordForm.getCustomerId()); + midPatrolFormDTO.setPatrolId(patrolRecordForm.getPatrolId()); + midPatrolFormDTO.setPageNo(0); + midPatrolFormDTO.setPageSize(1); + return midPatrolFormDTO; } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml index 59302e27eb..49db95eb6d 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml @@ -8,7 +8,7 @@ spring: main: allow-bean-definition-overriding: true application: - name: open-data-server + name: open-data-worker-server #环境 dev|test|prod profiles: active: @spring.profiles.active@ diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/mapper/StaffPatrolDetailDao.xml b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/mapper/UserPatrolDetailDao.xml similarity index 92% rename from epmet-module/open-data-worker/open-data-worker-server/src/main/resources/mapper/StaffPatrolDetailDao.xml rename to epmet-module/open-data-worker/open-data-worker-server/src/main/resources/mapper/UserPatrolDetailDao.xml index bc66036982..bd61a74027 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/mapper/StaffPatrolDetailDao.xml +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/mapper/UserPatrolDetailDao.xml @@ -1,7 +1,7 @@ - + diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java index d21ea39357..b6c91d276d 100644 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java +++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java @@ -1,13 +1,17 @@ package com.epmet.mq.listener; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dto.CustomerHomeDTO; import com.epmet.service.CustomerHomeService; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -15,7 +19,6 @@ import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.TimeUnit; @@ -28,8 +31,15 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + try { msgs.forEach(msg -> consumeMessage(msg)); } catch (Exception e) { @@ -41,6 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -66,6 +77,28 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent } finally { distributedLock.unLock(lock); } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【开放数据事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); } /* @Override diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java index 33c0ff7c63..8996de4f47 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java @@ -1,9 +1,13 @@ package com.epmet.mq; import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.modules.group.service.StatsAchievementService; import lombok.extern.slf4j.Slf4j; @@ -31,8 +35,15 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren private Logger logger = LoggerFactory.getLogger(getClass()); + private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + long start = System.currentTimeMillis(); try { msgs.forEach(this::consumeMessage); @@ -48,6 +59,7 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren private void consumeMessage(MessageExt messageExt) { logger.info("receive msg:{}", JSON.toJSONString(messageExt)); String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); GroupAchievementMQMsg msgObj = JSON.parseObject(msg, GroupAchievementMQMsg.class); if (msgObj == null){ @@ -87,9 +99,29 @@ public class GroupAchievementCustomListener implements MessageListenerConcurren distributedLock.unLock(lock); } } - } + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【小组成就事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + logger.info("【小组成就事件监听器】删除mq滞留消息缓存失败,pendingMsgLabel:{}", pendingMsgLabel); + } /*@Override public ConsumerConfigProperties getConsumerProperty() {