Browse Source

新增:【居民信息导入】党员信息同步到partymember库。相关MQ及服务改动

master
wangxianzhang 3 years ago
parent
commit
5e177952b6
  1. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
  2. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
  3. 60
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartymemberSyncMQMsg.java
  4. 5
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
  5. 3
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  6. 23
      epmet-module/resi-partymember/resi-partymember-server/pom.xml
  7. 19
      epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
  8. 106
      epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/ResiPartyMemberSyncListener.java
  9. 7
      epmet-module/resi-partymember/resi-partymember-server/src/main/resources/bootstrap.yml
  10. 3
      epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcResiImportDynamicExcelListener.java
  11. 87
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserImportServiceImpl.java
  12. 48
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java

@ -93,4 +93,9 @@ public interface ConsomerGroupConstants {
* 志愿者变更
*/
String VOLUNTEER_CHANGE_EVENT_LISTENER_GROUP = "volunteer_change_event_listener_group";
/**
* 创建党员居民信息消费者组将user库的党员信息同步到partymember库的党员表
*/
String CREATE_RESI_PARTYMEMBER_SYNC_GROUP = "create_resi_sync_group";
}

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java

@ -79,4 +79,9 @@ public interface TopicConstants {
* 志愿者
*/
String VOLUNTEER ="volunteer";
/**
* 居民的党员信息
*/
String PARTYMEMBER_RESI = "partymember_resi";
}

60
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartymemberSyncMQMsg.java

@ -0,0 +1,60 @@
package com.epmet.commons.rocketmq.messages;
import com.epmet.commons.tools.dto.form.mq.MqBaseFormDTO;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* user库党员信息同步到partymember库的mq消息
*/
@Data
public class PartymemberSyncMQMsg extends MqBaseFormDTO {
/**
* 党员列表允许一次传输多个党员信息
*/
private List<PartyMemberSyncForm> partymemberList = new ArrayList<>();
@Data
public static class PartyMemberSyncForm {
private String customerId;
private String agencyId;
private String agencyPids;
private String icResiUser;
private String name;
private String idCard;
private String mobile;
private String address;
private String rdsj;
private String sszb;
/**
* 是否流动党员
*/
private String isLd;
/**
* 流动党员活动证号
*/
private String ldzh;
/**
* 职务
*/
private String partyZw;
/**
* 是否退休
*/
private String isTx;
/**
* 是否党员中心户
*/
private String isDyzxh;
/**
* 志愿者类型,逗号隔开
*/
private String volunteerCategory;
}
}

5
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java

@ -165,4 +165,9 @@ public interface SystemMessageType {
*/
String FINISH_USER_DEMAND="finish_user_demand";
/**
* 党员身份的居民信息导入
*/
String PARTYMEMBER_RESI_IMPORT = "partymember_resi_import";
}

3
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -234,6 +234,9 @@ public class SystemMessageServiceImpl implements SystemMessageService {
case SystemMessageType.VOLUNTEER_CHANGED:
topic=TopicConstants.VOLUNTEER;
break;
case SystemMessageType.PARTYMEMBER_RESI_IMPORT:
topic=TopicConstants.PARTYMEMBER_RESI;
break;
default:
logger.error("getTopicByMsgType msgType:{} is not support for any topic", msgType);
}

23
epmet-module/resi-partymember/resi-partymember-server/pom.xml

@ -116,6 +116,13 @@
<version>2.0.0</version>
<scope>compile</scope>
</dependency>
<!--rocketmq-->
<dependency>
<groupId>com.epmet</groupId>
<artifactId>epmet-commons-rocketmq</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
@ -197,6 +204,10 @@
</dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
@ -247,6 +258,10 @@
</dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>false</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
@ -297,6 +312,10 @@
</dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.10.161:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
@ -346,6 +365,10 @@
</dingTalk.robot.webHook>
<dingTalk.robot.secret>SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1
</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.11.187:9876;192.168.11.184:9876</rocketmq.nameserver>
</properties>
</profile>
</profiles>

19
epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java

@ -0,0 +1,19 @@
package com.epmet.mq;
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.mq.listener.ResiPartyMemberSyncListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
@Component
public class RocketMQConsumerRegister extends MQAbstractRegister {
@Override
public void registerAllListeners(String env, MQConsumerProperties consumerProperties) {
register(consumerProperties, ConsomerGroupConstants.CREATE_RESI_PARTYMEMBER_SYNC_GROUP, MessageModel.CLUSTERING,
TopicConstants.PARTYMEMBER_RESI, "*", new ResiPartyMemberSyncListener());
}
}

