Browse Source

居民信息新增、变更推送mq

dev_shibei_match
sunyuchao 4 years ago
parent
commit
41d249672a
  1. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
  2. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
  3. 20
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/IcWarnStatsMQMsg.java
  4. 10
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
  5. 4
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  6. 12
      epmet-user/epmet-user-server/pom.xml
  7. 31
      epmet-user/epmet-user-server/src/main/java/com/epmet/mq/RocketMQWarnStatsRegister.java
  8. 104
      epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java
  9. 39
      epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/InitCustomerRolesListener.java
  10. 29
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java
  11. 4
      epmet-user/epmet-user-server/src/main/resources/bootstrap.yml

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java

@ -65,4 +65,9 @@ public interface ConsomerGroupConstants {
*/ */
String OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP = "open_data_project_change_event_listener_group"; String OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP = "open_data_project_change_event_listener_group";
/**
* 开放的人员信息变更预警统计
*/
String IC_WARN_STATS_EVENT_LISTENER_GROUP = "ic_warn_stats_event_listener_group";
} }

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java

@ -48,4 +48,9 @@ public interface TopicConstants {
* 项目 * 项目
*/ */
String PROJECT = "project"; String PROJECT = "project";
/**
* 项目
*/
String IC_RESI_USER = "ic_resi_user";
} }

20
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/IcWarnStatsMQMsg.java

@ -0,0 +1,20 @@
package com.epmet.commons.rocketmq.messages;
import lombok.Data;
import java.io.Serializable;
/**
* 居民信息新增修改推送MQ
* @author sun
*/
@Data
public class IcWarnStatsMQMsg implements Serializable {
//客户Id
private String customerId;
//居民ID
private String icResiUser;
}

10
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java

@ -85,4 +85,14 @@ public interface SystemMessageType {
*/ */
String PROJECT_EDIT = "project_edit"; String PROJECT_EDIT = "project_edit";
/**
* 居民信息添加
*/
String IC_RESI_USER_ADD = "ic_resi_user_add";
/**
* 居民信息修改
*/
String IC_RESI_USER_EDIT = "ic_resi_user_edit";
} }

4
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -187,6 +187,10 @@ public class SystemMessageServiceImpl implements SystemMessageService {
case SystemMessageType.PROJECT_EDIT: case SystemMessageType.PROJECT_EDIT:
topic = TopicConstants.PROJECT; topic = TopicConstants.PROJECT;
break; break;
case SystemMessageType.IC_RESI_USER_ADD:
case SystemMessageType.IC_RESI_USER_EDIT:
topic = TopicConstants.IC_RESI_USER;
break;
} }
return topic; return topic;
} }

12
epmet-user/epmet-user-server/pom.xml

@ -222,6 +222,9 @@
</dingTalk.robot.webHook> </dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd <dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
</dingTalk.robot.secret> </dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties> </properties>
</profile> </profile>
<profile> <profile>
@ -264,6 +267,9 @@
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd <dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
</dingTalk.robot.secret> </dingTalk.robot.secret>
<openapi.scan.server.url>https://epmet-dev.elinkservice.cn/api/epmetscan/api</openapi.scan.server.url> <openapi.scan.server.url>https://epmet-dev.elinkservice.cn/api/epmetscan/api</openapi.scan.server.url>
<!--rocketmq-->
<rocketmq.enable>false</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties> </properties>
</profile> </profile>
<profile> <profile>
@ -305,6 +311,9 @@
</dingTalk.robot.webHook> </dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd <dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
</dingTalk.robot.secret> </dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties> </properties>
</profile> </profile>
<profile> <profile>
@ -345,6 +354,9 @@
</dingTalk.robot.webHook> </dingTalk.robot.webHook>
<dingTalk.robot.secret>SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 <dingTalk.robot.secret>SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1
</dingTalk.robot.secret> </dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties> </properties>
</profile> </profile>
</profiles> </profiles>

31
epmet-user/epmet-user-server/src/main/java/com/epmet/mq/RocketMQWarnStatsRegister.java

@ -0,0 +1,31 @@
package com.epmet.mq;
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.mq.listener.ICWarnStatsEventListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
/**
* @Description 如果rocketmq.enable=true这里必须实现 实例化
* @author wxz
* @date 2021.07.14 17:13:41
*/
@Component
public class RocketMQWarnStatsRegister extends MQAbstractRegister {
@Override
public void registerAllListeners(String env, MQConsumerProperties consumerProperties) {
// 客户初始化监听器注册
register(consumerProperties,
ConsomerGroupConstants.IC_WARN_STATS_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.IC_RESI_USER,
"*",
new ICWarnStatsEventListener());
// ...其他监听器类似
}
}

104
epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java

@ -0,0 +1,104 @@
package com.epmet.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.IcWarnStatsMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Description 负能平台-客户居民信息变动监听器
* @author wxz
* @date 2021.10.13 15:21:48
*/
public class ICWarnStatsEventListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
logger.error(ExceptionUtils.getErrorStackTrace(e));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(MessageExt messageExt) {
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-居民信息变动-收到消息内容:{},操作:{}", msg, tags);
IcWarnStatsMQMsg obj = JSON.parseObject(msg, IcWarnStatsMQMsg.class);
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS);
System.out.println("嘻嘻哈哈哈乐乐呵呵-----------");
//待执行方法
//SpringContextUtils.getBean(BaseGridInfoService.class).getAgencyBaseInfo(obj);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
} finally {
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
//logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
}
}

