From 5e177952b6f5cfb9a49a75a3d1af7051a388e496 Mon Sep 17 00:00:00 2001 From: wangxianzhang Date: Thu, 30 Jun 2022 10:31:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=E3=80=90=E5=B1=85?= =?UTF-8?q?=E6=B0=91=E4=BF=A1=E6=81=AF=E5=AF=BC=E5=85=A5=E3=80=91=E5=85=9A?= =?UTF-8?q?=E5=91=98=E4=BF=A1=E6=81=AF=E5=90=8C=E6=AD=A5=E5=88=B0partymemb?= =?UTF-8?q?er=E5=BA=93=E3=80=82=E7=9B=B8=E5=85=B3MQ=E5=8F=8A=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E6=94=B9=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/ConsomerGroupConstants.java | 5 + .../rocketmq/constants/TopicConstants.java | 5 + .../messages/PartymemberSyncMQMsg.java | 60 ++++++++++ .../com/epmet/constant/SystemMessageType.java | 5 + .../impl/SystemMessageServiceImpl.java | 3 + .../resi-partymember-server/pom.xml | 23 ++++ .../epmet/mq/RocketMQConsumerRegister.java | 19 ++++ .../listener/ResiPartyMemberSyncListener.java | 106 ++++++++++++++++++ .../src/main/resources/bootstrap.yml | 7 +- .../IcResiImportDynamicExcelListener.java | 3 + .../impl/IcResiUserImportServiceImpl.java | 87 ++++++++++++-- .../service/impl/IcResiUserServiceImpl.java | 48 +++++--- 12 files changed, 341 insertions(+), 30 deletions(-) create mode 100644 epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartymemberSyncMQMsg.java create mode 100644 epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java create mode 100644 epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/ResiPartyMemberSyncListener.java diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index 1a4b303c3c..2aadd34cb4 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -93,4 +93,9 @@ public interface ConsomerGroupConstants { * 志愿者变更 */ String VOLUNTEER_CHANGE_EVENT_LISTENER_GROUP = "volunteer_change_event_listener_group"; + + /** + * 创建党员居民信息消费者组,将user库的党员信息同步到partymember库的党员表。 + */ + String CREATE_RESI_PARTYMEMBER_SYNC_GROUP = "create_resi_sync_group"; } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index 66d04ff05f..cea27769bc 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -79,4 +79,9 @@ public interface TopicConstants { * 志愿者 */ String VOLUNTEER ="volunteer"; + + /** + * 居民的党员信息 + */ + String PARTYMEMBER_RESI = "partymember_resi"; } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartymemberSyncMQMsg.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartymemberSyncMQMsg.java new file mode 100644 index 0000000000..d9010854e0 --- /dev/null +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartymemberSyncMQMsg.java @@ -0,0 +1,60 @@ +package com.epmet.commons.rocketmq.messages; + +import com.epmet.commons.tools.dto.form.mq.MqBaseFormDTO; +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; + +/** + * user库党员信息同步到partymember库的mq消息 + */ +@Data +public class PartymemberSyncMQMsg extends MqBaseFormDTO { + + /** + * 党员列表,允许一次传输多个党员信息 + */ + private List partymemberList = new ArrayList<>(); + + @Data + public static class PartyMemberSyncForm { + + private String customerId; + private String agencyId; + private String agencyPids; + private String icResiUser; + private String name; + private String idCard; + private String mobile; + private String address; + private String rdsj; + private String sszb; + /** + * 是否流动党员 + */ + private String isLd; + + /** + * 流动党员活动证号 + */ + private String ldzh; + /** + * 职务 + */ + private String partyZw; + /** + * 是否退休 + */ + private String isTx; + /** + * 是否党员中心户 + */ + private String isDyzxh; + /** + * 志愿者类型,逗号隔开 + */ + private String volunteerCategory; + + } +} diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java index 13f409ca36..b60b3b8fe1 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java @@ -165,4 +165,9 @@ public interface SystemMessageType { */ String FINISH_USER_DEMAND="finish_user_demand"; + /** + * 党员身份的居民信息导入 + */ + String PARTYMEMBER_RESI_IMPORT = "partymember_resi_import"; + } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index 3e210d13a0..d43fc223dc 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -234,6 +234,9 @@ public class SystemMessageServiceImpl implements SystemMessageService { case SystemMessageType.VOLUNTEER_CHANGED: topic=TopicConstants.VOLUNTEER; break; + case SystemMessageType.PARTYMEMBER_RESI_IMPORT: + topic=TopicConstants.PARTYMEMBER_RESI; + break; default: logger.error("getTopicByMsgType msgType:{} is not support for any topic", msgType); } diff --git a/epmet-module/resi-partymember/resi-partymember-server/pom.xml b/epmet-module/resi-partymember/resi-partymember-server/pom.xml index a1f564cf47..18eab5096b 100644 --- a/epmet-module/resi-partymember/resi-partymember-server/pom.xml +++ b/epmet-module/resi-partymember/resi-partymember-server/pom.xml @@ -116,6 +116,13 @@ 2.0.0 compile + + + + com.epmet + epmet-commons-rocketmq + 2.0.0 + @@ -197,6 +204,10 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.1.140:9876;192.168.1.141:9876 @@ -247,6 +258,10 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + false + 192.168.1.140:9876;192.168.1.141:9876 @@ -297,6 +312,10 @@ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.10.161:9876 @@ -346,6 +365,10 @@ SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 + + + true + 192.168.11.187:9876;192.168.11.184:9876 diff --git a/epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java new file mode 100644 index 0000000000..67ceeb7651 --- /dev/null +++ b/epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -0,0 +1,19 @@ +package com.epmet.mq; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import com.epmet.commons.rocketmq.register.MQAbstractRegister; +import com.epmet.commons.rocketmq.register.MQConsumerProperties; +import com.epmet.mq.listener.ResiPartyMemberSyncListener; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.stereotype.Component; + +@Component +public class RocketMQConsumerRegister extends MQAbstractRegister { + + @Override + public void registerAllListeners(String env, MQConsumerProperties consumerProperties) { + register(consumerProperties, ConsomerGroupConstants.CREATE_RESI_PARTYMEMBER_SYNC_GROUP, MessageModel.CLUSTERING, + TopicConstants.PARTYMEMBER_RESI, "*", new ResiPartyMemberSyncListener()); + } +} diff --git a/epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/ResiPartyMemberSyncListener.java b/epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/ResiPartyMemberSyncListener.java new file mode 100644 index 0000000000..4304f94f95 --- /dev/null +++ b/epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/ResiPartyMemberSyncListener.java @@ -0,0 +1,106 @@ +package com.epmet.mq.listener; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.rocketmq.messages.PartymemberSyncMQMsg; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.ConvertUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.modules.partymember.service.IcPartyMemberService; +import com.epmet.resi.partymember.dto.partymember.IcPartyMemberDTO; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * @author wxz + * @Description 创建党员居民信息消费者组,将user库的党员信息同步到partymember库的党员表。 + + * @return + * @date 2021.06.07 16:12 + */ +public class ResiPartyMemberSyncListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private RedisUtils redisUtils; + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + /** + * 逐条消费 + * @param messageExt + */ + private void consumeMessage(MessageExt messageExt) { + //String tags = messageExt.getTags(); + String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); + logger.info("居民-党员信息同步监听器-收到消息内容:{}", msg); + PartymemberSyncMQMsg msgObj = JSON.parseObject(msg, PartymemberSyncMQMsg.class); + + List partymemberList = msgObj.getPartymemberList(); + if (CollectionUtils.isEmpty(partymemberList)) { + return; + } + partymemberList.stream().forEach(p -> { + IcPartyMemberDTO icPartyMemberDTO = ConvertUtils.sourceToTarget(p, IcPartyMemberDTO.class); + try { + SpringContextUtils.getBean(IcPartyMemberService.class).icPartyMemberSync(icPartyMemberDTO); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【居民-党员信息同步】同步失败,居民id:{},错误信息:{}", p.getIcResiUser(), ExceptionUtils.getErrorStackTrace(e)); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【居民-党员信息同步】同步失败居民id:{},错误信息:{}", p.getIcResiUser(), ExceptionUtils.getErrorStackTrace(e)); + throw e; + } + }); + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【居民-党员信息同步】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + + logger.info("【居民-党员信息同步】处理完成,条数:{}", partymemberList.size()); + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); + Boolean rst = redisUtils.delete(key); + } +} diff --git a/epmet-module/resi-partymember/resi-partymember-server/src/main/resources/bootstrap.yml b/epmet-module/resi-partymember/resi-partymember-server/src/main/resources/bootstrap.yml index ee185fa822..751620e91d 100644 --- a/epmet-module/resi-partymember/resi-partymember-server/src/main/resources/bootstrap.yml +++ b/epmet-module/resi-partymember/resi-partymember-server/src/main/resources/bootstrap.yml @@ -142,4 +142,9 @@ thread: queueCapacity: @thread.threadPool.queue-capacity@ keepAliveSeconds: @thread.threadPool.keep-alive-seconds@ threadNamePrefix: @thread.threadPool.thread-name-prefix@ - rejectedExecutionHandler: @thread.threadPool.rejected-execution-handler@ \ No newline at end of file + rejectedExecutionHandler: @thread.threadPool.rejected-execution-handler@ + +# rocketmq +rocketmq: + enable: @rocketmq.enable@ + name-server: @rocketmq.nameserver@ \ No newline at end of file diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcResiImportDynamicExcelListener.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcResiImportDynamicExcelListener.java index 2e98774f6e..1d06fe5707 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcResiImportDynamicExcelListener.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcResiImportDynamicExcelListener.java @@ -195,6 +195,9 @@ public class IcResiImportDynamicExcelListener extends AnalysisEventListener idCards) { + String customerId = EpmetRequestHolder.getLoginUserCustomerId(); + IcResiUserServiceImpl resiService = SpringContextUtils.getBean(IcResiUserServiceImpl.class); + + PartymemberSyncMQMsg mqMsgBody = new PartymemberSyncMQMsg(); + + idCards.stream().forEach(idCard -> { + // 检查用户是否存在 + Map existResiInfoMap = icResiUserDao.selectResiInfoMap(idCard, null); + if (existResiInfoMap == null || existResiInfoMap.size() == 0) { + return ; + } + String resiId = existResiInfoMap.get("ID"); + + // 查询党员信息 + LambdaQueryWrapper partymemberQuery = new LambdaQueryWrapper().eq(IcPartyMemberEntity::getIcResiUser, resiId); + IcPartyMemberEntity partymemberEntity = icPartyMemberDao.selectOne(partymemberQuery); + + + String agencyId = existResiInfoMap.get("AGENCY_ID"); + AgencyInfoCache agencyInfo = CustomerOrgRedis.getAgencyInfo(agencyId); + if (agencyInfo == null) { + log.error("【居民信息导入】党员信息-根据组织id未找到组织信息。组织ID:{}", agencyId); + return ; + } + String houseAddress = resiService.getHouseAddress4PartymemberInfo(customerId, idCard, existResiInfoMap.get("HOME_ID")); + + PartymemberSyncMQMsg.PartyMemberSyncForm partymemberInfo = new PartymemberSyncMQMsg.PartyMemberSyncForm(); + partymemberInfo.setCustomerId(existResiInfoMap.get("CUSTOMER_ID")); + partymemberInfo.setAgencyId(agencyId); + partymemberInfo.setAgencyPids(agencyInfo.getPids()); + partymemberInfo.setIcResiUser(resiId); + partymemberInfo.setName(existResiInfoMap.get("NAME")); + partymemberInfo.setIdCard(existResiInfoMap.get("ID_CARD")); + partymemberInfo.setMobile(existResiInfoMap.get("MOBILE")); + partymemberInfo.setAddress(houseAddress); + + partymemberInfo.setRdsj(partymemberEntity.getRdsj()); + partymemberInfo.setSszb(partymemberEntity.getSszb()); + partymemberInfo.setIsLd(partymemberEntity.getIsLd()); + partymemberInfo.setLdzh(partymemberEntity.getLdzh()); + partymemberInfo.setPartyZw(partymemberEntity.getPartyZw()); + partymemberInfo.setIsTx(partymemberEntity.getIsTx()); + partymemberInfo.setIsDyzxh(partymemberEntity.getIsDyzxh()); + + // 志愿者信息 + LambdaQueryWrapper query = new LambdaQueryWrapper<>(); + query.eq(IcVolunteerEntity::getIcResiUser, resiId); + List volunteerCategories = icVolunteerDao.selectList(query).stream().map(v -> v.getVolunteerCategory()).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(volunteerCategories)) { + partymemberInfo.setVolunteerCategory(String.join(",", volunteerCategories)); + } + + mqMsgBody.getPartymemberList().add(partymemberInfo); + }); + + SendMqMsgUtil.build().openFeignClient(epmetMessageOpenFeignClient).sendRocketMqMsg(SystemMessageType.PARTYMEMBER_RESI_IMPORT, mqMsgBody); + } + /** * 去掉多余的列 * @param originColumnAndValues diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java index fb8384b4a0..5c7bae23f4 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java @@ -314,23 +314,9 @@ public class IcResiUserServiceImpl extends BaseServiceImpl