diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index fb5ae07412..93894c0bdb 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -65,4 +65,9 @@ public interface ConsomerGroupConstants { */ String OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP = "open_data_project_change_event_listener_group"; + /** + * 开放的人员信息变更预警统计 + */ + String IC_WARN_STATS_EVENT_LISTENER_GROUP = "ic_warn_stats_event_listener_group"; + } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index 3f0c3066ec..13217da550 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -48,4 +48,9 @@ public interface TopicConstants { * 项目 */ String PROJECT = "project"; + + /** + * 项目 + */ + String IC_RESI_USER = "ic_resi_user"; } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/IcWarnStatsMQMsg.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/IcWarnStatsMQMsg.java new file mode 100644 index 0000000000..b78863ef4c --- /dev/null +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/IcWarnStatsMQMsg.java @@ -0,0 +1,20 @@ +package com.epmet.commons.rocketmq.messages; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 居民信息新增、修改推送MQ + * @author sun + */ +@Data +public class IcWarnStatsMQMsg implements Serializable { + + //客户Id + private String customerId; + //居民ID + private String icResiUser; + + +} diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java index 84580a3b4a..4536d6c330 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java @@ -85,4 +85,14 @@ public interface SystemMessageType { */ String PROJECT_EDIT = "project_edit"; + /** + * 居民信息添加 + */ + String IC_RESI_USER_ADD = "ic_resi_user_add"; + + /** + * 居民信息修改 + */ + String IC_RESI_USER_EDIT = "ic_resi_user_edit"; + } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index 61373708df..ee4990405f 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -187,6 +187,10 @@ public class SystemMessageServiceImpl implements SystemMessageService { case SystemMessageType.PROJECT_EDIT: topic = TopicConstants.PROJECT; break; + case SystemMessageType.IC_RESI_USER_ADD: + case SystemMessageType.IC_RESI_USER_EDIT: + topic = TopicConstants.IC_RESI_USER; + break; } return topic; } diff --git a/epmet-user/epmet-user-server/pom.xml b/epmet-user/epmet-user-server/pom.xml index 1b0eafe16d..790d22ecf2 100644 --- a/epmet-user/epmet-user-server/pom.xml +++ b/epmet-user/epmet-user-server/pom.xml @@ -222,6 +222,9 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + true + 192.168.1.140:9876;192.168.1.141:9876 @@ -264,6 +267,9 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd https://epmet-dev.elinkservice.cn/api/epmetscan/api + + false + 192.168.1.140:9876;192.168.1.141:9876 @@ -305,6 +311,9 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + true + 192.168.1.140:9876;192.168.1.141:9876 @@ -345,6 +354,9 @@ SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 + + true + 192.168.1.140:9876;192.168.1.141:9876 diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/RocketMQWarnStatsRegister.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/RocketMQWarnStatsRegister.java new file mode 100644 index 0000000000..2e7068ae04 --- /dev/null +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/RocketMQWarnStatsRegister.java @@ -0,0 +1,31 @@ +package com.epmet.mq; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import com.epmet.commons.rocketmq.register.MQAbstractRegister; +import com.epmet.commons.rocketmq.register.MQConsumerProperties; +import com.epmet.mq.listener.ICWarnStatsEventListener; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.stereotype.Component; + +/** + * @Description 如果rocketmq.enable=true,这里必须实现,且 实例化 + * @author wxz + * @date 2021.07.14 17:13:41 +*/ +@Component +public class RocketMQWarnStatsRegister extends MQAbstractRegister { + + @Override + public void registerAllListeners(String env, MQConsumerProperties consumerProperties) { + // 客户初始化监听器注册 + register(consumerProperties, + ConsomerGroupConstants.IC_WARN_STATS_EVENT_LISTENER_GROUP, + MessageModel.CLUSTERING, + TopicConstants.IC_RESI_USER, + "*", + new ICWarnStatsEventListener()); + + // ...其他监听器类似 + } +} diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java new file mode 100644 index 0000000000..3a16ec247e --- /dev/null +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java @@ -0,0 +1,104 @@ +package com.epmet.mq.listener; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.rocketmq.messages.IcWarnStatsMQMsg; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @Description 负能平台-客户居民信息变动监听器 + * @author wxz + * @date 2021.10.13 15:21:48 +*/ +public class ICWarnStatsEventListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private RedisUtils redisUtils; + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + // msg即为消息体 + // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 + String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); + String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); + + logger.info("【开放数据事件监听器】-居民信息变动-收到消息内容:{},操作:{}", msg, tags); + IcWarnStatsMQMsg obj = JSON.parseObject(msg, IcWarnStatsMQMsg.class); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()), + 30L, 30L, TimeUnit.SECONDS); + System.out.println("嘻嘻哈哈哈乐乐呵呵-----------"); + //待执行方法 + //SpringContextUtils.getBean(BaseGridInfoService.class).getAgencyBaseInfo(obj); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + distributedLock.unLock(lock); + } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); + } +} diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/InitCustomerRolesListener.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/InitCustomerRolesListener.java deleted file mode 100644 index a50da7561e..0000000000 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/InitCustomerRolesListener.java +++ /dev/null @@ -1,39 +0,0 @@ -//package com.epmet.mq.listener; -// -//import com.alibaba.fastjson.JSON; -//import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; -//import com.epmet.commons.rocketmq.constants.TopicConstants; -//import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; -//import com.epmet.service.GovStaffRoleService; -//import org.apache.rocketmq.common.message.MessageExt; -//import org.apache.rocketmq.spring.annotation.MessageModel; -//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -//import org.apache.rocketmq.spring.core.RocketMQListener; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -//import org.springframework.beans.factory.annotation.Autowired; -//import org.springframework.stereotype.Component; -// -///** -// * 监听初始化客户动作,为客户初始化角色列表 -// */ -//@RocketMQMessageListener(topic = TopicConstants.INIT_CUSTOMER, -// consumerGroup = ConsomerGroupConstants.INIT_CUSTOMER_ROLES_GROUP, -// messageModel = MessageModel.CLUSTERING, -// selectorExpression = "*") -//@Component -//public class InitCustomerRolesListener implements RocketMQListener { -// -// private Logger logger = LoggerFactory.getLogger(getClass()); -// -// @Autowired -// private GovStaffRoleService govStaffRoleService; -// -// @Override -// public void onMessage(MessageExt messageExt) { -// String msg = new String(messageExt.getBody()); -// logger.info("初始化客户-初始化角色列表-收到消息内容:{}", msg); -// InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); -// govStaffRoleService.initGovStaffRolesForCustomer(msgObj.getCustomerId()); -// } -//} diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java index 9b76ac18f4..991fac2c6d 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java @@ -22,6 +22,7 @@ import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.rocketmq.messages.IcWarnStatsMQMsg; import com.epmet.commons.tools.constant.FieldConstant; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.constant.ServiceConstant; @@ -39,6 +40,7 @@ import com.epmet.commons.tools.security.dto.TokenDto; import com.epmet.commons.tools.security.user.LoginUserUtil; import com.epmet.commons.tools.utils.ConvertUtils; import com.epmet.commons.tools.utils.Result; +import com.epmet.constant.SystemMessageType; import com.epmet.constant.UserConstant; import com.epmet.dao.IcResiUserDao; import com.epmet.dto.*; @@ -46,10 +48,7 @@ import com.epmet.dto.form.*; import com.epmet.dto.result.*; import com.epmet.entity.IcResiUserEntity; import com.epmet.excel.handler.DynamicEasyExcelListener; -import com.epmet.feign.EpmetAdminOpenFeignClient; -import com.epmet.feign.EpmetUserOpenFeignClient; -import com.epmet.feign.GovOrgOpenFeignClient; -import com.epmet.feign.OperCustomizeOpenFeignClient; +import com.epmet.feign.*; import com.epmet.service.IcResiUserService; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -57,8 +56,6 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -90,6 +87,8 @@ public class IcResiUserServiceImpl extends BaseServiceImpl