106
epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/ResiPartyMemberSyncListener.java

@ -0,0 +1,106 @@
package com.epmet.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.PartymemberSyncMQMsg;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.modules.partymember.service.IcPartyMemberService;
import com.epmet.resi.partymember.dto.partymember.IcPartyMemberDTO;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @author wxz
* @Description 创建党员居民信息消费者组将user库的党员信息同步到partymember库的党员表
* @return
* @date 2021.06.07 16:12
*/
public class ResiPartyMemberSyncListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
logger.error(ExceptionUtils.getErrorStackTrace(e));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 逐条消费
* @param messageExt
*/
private void consumeMessage(MessageExt messageExt) {
//String tags = messageExt.getTags();
String msg = new String(messageExt.getBody());
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("居民-党员信息同步监听器-收到消息内容:{}", msg);
PartymemberSyncMQMsg msgObj = JSON.parseObject(msg, PartymemberSyncMQMsg.class);
List<PartymemberSyncMQMsg.PartyMemberSyncForm> partymemberList = msgObj.getPartymemberList();
if (CollectionUtils.isEmpty(partymemberList)) {
return;
}
partymemberList.stream().forEach(p -> {
IcPartyMemberDTO icPartyMemberDTO = ConvertUtils.sourceToTarget(p, IcPartyMemberDTO.class);
try {
SpringContextUtils.getBean(IcPartyMemberService.class).icPartyMemberSync(icPartyMemberDTO);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【居民-党员信息同步】同步失败,居民id:{},错误信息:{}", p.getIcResiUser(), ExceptionUtils.getErrorStackTrace(e));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【居民-党员信息同步】同步失败居民id:{},错误信息:{}", p.getIcResiUser(), ExceptionUtils.getErrorStackTrace(e));
throw e;
}
});
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【居民-党员信息同步】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
logger.info("【居民-党员信息同步】处理完成,条数:{}", partymemberList.size());
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
Boolean rst = redisUtils.delete(key);
}
}

7
epmet-module/resi-partymember/resi-partymember-server/src/main/resources/bootstrap.yml

@ -142,4 +142,9 @@ thread:
queueCapacity: @thread.threadPool.queue-capacity@
keepAliveSeconds: @thread.threadPool.keep-alive-seconds@
threadNamePrefix: @thread.threadPool.thread-name-prefix@
rejectedExecutionHandler: @thread.threadPool.rejected-execution-handler@
rejectedExecutionHandler: @thread.threadPool.rejected-execution-handler@
# rocketmq
rocketmq:
enable: @rocketmq.enable@
name-server: @rocketmq.nameserver@

3
epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcResiImportDynamicExcelListener.java

@ -195,6 +195,9 @@ public class IcResiImportDynamicExcelListener extends AnalysisEventListener<Map<
SendMqMsgUtil.build()
.openFeignClient(SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class))
.sendRocketMqMsg(SystemMessageType.VOLUNTEER_CHANGED, msg);
} else if (sheetEnumObject == IcResiUserTableEnum.IC_PARTY_MEMBER && !CollectionUtils.isEmpty(successIdCards)) {
// 针对党员,需要发送mq消息,同步到党员库
icResiUserImportService.syncPartyMemberInfo(successIdCards);
}
}
dataList.clear();

87
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserImportServiceImpl.java