39
epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/InitCustomerRolesListener.java

@ -1,39 +0,0 @@
//package com.epmet.mq.listener;
//
//import com.alibaba.fastjson.JSON;
//import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
//import com.epmet.commons.rocketmq.constants.TopicConstants;
//import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg;
//import com.epmet.service.GovStaffRoleService;
//import org.apache.rocketmq.common.message.MessageExt;
//import org.apache.rocketmq.spring.annotation.MessageModel;
//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
//import org.apache.rocketmq.spring.core.RocketMQListener;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
///**
// * 监听初始化客户动作,为客户初始化角色列表
// */
//@RocketMQMessageListener(topic = TopicConstants.INIT_CUSTOMER,
// consumerGroup = ConsomerGroupConstants.INIT_CUSTOMER_ROLES_GROUP,
// messageModel = MessageModel.CLUSTERING,
// selectorExpression = "*")
//@Component
//public class InitCustomerRolesListener implements RocketMQListener<MessageExt> {
//
// private Logger logger = LoggerFactory.getLogger(getClass());
//
// @Autowired
// private GovStaffRoleService govStaffRoleService;
//
// @Override
// public void onMessage(MessageExt messageExt) {
// String msg = new String(messageExt.getBody());
// logger.info("初始化客户-初始化角色列表-收到消息内容:{}", msg);
// InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class);
// govStaffRoleService.initGovStaffRolesForCustomer(msgObj.getCustomerId());
// }
//}

29
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java

@ -22,6 +22,7 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.rocketmq.messages.IcWarnStatsMQMsg;
import com.epmet.commons.tools.constant.FieldConstant; import com.epmet.commons.tools.constant.FieldConstant;
import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.constant.ServiceConstant; import com.epmet.commons.tools.constant.ServiceConstant;
@ -39,6 +40,7 @@ import com.epmet.commons.tools.security.dto.TokenDto;
import com.epmet.commons.tools.security.user.LoginUserUtil; import com.epmet.commons.tools.security.user.LoginUserUtil;
import com.epmet.commons.tools.utils.ConvertUtils; import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.Result; import com.epmet.commons.tools.utils.Result;
import com.epmet.constant.SystemMessageType;
import com.epmet.constant.UserConstant; import com.epmet.constant.UserConstant;
import com.epmet.dao.IcResiUserDao; import com.epmet.dao.IcResiUserDao;
import com.epmet.dto.*; import com.epmet.dto.*;
@ -46,10 +48,7 @@ import com.epmet.dto.form.*;
import com.epmet.dto.result.*; import com.epmet.dto.result.*;
import com.epmet.entity.IcResiUserEntity; import com.epmet.entity.IcResiUserEntity;
import com.epmet.excel.handler.DynamicEasyExcelListener; import com.epmet.excel.handler.DynamicEasyExcelListener;
import com.epmet.feign.EpmetAdminOpenFeignClient; import com.epmet.feign.*;
import com.epmet.feign.EpmetUserOpenFeignClient;
import com.epmet.feign.GovOrgOpenFeignClient;
import com.epmet.feign.OperCustomizeOpenFeignClient;
import com.epmet.service.IcResiUserService; import com.epmet.service.IcResiUserService;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
@ -57,8 +56,6 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -90,6 +87,8 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
@Autowired @Autowired
private LoginUserUtil loginUserUtil; private LoginUserUtil loginUserUtil;
@Autowired
private EpmetMessageOpenFeignClient epmetMessageOpenFeignClient;
@ -172,6 +171,15 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
} }
}); });
//推送MQ事件
IcWarnStatsMQMsg mqMsg = new IcWarnStatsMQMsg();
mqMsg.setCustomerId(tokenDto.getCustomerId());
mqMsg.setIcResiUser(resiUserId);
SystemMsgFormDTO form = new SystemMsgFormDTO();
form.setMessageType(SystemMessageType.IC_RESI_USER_ADD);
form.setContent(mqMsg);
epmetMessageOpenFeignClient.sendSystemMsgByMQ(form);
} }
/** /**
@ -227,6 +235,15 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
} }
}); });
//推送MQ事件
IcWarnStatsMQMsg mqMsg = new IcWarnStatsMQMsg();
mqMsg.setCustomerId(tokenDto.getCustomerId());
mqMsg.setIcResiUser(resiUserId);
SystemMsgFormDTO form = new SystemMsgFormDTO();
form.setMessageType(SystemMessageType.IC_RESI_USER_ADD);
form.setContent(mqMsg);
epmetMessageOpenFeignClient.sendSystemMsgByMQ(form);
} }
/** /**

4
epmet-user/epmet-user-server/src/main/resources/bootstrap.yml

@ -150,7 +150,9 @@ dingTalk:
secret: @dingTalk.robot.secret@ secret: @dingTalk.robot.secret@
rocketmq: rocketmq:
name-server: 192.168.1.140:9876;192.168.1.141:9876 # 是否开启mq
enable: @rocketmq.enable@
name-server: @rocketmq.nameserver@
# 停机选项 # 停机选项
shutdown: shutdown:

Loading…
Cancel
Save