From 92c29b03af6e8e9451454119f430c0477de39022 Mon Sep 17 00:00:00 2001 From: yinzuomei <576302893@qq.com> Date: Fri, 15 Oct 2021 17:08:28 +0800 Subject: [PATCH 1/5] =?UTF-8?q?closeCaseTime=E6=94=B9=E4=B8=BADate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../form/ScreenProjectDataInfoFormDTO.java | 3 ++- .../impl/ScreenProjectDataServiceImpl.java | 27 ++++++++----------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/form/ScreenProjectDataInfoFormDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/form/ScreenProjectDataInfoFormDTO.java index f0f6422e94..b977e08472 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/form/ScreenProjectDataInfoFormDTO.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/form/ScreenProjectDataInfoFormDTO.java @@ -104,7 +104,8 @@ public class ScreenProjectDataInfoFormDTO implements Serializable { /** * 结案日期 */ - private String closeCaseTime; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date closeCaseTime; /** * 所有上级ID,用英文逗号分开 diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java index 855e1a6a29..10678a2fbc 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java @@ -44,7 +44,6 @@ import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.math.BigDecimal; -import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; @@ -136,22 +135,18 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl Date: Fri, 15 Oct 2021 17:44:18 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E4=B8=8A=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dto/basereport/form/EventInfoFormDTO.java | 4 + .../dto/screen/ScreenProjectDataDTO.java | 4 - .../screen/ScreenProjectDataEntity.java | 9 -- .../impl/DataReportingServiceImpl.java | 1 + .../BaseDisputeProcessController.java | 56 +++++++++ .../opendata/mq/RocketMQConsumerRegister.java | 7 ++ .../OpenDataProjectChangeEventListener.java | 108 ++++++++++++++++++ .../service/BaseDisputeProcessService.java | 69 ++--------- .../impl/BaseDisputeProcessServiceImpl.java | 87 +++++--------- 9 files changed, 214 insertions(+), 131 deletions(-) create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java index f901f58270..dbbbe4bbf0 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java @@ -14,4 +14,8 @@ public class EventInfoFormDTO implements Serializable { private static final long serialVersionUID = 8479649048108914555L; private String customerId; private String projectId; + /** + * 操作类型【新增:add 修改删除:edit】 + */ + private String type; } diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java index cf8282a98b..180ac24fe5 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java @@ -196,9 +196,5 @@ public class ScreenProjectDataDTO implements Serializable { * fact_origin_project_main_daily.grid_id对应的网格名称 */ private String tempGridName; - /** - * 分类编码,存在多个,以英文逗号隔开,注意与all_cagtegory_name顺序一致; - */ - private String categoryCode; } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java index 9625df2ef3..b53feaf6f6 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java @@ -160,13 +160,4 @@ public class ScreenProjectDataEntity extends BaseEpmetEntity { */ private BigDecimal satisfactionScore; - /** - * 来源:议题issue 项目立项:agency 事件:resi_event; - */ - private String origin; - - /** - * 分类编码,存在多个,以英文逗号隔开,注意与all_cagtegory_name顺序一致; - */ - private String categoryCode; } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java index c1895ff7c9..249eb11c81 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java @@ -186,6 +186,7 @@ public class DataReportingServiceImpl implements DataReportingService { private BaseDisputeProcessDTO getBaseDisputeProcessDTO(ScreenProjectDataDTO project) { BaseDisputeProcessDTO dto = new BaseDisputeProcessDTO(); + dto.setId(project.getProjectId()); dto.setCustomerId(project.getCustomerId()); dto.setEventName(project.getProjectTitle()); String categoryCode = project.getCategoryCode().split(StrConstant.COMMA)[0]; diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java new file mode 100644 index 0000000000..038756ea9c --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java @@ -0,0 +1,56 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.opendata.controller; + +import com.epmet.commons.tools.utils.Result; +import com.epmet.dto.basereport.form.EventInfoFormDTO; +import com.epmet.opendata.service.BaseDisputeProcessService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + + +/** + * 事件信息表 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-10-15 + */ +@RestController +@RequestMapping("basedisputeprocess") +public class BaseDisputeProcessController { + + @Autowired + private BaseDisputeProcessService baseDisputeProcessService; + + /** + * 获取上报事件 + * @Param formDTO + * @Return {@link Result} + * @Author zhaoqifeng + * @Date 2021/10/15 16:59 + */ + @PostMapping("eventinfo") + public Result getEventinfo(@RequestBody(required = false) EventInfoFormDTO formDTO) { + baseDisputeProcessService.getEventinfo(formDTO); + return new Result(); + } + +} \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java index 0a966910bd..2ffed8c3dd 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java @@ -6,6 +6,7 @@ import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQConsumerProperties; import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataPatrolChangeEventListener; +import com.epmet.opendata.mq.listener.OpenDataProjectChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; @@ -41,6 +42,12 @@ public class RocketMQConsumerRegister extends MQAbstractRegister { TopicConstants.PATROL, "*", new OpenDataPatrolChangeEventListener()); + register(consumerProperties, + ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, + MessageModel.CLUSTERING, + TopicConstants.PROJECT_CHANGED, + "*", + new OpenDataProjectChangeEventListener()); // ...其他监听器类似 } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java new file mode 100644 index 0000000000..819c1975ce --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java @@ -0,0 +1,108 @@ +package com.epmet.opendata.mq.listener; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.dto.basereport.form.EventInfoFormDTO; +import com.epmet.feign.EpmetMessageOpenFeignClient; +import com.epmet.opendata.service.BaseDisputeProcessService; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author wxz + * @Description 系统对接中间库,项目信息变更监听器 + * @date 2021.10.13 15:21:48 + */ +public class OpenDataProjectChangeEventListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private EpmetMessageOpenFeignClient messageOpenFeignClient; + + private RedisUtils redisUtils; + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + // msg即为消息体 + // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 + String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); + String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + + //messageExt.propert + + logger.info("【开放数据事件监听器】-项目信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); + EventInfoFormDTO obj = JSON.parseObject(msg, EventInfoFormDTO.class); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()), + 30L, 30L, TimeUnit.SECONDS); + SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(obj); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + distributedLock.unLock(lock); + } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-project-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @description 应答mq消息 + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + } +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java index 874996c00b..20d2516dab 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java @@ -18,13 +18,9 @@ package com.epmet.opendata.service; import com.epmet.commons.mybatis.service.BaseService; -import com.epmet.commons.tools.page.PageData; -import com.epmet.opendata.dto.BaseDisputeProcessDTO; +import com.epmet.dto.basereport.form.EventInfoFormDTO; import com.epmet.opendata.entity.BaseDisputeProcessEntity; -import java.util.List; -import java.util.Map; - /** * 事件信息表 * @@ -34,62 +30,11 @@ import java.util.Map; public interface BaseDisputeProcessService extends BaseService { /** - * 默认分页 - * - * @param params - * @return PageData - * @author generator - * @date 2021-10-15 - */ - PageData page(Map params); - - /** - * 默认查询 - * - * @param params - * @return java.util.List - * @author generator - * @date 2021-10-15 - */ - List list(Map params); - - /** - * 单条查询 - * - * @param id - * @return BaseDisputeProcessDTO - * @author generator - * @date 2021-10-15 - */ - BaseDisputeProcessDTO get(String id); - - /** - * 默认保存 - * - * @param dto - * @return void - * @author generator - * @date 2021-10-15 - */ - void save(BaseDisputeProcessDTO dto); - - /** - * 默认更新 - * - * @param dto - * @return void - * @author generator - * @date 2021-10-15 - */ - void update(BaseDisputeProcessDTO dto); - - /** - * 批量删除 - * - * @param ids - * @return void - * @author generator - * @date 2021-10-15 + * 获取上报事件 + * @Param formDTO + * @Return + * @Author zhaoqifeng + * @Date 2021/10/15 16:59 */ - void delete(String[] ids); + void getEventinfo(EventInfoFormDTO formDTO); } \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java index ade833cb71..0a0d18a9f0 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java @@ -17,23 +17,21 @@ package com.epmet.opendata.service.impl; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.baomidou.mybatisplus.core.metadata.IPage; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; -import com.epmet.commons.tools.constant.FieldConstant; -import com.epmet.commons.tools.page.PageData; +import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.utils.ConvertUtils; +import com.epmet.commons.tools.utils.Result; +import com.epmet.dto.basereport.form.EventInfoFormDTO; +import com.epmet.feign.DataStatisticalOpenFeignClient; import com.epmet.opendata.dao.BaseDisputeProcessDao; import com.epmet.opendata.dto.BaseDisputeProcessDTO; import com.epmet.opendata.entity.BaseDisputeProcessEntity; import com.epmet.opendata.service.BaseDisputeProcessService; -import org.apache.commons.lang3.StringUtils; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import java.util.Arrays; +import javax.annotation.Resource; import java.util.List; -import java.util.Map; /** * 事件信息表 @@ -43,57 +41,34 @@ import java.util.Map; */ @Service public class BaseDisputeProcessServiceImpl extends BaseServiceImpl implements BaseDisputeProcessService { + @Resource + private DataStatisticalOpenFeignClient dataStatisticalOpenFeignClient; + /** + * 获取上报事件 + * + * @param formDTO + * @Param formDTO + * @Return + * @Author zhaoqifeng + * @Date 2021/10/15 16:59 + */ @Override - public PageData page(Map params) { - IPage page = baseDao.selectPage( - getPage(params, FieldConstant.CREATED_TIME, false), - getWrapper(params) - ); - return getPageData(page, BaseDisputeProcessDTO.class); - } - - @Override - public List list(Map params) { - List entityList = baseDao.selectList(getWrapper(params)); - - return ConvertUtils.sourceToTarget(entityList, BaseDisputeProcessDTO.class); - } + public void getEventinfo(EventInfoFormDTO formDTO) { + Result> result = dataStatisticalOpenFeignClient.getEventInfo(formDTO); + if (!result.success()) { + throw new RenException(result.getInternalMsg()); + } + List list = result.getData(); + if (CollectionUtils.isNotEmpty(list)) { + List entityList = ConvertUtils.sourceToTarget(list, BaseDisputeProcessEntity.class); + if("add".equals(formDTO.getType())){ + insertBatch(entityList); + }else { + updateBatchById(entityList); + } + } - private QueryWrapper getWrapper(Map params){ - String id = (String)params.get(FieldConstant.ID_HUMP); - - QueryWrapper wrapper = new QueryWrapper<>(); - wrapper.eq(StringUtils.isNotBlank(id), FieldConstant.ID, id); - - return wrapper; - } - - @Override - public BaseDisputeProcessDTO get(String id) { - BaseDisputeProcessEntity entity = baseDao.selectById(id); - return ConvertUtils.sourceToTarget(entity, BaseDisputeProcessDTO.class); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public void save(BaseDisputeProcessDTO dto) { - BaseDisputeProcessEntity entity = ConvertUtils.sourceToTarget(dto, BaseDisputeProcessEntity.class); - insert(entity); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public void update(BaseDisputeProcessDTO dto) { - BaseDisputeProcessEntity entity = ConvertUtils.sourceToTarget(dto, BaseDisputeProcessEntity.class); - updateById(entity); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public void delete(String[] ids) { - // 逻辑删除(@TableLogic 注解) - baseDao.deleteBatchIds(Arrays.asList(ids)); } } \ No newline at end of file From a1e3fbbbf1624f5a7ef8b9997ff2f47915118ade Mon Sep 17 00:00:00 2001 From: sunyuchao Date: Fri, 15 Oct 2021 17:51:04 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/messages/OrgOrStaffMQMsg.java | 28 ++++++++++++++++ .../java/com/epmet/send/SendMqMsgUtil.java | 32 ++++++++++++++++--- 2 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/OrgOrStaffMQMsg.java diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/OrgOrStaffMQMsg.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/OrgOrStaffMQMsg.java new file mode 100644 index 0000000000..bbe4f551db --- /dev/null +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/OrgOrStaffMQMsg.java @@ -0,0 +1,28 @@ +package com.epmet.commons.rocketmq.messages; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; +import java.util.List; + +/** + * 组织、网格、人员中间库数据上报MQ + * @author sun + */ +@Data +@AllArgsConstructor +public class OrgOrStaffMQMsg implements Serializable { + + //客户Id + private String customerId; + //组织、网格、人员Id + private String orgId; + //数据类型【组织:agency 网格:grid 人员:staff】 + private String orgType; + //操作类型【新增:add 修改删除:edit】 + private String type; + + +} diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java index 32674d49f6..b7ce5cdd19 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/send/SendMqMsgUtil.java @@ -1,10 +1,7 @@ package com.epmet.send; import com.alibaba.fastjson.JSON; -import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; -import com.epmet.commons.rocketmq.messages.PointRuleChangedMQMsg; -import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg; -import com.epmet.commons.rocketmq.messages.StaffPatrolMQMsg; +import com.epmet.commons.rocketmq.messages.*; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.utils.Result; import com.epmet.constant.SystemMessageType; @@ -158,4 +155,31 @@ public class SendMqMsgUtil { } + /** + * @Description 组织、网格、人员中间库信息同步Mq + * @author sun + */ + public boolean sendOrgStaffMqMsg(OrgOrStaffMQMsg msg) { + try { + SystemMsgFormDTO msgForm = new SystemMsgFormDTO(); + msgForm.setMessageType(msg.getType()); + msgForm.setContent(msg); + Result sendMsgResult; + log.info("sendOrgStaffMqMsg param:{}",msgForm); + int retryTime = 0; + do { + sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(msgForm); + } while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO); + + if (sendMsgResult != null && sendMsgResult.success()) { + return true; + } + log.error("发送(组织、网格、人员同步中间库)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(msgForm)); + } catch (Exception e) { + log.error("sendOrgStaffMqMsg exception", e); + } + return false; + + } + } From 5e9a88d9f624e5136879c4983f074cf5457ceee5 Mon Sep 17 00:00:00 2001 From: jianjun Date: Fri, 15 Oct 2021 18:35:07 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=88=86=E9=A1=B5?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commons/tools/dto/form/PageFormDTO.java | 5 ++++ .../main/resources/mapper/user/UserDao.xml | 24 ++++++++++++------- .../impl/UserPatrolRecordServiceImpl.java | 2 ++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java index 4820f4d50b..c699557bac 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/dto/form/PageFormDTO.java @@ -30,6 +30,11 @@ public class PageFormDTO { */ private Integer offset; + /** + * 是否分页 默认分页 + */ + private boolean isPage = true; + public Integer getOffset() { return (pageNo-1)*pageSize; } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml index 6778c0305c..30abc86c9c 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml +++ b/epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/user/UserDao.xml @@ -1011,12 +1011,15 @@ FROM staff_patrol_record WHERE 1=1 - - AND ID = #{patrolId} - + + AND ID = #{patrolId} + and CUSTOMER_ID = #{customerId} AND DEL_FLAG = '0' - LIMIT #{offset},#{pageSize} + + LIMIT #{offset},#{pageSize} + + order by created_time diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java index fdf0c2bc18..3eaf06633c 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/UserPatrolRecordServiceImpl.java @@ -118,6 +118,8 @@ public class UserPatrolRecordServiceImpl extends BaseServiceImpl> detailResult = dataStatisticalOpenFeignClient.getPatrolDetailList(midPatrolFormDTO); if (detailResult == null || !detailResult.success()) { log.error("获取巡查记录明细失败,param:{}", JSON.toJSONString(midPatrolFormDTO)); From d3f10217feaacc26d2b410e60ddff0f1d833dd6a Mon Sep 17 00:00:00 2001 From: wxz Date: Sat, 16 Oct 2021 22:24:04 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=201.epmet-messa?= =?UTF-8?q?ge=E4=B8=AD=EF=BC=8C=E6=89=AB=E6=8F=8F"=E6=9C=AA=E6=88=90?= =?UTF-8?q?=E5=8A=9F=E5=8F=91=E9=80=81=E5=88=B0mq=E7=9A=84=E6=BB=9E?= =?UTF-8?q?=E7=95=99=E6=B6=88=E6=81=AF"=E7=9A=84=E6=8E=A5=E5=8F=A3=202.?= =?UTF-8?q?=E8=B0=83=E6=95=B4mq=E9=98=BB=E5=A1=9E=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E7=9A=84key=203.=E5=85=B6=E4=BB=96=E9=80=BB=E8=BE=91=E5=BE=AE?= =?UTF-8?q?=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/AuthOperationLogListener.java | 8 +- .../listener/PointOperationLogListener.java | 8 +- .../listener/ProjectOperationLogListener.java | 8 +- .../rocketmq/constants/MQUserPropertys.java | 4 +- .../epmet/commons/tools/redis/RedisKeys.java | 8 +- .../mq/ProjectChangedCustomListener.java | 6 +- .../controller/SystemMessageController.java | 24 ++++- .../epmet/dao/SystemMessagePenddingDao.java | 35 +++++++ .../entity/SystemMessagePenddingEntity.java | 61 ++++++++++++ .../epmet/service/SystemMessageService.java | 16 ++- .../impl/SystemMessageServiceImpl.java | 97 +++++++++++++++---- .../mapper/SystemMessagePenddingDao.xml | 26 +++++ .../IssueProjectCategoryTagInitListener.java | 8 +- .../InitCustomerOrgRolesListener.java | 8 +- .../OpenDataOrgChangeEventListener.java | 8 +- .../OpenDataPatrolChangeEventListener.java | 7 +- .../OpenDataStaffChangeEventListener.java | 8 +- .../InitCustomerComponentsListener.java | 8 +- 18 files changed, 282 insertions(+), 66 deletions(-) create mode 100644 epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java create mode 100644 epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java create mode 100644 epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java index ee243d88dd..6f8afd1418 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/AuthOperationLogListener.java @@ -58,7 +58,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("认证操作日志监听器-收到消息内容:{}", msg); LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class); @@ -101,7 +101,7 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【登录操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【登录操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -115,8 +115,8 @@ public class AuthOperationLogListener implements MessageListenerConcurrently { * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); + //logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java index d883b7b1f6..aa4f50a768 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/PointOperationLogListener.java @@ -58,7 +58,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently { private void consumeMessage(MessageExt messageExt) { String opeType = messageExt.getTags(); String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("积分操作日志监听器-收到消息内容:{}", msg); PointRuleChangedMQMsg msgObj = JSON.parseObject(msg, PointRuleChangedMQMsg.class); @@ -103,7 +103,7 @@ public class PointOperationLogListener implements MessageListenerConcurrently { try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【积分操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【积分操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -117,8 +117,8 @@ public class PointOperationLogListener implements MessageListenerConcurrently { * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); + //logger.info("【积分操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); } } diff --git a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java index d968d95448..a47d55c2de 100644 --- a/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java +++ b/epmet-admin/epmet-admin-server/src/main/java/com/epmet/mq/listener/listener/ProjectOperationLogListener.java @@ -58,7 +58,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { //String tags = messageExt.getTags(); String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("项目变动操作日志监听器-收到消息内容:{}", msg); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); @@ -103,7 +103,7 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【项目操作事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【项目操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -136,8 +136,8 @@ public class ProjectOperationLogListener implements MessageListenerConcurrently * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); + //logger.info("【项目操作事件监听器】删除pendingMsgLabel成功{}", pendingMsgLabel); } } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java index a69de64cca..c1a53a29ba 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/MQUserPropertys.java @@ -7,7 +7,7 @@ package com.epmet.commons.rocketmq.constants; */ public interface MQUserPropertys { - //堆积消息label - String PENDING_MSG_LABEL = "pendingMsgLabel"; + //阻塞消息label + String BLOCKED_MSG_LABEL = "blockedMsgLabel"; } diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index ad66110c51..388927ce5a 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -549,14 +549,14 @@ public class RedisKeys { } /** - * @description 检查message MQ滞留消息 + * @description 检查message MQ阻塞消息 * - * @param pendingMsgLabel 滞留消息的label + * @param blockedMsgLabel 滞留消息的label * @return * @author wxz * @date 2021.10.14 14:33:53 */ - public static String pendingMqMsgKey(String pendingMsgLabel) { - return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel); + public static String blockedMqMsgKey(String blockedMsgLabel) { + return rootPrefix.concat("message:mq:blocked:").concat(blockedMsgLabel); } } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java index dd23989ca7..1792654765 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java @@ -70,7 +70,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently private void consumeMessage(MessageExt msgExt) { String msg = new String(msgExt.getBody()); - String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = msgExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("receive customerId:{}", JSON.toJSONString(msg)); ProjectChangedMQMsg msgObj = JSON.parseObject(msg, ProjectChangedMQMsg.class); @@ -179,9 +179,9 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【项目变动事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【项目变动事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); } /*@Override diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java index cc353dd449..8c1ea726cd 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java @@ -28,10 +28,32 @@ public class SystemMessageController { @PostMapping("send-by-mq") public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class); - systemMessageService.sendMQMessage(form.getMessageType(), form.getContent()); + systemMessageService.persistAndSendMQMessage(form.getMessageType(), form.getContent()); return new Result(); } + /** + * @description 检查MQ阻塞消息(MQ收到消息,但是往监听者发送消息或者监听者处理逻辑问题导致无法消费消息) + * + * @param + * @return + * @author wxz + * @date 2021.10.16 22:07:38 + */ + @PostMapping("blocked-mq-msg-scan") + public Result blockedMqMsgScan() { + systemMessageService.blockedMqMsgScan(); + return new Result(); + } + + /** + * @description 检查MQ滞留消息(往MQ发送消息失败,MQ未收到消息) + * + * @param + * @return + * @author wxz + * @date 2021.10.16 22:08:32 + */ @PostMapping("pendding-mq-msg-scan") public Result penddingMqMsgScan() { systemMessageService.penddingMqMsgScan(); diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java new file mode 100644 index 0000000000..57ba0d3afb --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/dao/SystemMessagePenddingDao.java @@ -0,0 +1,35 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.dao; + +import com.epmet.commons.mybatis.dao.BaseDao; +import com.epmet.entity.SystemMessagePenddingEntity; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +/** + * 系统消息表 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-10-16 + */ +@Mapper +public interface SystemMessagePenddingDao extends BaseDao { + + void physicalDeleteById(@Param("penddingId") String penddingId); +} \ No newline at end of file diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java new file mode 100644 index 0000000000..87a7956504 --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/entity/SystemMessagePenddingEntity.java @@ -0,0 +1,61 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.entity; + +import com.baomidou.mybatisplus.annotation.TableName; + +import com.epmet.commons.mybatis.entity.BaseEpmetEntity; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.Date; + +/** + * 系统消息表 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-10-16 + */ +@Data +@EqualsAndHashCode(callSuper=false) +@TableName("system_message_pendding") +public class SystemMessagePenddingEntity extends BaseEpmetEntity { + + private static final long serialVersionUID = 1L; + + /** + * 消息类型。init_customer:客户初始化,login登录,logout退出 + */ + private String msgType; + + /** + * 消息主表id + */ + private String msgId; + + /** + * 消息发送途径 + */ + private String sendApproach; + + /** + * 消息内容 + */ + private String content; + +} diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java index f48e56cf9f..64e34527d1 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java @@ -3,7 +3,7 @@ package com.epmet.service; public interface SystemMessageService { /** - * @description 发送mq消息 + * @description 持久化和发送mq消息 * * @param messageType * @param content @@ -11,15 +11,25 @@ public interface SystemMessageService { * @author wxz * @date 2021.10.14 15:07:02 */ - void sendMQMessage(String messageType, Object content); + void persistAndSendMQMessage(String messageType, Object content); /** - * @description 扫描滞留消息 + * @description 扫描阻塞的消息 * * @param * @return * @author wxz * @date 2021.10.15 10:13:37 */ + void blockedMqMsgScan(); + + /** + * @description 扫描待办的消息(因为发送到MQ失败而堆积) + * + * @param + * @return + * @author wxz + * @date 2021.10.16 12:11:39 + */ void penddingMqMsgScan(); } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index 7bbc426d9e..dfd6737b57 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -12,7 +12,9 @@ import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.constant.SystemMessageSendApproach; import com.epmet.constant.SystemMessageType; import com.epmet.dao.SystemMessageDao; +import com.epmet.dao.SystemMessagePenddingDao; import com.epmet.entity.SystemMessageEntity; +import com.epmet.entity.SystemMessagePenddingEntity; import com.epmet.service.SystemMessageService; import lombok.AllArgsConstructor; import lombok.Data; @@ -27,6 +29,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; +import java.util.List; import java.util.Set; import java.util.UUID; @@ -53,51 +56,90 @@ public class SystemMessageServiceImpl implements SystemMessageService { @Autowired private SystemMessageDao systemMessageDao; + @Autowired + private SystemMessagePenddingDao systemMessagePenddingDao; + @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private RedisUtils redisUtils; - @Transactional(rollbackFor = Exception.class) @Override - public void sendMQMessage(String messageType, Object content) { + public void persistAndSendMQMessage(String messageType, Object content) { String contentStr = JSON.toJSONString(content); - // 1.存储消息到表 - SystemMessageEntity systemMessageEntity = new SystemMessageEntity(); - systemMessageEntity.setMsgType(messageType); - systemMessageEntity.setSendApproach(SystemMessageSendApproach.MQ); - systemMessageEntity.setContent(contentStr); - systemMessageDao.insert(systemMessageEntity); + logger.info("【发送MQ系统消息】-落盘并发送-messageType:{}, contentStr:{}", messageType, contentStr); + // 1.消息落盘 + String penddingMsgId = persistMessage(messageType, contentStr); + + // 2.发送消息 + sendMQMessage(messageType, contentStr, penddingMsgId); + } + + /** + * @description 消息落盘,有事务 + * + * @param messageType + * @param contentStr + * @return + * @author wxz + * @date 2021.10.16 11:04:53 + */ + @Transactional(rollbackFor = Exception.class) + public String persistMessage(String messageType, String contentStr) { + + SystemMessageEntity message = new SystemMessageEntity(); + message.setMsgType(messageType); + message.setSendApproach(SystemMessageSendApproach.MQ); + message.setContent(contentStr); + systemMessageDao.insert(message); + + SystemMessagePenddingEntity pendding = new SystemMessagePenddingEntity(); + pendding.setMsgType(messageType); + pendding.setSendApproach(SystemMessageSendApproach.MQ); + pendding.setContent(contentStr); + pendding.setMsgId(message.getId()); + systemMessagePenddingDao.insert(pendding); + + logger.info("【发送MQ系统消息】-落盘完成"); + return pendding.getId(); + } + + private void sendMQMessage(String messageType, String contentStr, String penddingId) { String topic = getTopicByMsgType(messageType); - // 2.缓存下来,供滞留消息扫描。TTL -1,永不过期 + // 缓存下来,供滞留消息扫描。TTL -1,永不过期 String pendingMsgLabel = null; String pendingMsgKey = null; try { MessageCacheBean mcb = new MessageCacheBean(LocalDateTime.now(), messageType, topic, contentStr); pendingMsgLabel = UUID.randomUUID().toString().replace("-", ""); - pendingMsgKey = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + pendingMsgKey = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.set(pendingMsgKey, mcb, -1); + //logger.info("【发送MQ系统消息】-存入redis堆积列表成功-{}-{}-{}", topic, messageType, pendingMsgLabel); } catch (Exception e) { - logger.error("将系统MQ消息存储到Redis滞留列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); + logger.error("【发送MQ系统消息】将系统MQ消息存储到Redis堆积列表失败,业务继续执行,{}", ExceptionUtils.getThrowableErrorStackTrace(e)); } // 3.发送mq消息 try { Message meMessage = new Message(topic, messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); - meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel); + meMessage.putUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL, pendingMsgLabel); rocketMQTemplate.getProducer().send(meMessage); + logger.info("【发送MQ系统消息】-发送到MQ成功"); } catch (Exception e) { String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); - logger.error("发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); + logger.error("【发送MQ系统消息】发送系统MQ消息失败,堆栈信息:{}", errorStackTrace); - // 清理消息缓存 + // 清理阻塞中的消息缓存 redisUtils.delete(pendingMsgKey); throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode()); } + + // 删除消息堆积 + systemMessagePenddingDao.physicalDeleteById(penddingId); } /** @@ -146,8 +188,8 @@ public class SystemMessageServiceImpl implements SystemMessageService { } @Override - public void penddingMqMsgScan() { - String scanKey = RedisKeys.pendingMqMsgKey("*"); + public void blockedMqMsgScan() { + String scanKey = RedisKeys.blockedMqMsgKey("*"); Set keys = redisUtils.keys(scanKey); //System.out.println(keys); for (String key : keys) { @@ -173,7 +215,7 @@ public class SystemMessageServiceImpl implements SystemMessageService { && l1LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) ) { - logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size()); l1LastAlertTime = now; } @@ -185,7 +227,7 @@ public class SystemMessageServiceImpl implements SystemMessageService { createTime.plusSeconds(PENDDING_MQ_MSG_EXEC_THRESHOLD_DELTA_L6).isBefore(now) && l6LastAlertTime.plusSeconds(PENDDING_MQ_MSG_ALERT_SECONDS_DELTA).isBefore(now)) ) { - logger.error("【MQ堆积消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生堆积", mcb.topic, mcb.messageType, key, keys.size()); + logger.error("【MQ阻塞消息扫描】Topic:{},messageType:{},redisKey:{},等{}个消息发生阻塞", mcb.topic, mcb.messageType, key, keys.size()); l1LastAlertTime = now; } break; @@ -193,6 +235,25 @@ public class SystemMessageServiceImpl implements SystemMessageService { } } + @Override + public void penddingMqMsgScan() { + Integer count = systemMessagePenddingDao.selectCount(null); + if (count == 0) { + return; + } + + // 扫描并且重新投递 + List penddingMsgs = systemMessagePenddingDao.selectList(null); + for (SystemMessagePenddingEntity penddingMsg : penddingMsgs) { + try { + sendMQMessage(penddingMsg.getMsgType(), penddingMsg.getContent(), penddingMsg.getId()); + } catch (Exception e) { + // 投递失败不应影响后续消息的投递 + logger.error("【重新投递MQ消息】失败, msgType:{}, penddingMsgId:{}, 错误:{}", penddingMsg.getMsgType(), penddingMsg.getId() , ExceptionUtils.getErrorStackTrace(e)); + } + } + } + @Data @NoArgsConstructor @AllArgsConstructor diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml b/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml new file mode 100644 index 0000000000..4a5ab00608 --- /dev/null +++ b/epmet-module/epmet-message/epmet-message-server/src/main/resources/mapper/SystemMessagePenddingDao.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + delete from system_message_pendding where ID = #{penddingId} + + + + \ No newline at end of file diff --git a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java index cb7af65139..a9eb9c436f 100644 --- a/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java +++ b/epmet-module/gov-issue/gov-issue-server/src/main/java/com/epmet/mq/listener/IssueProjectCategoryTagInitListener.java @@ -57,7 +57,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu public void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("初始化客户-初始化议题、项目的分类、标签数据-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -84,7 +84,7 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【议题/项目分类标签初始化事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【议题/项目分类标签初始化事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -98,8 +98,8 @@ public class IssueProjectCategoryTagInitListener implements MessageListenerConcu * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【议题/项目分类标签初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【议题/项目分类标签初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); } } \ No newline at end of file diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java index 31346a2e8b..11bcc80e38 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/InitCustomerOrgRolesListener.java @@ -56,7 +56,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("初始化客户-初始化组织信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -82,7 +82,7 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【客户初始化事件监听器】-orgRole-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【客户初始化事件监听器】-orgRole-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -129,9 +129,9 @@ public class InitCustomerOrgRolesListener implements MessageListenerConcurrently * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【客户初始化事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【客户初始化事件监听器】删除mq阻塞消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); } /* @Override diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java index 5a13adb637..df6278488e 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java @@ -55,7 +55,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent String msg = new String(messageExt.getBody()); String topic = messageExt.getTopic(); String tags = messageExt.getTags(); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags); GridBaseInfoFormDTO obj = JSON.parseObject(msg, GridBaseInfoFormDTO.class); @@ -87,7 +87,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -101,8 +101,8 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,pendingMsgLabel:{}", pendingMsgLabel); + //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java index a9797b34d8..4e40f0974f 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java @@ -58,7 +58,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr String msg = new String(messageExt.getBody()); String topic = messageExt.getTopic(); String tags = messageExt.getTags(); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags); StaffPatrolMQMsg msgObj = JSON.parseObject(msg, StaffPatrolMQMsg.class); @@ -93,7 +93,7 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-巡查-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-巡查-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -107,7 +107,8 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); + //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); } } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java index 270c7ff3e3..313c036cb8 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java @@ -58,11 +58,11 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre String msg = new String(messageExt.getBody()); String topic = messageExt.getTopic(); String tags = messageExt.getTags(); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); //messageExt.propert - logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); + logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, blockedMsgLabel:{}", msg, tags, pendingMsgLabel); StaffBaseInfoFormDTO obj = JSON.parseObject(msg, StaffBaseInfoFormDTO.class); DistributedLock distributedLock = null; @@ -87,7 +87,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-staff-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-staff-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -102,7 +102,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); } } diff --git a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java index d560979694..5980ad0a2a 100644 --- a/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java +++ b/epmet-module/oper-customize/oper-customize-server/src/main/java/com/epmet/mq/listener/InitCustomerComponentsListener.java @@ -51,7 +51,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent private void consumeMessage(MessageExt messageExt) { String msg = new String(messageExt.getBody()); - String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); @@ -82,7 +82,7 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent try { removePendingMqMsgCache(pendingMsgLabel); } catch (Exception e) { - logger.error("【开放数据事件监听器】-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); } } } @@ -96,9 +96,9 @@ public class InitCustomerComponentsListener implements MessageListenerConcurrent * @date 2021.10.14 16:32:32 */ private void removePendingMqMsgCache(String pendingMsgLabel) { - String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); redisUtils.delete(key); - logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,penddingMsgLabel:{}", pendingMsgLabel); + //logger.info("【开放数据事件监听器】删除mq滞留消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); } /* @Override