@ -6,10 +6,12 @@ import cn.afterturn.easypoi.excel.entity.ExportParams;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.EasyExcelFactory;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.epmet.bean.ResiImportCategoryData;
import com.epmet.bean.ResiImportChangedData;
import com.epmet.bean.ResiImportResiCategoryChangedCache;
import com.epmet.commons.rocketmq.messages.PartymemberSyncMQMsg;
import com.epmet.commons.tools.constant.AppClientConstant;
import com.epmet.commons.tools.constant.ServiceConstant;
import com.epmet.commons.tools.constant.StrConstant;
@ -27,30 +29,23 @@ import com.epmet.commons.tools.redis.common.CustomerOrgRedis;
import com.epmet.commons.tools.redis.common.CustomerStaffRedis;
import com.epmet.commons.tools.redis.common.bean.AgencyInfoCache;
import com.epmet.commons.tools.redis.common.bean.GridInfoCache;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.EpmetRequestHolder;
import com.epmet.commons.tools.utils.FileUtils;
import com.epmet.commons.tools.utils.Result;
import com.epmet.commons.tools.utils.*;
import com.epmet.constant.SystemMessageType;
import com.epmet.constants.ImportTaskConstants;
import com.epmet.dao.IcResiUserDao;
import com.epmet.dao.IcUserChangeDetailedDao;
import com.epmet.dao.IcUserChangeRecordDao;
import com.epmet.dao.IcUserTransferRecordDao;
import com.epmet.dao.*;
import com.epmet.dto.*;
import com.epmet.dto.form.GridOptionFormDTO;
import com.epmet.dto.form.HouseFormDTO;
import com.epmet.dto.form.IcResiUserOrgMsgFormDTO;
import com.epmet.dto.form.ImportTaskCommonFormDTO;
import com.epmet.dto.result.*;
import com.epmet.entity.IcResiUserEntity;
import com.epmet.entity.IcUserChangeDetailedEntity;
import com.epmet.entity.IcUserChangeRecordEntity;
import com.epmet.entity.IcUserTransferRecordEntity;
import com.epmet.entity.*;
import com.epmet.enums.IcResiUserTableEnum;
import com.epmet.excel.handler.IcResiImportDynamicExcelListener;
import com.epmet.excel.handler.IcResiVirtualSheetImportListener;
import com.epmet.feign.*;
import com.epmet.resi.partymember.feign.ResiPartyMemberOpenFeignClient;
import com.epmet.send.SendMqMsgUtil;
import com.epmet.service.IcResiUserImportService;
import com.epmet.service.UserService;
import com.google.common.cache.Cache;
@ -172,6 +167,12 @@ public class IcResiUserImportServiceImpl implements IcResiUserImportService, Res
private OssFeignClient ossFeignClient;
@Autowired
private ResiPartyMemberOpenFeignClient partyMemberOpenFeignClient;
@Autowired
private IcVolunteerDao icVolunteerDao;
@Autowired
private IcPartyMemberDao icPartyMemberDao;
@Autowired
private EpmetMessageOpenFeignClient epmetMessageOpenFeignClient;
/**
* 子表中不需要的列因为主表中需要身份证号网格等信息但子表中不需要这些列必填只要有身份证号即可因此字表判断的时候需要排除这些列
@ -820,6 +821,68 @@ public class IcResiUserImportServiceImpl implements IcResiUserImportService, Res
}
}
/**
* 使用mq同步党员信息
*/
public void syncPartyMemberInfo(List<String> idCards) {
String customerId = EpmetRequestHolder.getLoginUserCustomerId();
IcResiUserServiceImpl resiService = SpringContextUtils.getBean(IcResiUserServiceImpl.class);
PartymemberSyncMQMsg mqMsgBody = new PartymemberSyncMQMsg();
idCards.stream().forEach(idCard -> {
// 检查用户是否存在
Map<String, String> existResiInfoMap = icResiUserDao.selectResiInfoMap(idCard, null);
if (existResiInfoMap == null || existResiInfoMap.size() == 0) {
return ;
}
String resiId = existResiInfoMap.get("ID");
// 查询党员信息
LambdaQueryWrapper<IcPartyMemberEntity> partymemberQuery = new LambdaQueryWrapper<IcPartyMemberEntity>().eq(IcPartyMemberEntity::getIcResiUser, resiId);
IcPartyMemberEntity partymemberEntity = icPartyMemberDao.selectOne(partymemberQuery);
String agencyId = existResiInfoMap.get("AGENCY_ID");
AgencyInfoCache agencyInfo = CustomerOrgRedis.getAgencyInfo(agencyId);
if (agencyInfo == null) {
log.error("【居民信息导入】党员信息-根据组织id未找到组织信息。组织ID:{}", agencyId);
return ;
}
String houseAddress = resiService.getHouseAddress4PartymemberInfo(customerId, idCard, existResiInfoMap.get("HOME_ID"));
PartymemberSyncMQMsg.PartyMemberSyncForm partymemberInfo = new PartymemberSyncMQMsg.PartyMemberSyncForm();
partymemberInfo.setCustomerId(existResiInfoMap.get("CUSTOMER_ID"));
partymemberInfo.setAgencyId(agencyId);
partymemberInfo.setAgencyPids(agencyInfo.getPids());
partymemberInfo.setIcResiUser(resiId);
partymemberInfo.setName(existResiInfoMap.get("NAME"));
partymemberInfo.setIdCard(existResiInfoMap.get("ID_CARD"));
partymemberInfo.setMobile(existResiInfoMap.get("MOBILE"));
partymemberInfo.setAddress(houseAddress);
partymemberInfo.setRdsj(partymemberEntity.getRdsj());
partymemberInfo.setSszb(partymemberEntity.getSszb());
partymemberInfo.setIsLd(partymemberEntity.getIsLd());
partymemberInfo.setLdzh(partymemberEntity.getLdzh());
partymemberInfo.setPartyZw(partymemberEntity.getPartyZw());
partymemberInfo.setIsTx(partymemberEntity.getIsTx());
partymemberInfo.setIsDyzxh(partymemberEntity.getIsDyzxh());
// 志愿者信息
LambdaQueryWrapper<IcVolunteerEntity> query = new LambdaQueryWrapper<>();
query.eq(IcVolunteerEntity::getIcResiUser, resiId);
List<String> volunteerCategories = icVolunteerDao.selectList(query).stream().map(v -> v.getVolunteerCategory()).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(volunteerCategories)) {
partymemberInfo.setVolunteerCategory(String.join(",", volunteerCategories));
}
mqMsgBody.getPartymemberList().add(partymemberInfo);
});
SendMqMsgUtil.build().openFeignClient(epmetMessageOpenFeignClient).sendRocketMqMsg(SystemMessageType.PARTYMEMBER_RESI_IMPORT, mqMsgBody);
}
/**
* 去掉多余的列
* @param originColumnAndValues

48
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java

@ -314,23 +314,9 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
partyMemberDTO.setName(map.get("NAME"));
partyMemberDTO.setIdCard(map.get("ID_CARD"));
partyMemberDTO.setMobile(map.get("MOBILE"));
//查询网格信息
GridInfoCache gridInfo = CustomerOrgRedis.getGridInfo(map.get("GRID_ID"));
//查询房屋信息
HouseInfoCache houseInfo = CustomerIcHouseRedis.getHouseInfo(tokenDto.getCustomerId(), map.get("HOME_ID"));
if (null != gridInfo && null != houseInfo) {
String address;
if (StringUtils.isNotBlank(gridInfo.getAllParentName())) {
address = gridInfo.getAllParentName().concat(StrConstant.HYPHEN).concat(gridInfo.getGridNamePath())
.concat(StrConstant.HYPHEN).concat(houseInfo.getNeighborHoodName())
.concat(StrConstant.HYPHEN).concat(houseInfo.getHouseName());
} else {
address = gridInfo.getGridNamePath()
.concat(StrConstant.HYPHEN).concat(houseInfo.getNeighborHoodName())
.concat(StrConstant.HYPHEN).concat(houseInfo.getHouseName());
}
partyMemberDTO.setAddress(address);
}
String houseAddress = getHouseAddress4PartymemberInfo(tokenDto.getCustomerId(), map.get("GRID_ID"), map.get("HOME_ID"));
partyMemberDTO.setAddress(houseAddress);
}
//2022.05.18 end zhaoqf
}
@ -380,6 +366,34 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
}
/**
* 为党员信息生成家庭地址
* @param customerId
* @param gridId
* @param homeId
* @return
*/
public String getHouseAddress4PartymemberInfo(String customerId, String gridId, String homeId) {
//查询网格信息
GridInfoCache gridInfo = CustomerOrgRedis.getGridInfo(gridId);
//查询房屋信息
HouseInfoCache houseInfo = CustomerIcHouseRedis.getHouseInfo(customerId, homeId);
if (null != gridInfo && null != houseInfo) {
String address;
if (StringUtils.isNotBlank(gridInfo.getAllParentName())) {
address = gridInfo.getAllParentName().concat(StrConstant.HYPHEN).concat(gridInfo.getGridNamePath())
.concat(StrConstant.HYPHEN).concat(houseInfo.getNeighborHoodName())
.concat(StrConstant.HYPHEN).concat(houseInfo.getHouseName());
} else {
address = gridInfo.getGridNamePath()
.concat(StrConstant.HYPHEN).concat(houseInfo.getNeighborHoodName())
.concat(StrConstant.HYPHEN).concat(houseInfo.getHouseName());
}
return address;
}
return null;
}
/**
* 3.变更记录表和变更记录明细表新增数据
*

Loading…
Cancel
Save