diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index 89e8a70ab3..31b8e903f8 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -777,4 +777,28 @@ public class RedisKeys { } return rootPrefix.concat("lock:").concat(methodName); } + + /** + * desc:获取更新 房屋内有居民数量的key + * @param customerId + * @return + */ + public static String getUpdateHouseResiNumer(String customerId) { + if (StringUtils.isBlank(customerId)){ + throw new EpmetException(EpmetErrorCode.EPMET_COMMON_OPERATION_FAIL.getCode(),"参数错误"); + } + return rootPrefix.concat("updateHouseResiNumber:").concat(customerId); + } + + /** + * desc:获取更新 房屋内有居民数量 线程的key + * @param customerId + * @return + */ + public static String updateIcHouseResiNumberThread(String customerId) { + if (StringUtils.isBlank(customerId)){ + throw new EpmetException(EpmetErrorCode.EPMET_COMMON_OPERATION_FAIL.getCode(),"参数错误"); + } + return rootPrefix.concat("updateHouseResiNumber:thread:").concat(customerId); + } } diff --git a/epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/HouseIcResiUserCountResultDTO.java b/epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/HouseIcResiUserCountResultDTO.java new file mode 100644 index 0000000000..4c974bd10b --- /dev/null +++ b/epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/HouseIcResiUserCountResultDTO.java @@ -0,0 +1,46 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.dto.result; + +import lombok.Data; + +import java.io.Serializable; + + +/** + * 房屋内居民总数结果 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-10-26 + */ +@Data +public class HouseIcResiUserCountResultDTO implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 所属家庭Id + */ + private String homeId; + + /** + * 房间内总人数 + */ + private Integer resiNumber; + +} diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java index f145fadd97..8b201da55d 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java @@ -363,11 +363,18 @@ public interface IcResiUserDao extends BaseDao { @Param("customerId") String customerId); /** - * @describe: 通过身份证号查询实体类 - * @author wangtong - * @date 2022/6/13 17:32 - * @params [idCard, customerId] - * @return com.epmet.entity.IcResiUserEntity - */ - IcResiUserEntity selectResiUserEntityByIdCard(@Param("idCard") String idCard,@Param("customerId") String customerId); + * @return com.epmet.entity.IcResiUserEntity + * @describe: 通过身份证号查询实体类 + * @author wangtong + * @date 2022/6/13 17:32 + * @params [idCard, customerId] + */ + IcResiUserEntity selectResiUserEntityByIdCard(@Param("idCard") String idCard, @Param("customerId") String customerId); + + /** + * desc:获取客户内 每个房屋内的居民数 + * @param customerId + * @return + */ + List getResiUserCountGroupHomeId(@Param("customerId") String customerId); } diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java index ee9c4e47b3..4ea3eb92b5 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java @@ -11,6 +11,7 @@ import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.constant.SystemMessageType; import com.epmet.service.IcNatService; +import com.epmet.service.IcResiUserService; import com.epmet.service.StatsResiWarnService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; @@ -26,10 +27,10 @@ import java.util.List; import java.util.concurrent.TimeUnit; /** - * @Description 负能平台-客户居民信息变动监听器 * @author wxz + * @Description 负能平台-居民信息变动监听器 * @date 2021.10.13 15:21:48 -*/ + */ @Slf4j public class ICWarnStatsEventListener implements MessageListenerConcurrently { @@ -37,6 +38,7 @@ public class ICWarnStatsEventListener implements MessageListenerConcurrently { private RedisUtils redisUtils; + @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -57,34 +59,25 @@ public class ICWarnStatsEventListener implements MessageListenerConcurrently { // msg即为消息体 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 String msg = new String(messageExt.getBody()); - log.info("msg is {}",msg); + log.info("msg is {}", msg); String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); logger.info("【开放数据事件监听器】-居民信息变动-收到消息内容:{},操作:{}", msg, tags); IcResiUserAddMQMsg obj = JSON.parseObject(msg, IcResiUserAddMQMsg.class); - log.info("obj is {}",JSON.toJSONString(obj)); - DistributedLock distributedLock = null; - RLock lock = null; + log.info("obj is {}", JSON.toJSONString(obj)); try { - distributedLock = SpringContextUtils.getBean(DistributedLock.class); - lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()), - 30L, 30L, TimeUnit.SECONDS); - //待执行方法 - SpringContextUtils.getBean(StatsResiWarnService.class).resiWarn(obj.getCustomerId()); - //根据居民Id更新 核酸检测的是否居民状态 - updateIcNatResiFlag(tags, obj); + if (!resiWarnStats(obj)||!updateIcNatResiFlag(tags, obj)||!updateIcHouseResiNumber(obj)){ + } } catch (RenException e) { // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 - logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + logger.error("【开放数据事件监听器】-居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); } catch (Exception e) { // 不是我们自己抛出的异常,可以让MQ重试 - logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + logger.error("【开放数据事件监听器】-居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); throw e; - } finally { - distributedLock.unLock(lock); } if (StringUtils.isNotBlank(pendingMsgLabel)) { @@ -96,31 +89,97 @@ public class ICWarnStatsEventListener implements MessageListenerConcurrently { } } + private boolean resiWarnStats(IcResiUserAddMQMsg obj) { + DistributedLock distributedLock = null; + RLock lock = null; + try { + if (StringUtils.isBlank(obj.getCustomerId())) { + log.warn("resiWarnStats param is error"); + return true; + } + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()), + 30L, 30L, TimeUnit.SECONDS); + SpringContextUtils.getBean(StatsResiWarnService.class).resiWarn(obj.getCustomerId()); + log.debug("【居民信息变动】resiWarnStats param customerId:{}", JSON.toJSONString(obj)); + + } catch (Exception e) { + log.error("resiWarnStats exception", e); + return false; + } finally { + if (distributedLock != null){ + distributedLock.unLock(lock); + } + } + return true; + } + + /** + * desc:根据客户id更新 每个房屋内居民的总数 + * @param obj + * @return + * @remark 因为使用了线程 所以结果不用关心 + */ + private boolean updateIcHouseResiNumber(IcResiUserAddMQMsg obj) { + DistributedLock distributedLock = null; + RLock lock = null; + try { + if (StringUtils.isBlank(obj.getCustomerId())) { + log.warn("updateIcHouseResiNumber param is error"); + return true; + } + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:update_house_resi_number:%s", obj.getCustomerId()), + 30L, 30L, TimeUnit.SECONDS); + SpringContextUtils.getBean(IcResiUserService.class).updateIcHouseResiNumber(obj.getCustomerId()); + log.debug("【居民信息变动】updateIcHouseResiNumber param customerId & icResiUserId:{}", JSON.toJSONString(obj)); + } catch (Exception e) { + log.error("updateIcHouseResiNumber exception", e); + return false; + } finally { + if (distributedLock != null){ + distributedLock.unLock(lock); + } + } + return true; + } + /** * desc:根据居民Id更新 核酸检测的是否居民状态(只有新增和删除时调用) - * @param tags + * @param tags * @param obj + * @return */ - private void updateIcNatResiFlag(String tags, IcResiUserAddMQMsg obj) { + private boolean updateIcNatResiFlag(String tags, IcResiUserAddMQMsg obj) { + DistributedLock distributedLock = null; + RLock lock = null; try { - if (StringUtils.isBlank(obj.getCustomerId())||StringUtils.isBlank(obj.getIcResiUser())){ + if (StringUtils.isBlank(obj.getCustomerId()) || StringUtils.isBlank(obj.getIcResiUser())) { log.warn("updateIcNatResiFlag param is error"); - return; + return true; } + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:ic_nat_resi_flag:%s", obj.getCustomerId()), + 30L, 30L, TimeUnit.SECONDS); if (SystemMessageType.IC_RESI_USER_ADD.equals(tags) || SystemMessageType.IC_RESI_USER_DEL.equals(tags)) { Integer effectRow = SpringContextUtils.getBean(IcNatService.class).updateIsResiFlag(obj.getCustomerId(), obj.getIcResiUser()); - log.debug("updateIcNatResiFlag effectRow:{},param customerId & icResiUserId:{}", effectRow, JSON.toJSONString(obj)); + log.debug("【居民信息变动】updateIcNatResiFlag effectRow:{},param customerId & icResiUserId:{}", effectRow, JSON.toJSONString(obj)); } } catch (Exception e) { log.error("updateIcNatResiFlag exception", e); + return false; + } finally { + if (distributedLock != null){ + distributedLock.unLock(lock); + } } + return true; } /** - * @description - * * @param pendingMsgLabel * @return + * @description * @author wxz * @date 2021.10.14 16:32:32 */ diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java index e79dc5fc27..699d98b2fb 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java @@ -335,7 +335,7 @@ public interface IcResiUserService extends BaseService { SyncResiResDTO checkUser(String customerId, String idCard, String agencyId); /** - * desc:条件获取居民基础信息 + * desc:条件获取居民基础信息 按照房屋分组 * * @param formDTO * @return @@ -445,4 +445,11 @@ public interface IcResiUserService extends BaseService { * @return */ List queryVolunteerList(String customerId, String userId); + + /** + * desc:根据客户Id 更新房屋的 居民人数 + * @param customerId + * @return + */ + void updateIcHouseResiNumber(String customerId); } diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java index a90c766b8f..51abe39977 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java @@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.epmet.bean.ResiExportBaseInfoData; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; import com.epmet.commons.tools.constant.*; +import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.dto.form.DictListFormDTO; import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult; import com.epmet.commons.tools.dto.result.DictListResultDTO; @@ -56,12 +57,12 @@ import com.epmet.dto.form.*; import com.epmet.dto.form.demand.UserDemandNameQueryFormDTO; import com.epmet.dto.result.*; import com.epmet.dto.result.demand.IcResiDemandDictDTO; +import com.epmet.dto.result.demand.OptionDTO; import com.epmet.entity.*; import com.epmet.excel.support.ExportResiUserItemDTO; import com.epmet.feign.*; import com.epmet.resi.partymember.feign.ResiPartyMemberOpenFeignClient; import com.epmet.service.*; -import com.epmet.dto.result.demand.OptionDTO; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -85,6 +86,8 @@ import java.sql.Timestamp; import java.text.NumberFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -129,10 +132,8 @@ public class IcResiUserServiceImpl extends BaseServiceImpl getWrapper(Map params) { String id = (String) params.get(FieldConstant.ID_HUMP); @@ -2576,6 +2579,41 @@ public class IcResiUserServiceImpl extends BaseServiceImpl o = (Future) redisUtils.get(threadKey); + if (o != null){ + //如果是自己的 则取消就行了 + //o.cancel(true); + log.info("updateIcHouseResiNumber 任务取消"); + } + }*/ + + Future submit = executorService.submit(() -> { + int pageNo = 1; + int pageSize = NumConstant.TEN_THOUSAND; + Page houseReisCountListPage = null; + String key = RedisKeys.getUpdateHouseResiNumer(customerId); + redisUtils.delete(key); + do { + houseReisCountListPage = PageHelper.startPage(pageNo++, pageSize, false) + .doSelectPage(() -> baseDao.getResiUserCountGroupHomeId(customerId)); + System.out.println(houseReisCountListPage.getResult().size()); + List result = houseReisCountListPage.getResult(); + for (HouseIcResiUserCountResultDTO o: result){ + redisUtils.leftPush(key,o); + } + + } while (houseReisCountListPage.getResult().size() == pageSize); + log.info("updateIcHouseResiNumber 执行完毕"); + //distributedLock.unLock(lock); + }); + redisUtils.set(threadKey,submit); + } + /** * 根据身份证获取居民角色(目前只有是否是志愿者) * diff --git a/epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml b/epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml index edb2455bee..84fc723e61 100644 --- a/epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml +++ b/epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml @@ -1132,4 +1132,7 @@ AND customer_id = #{customerId} AND DEL_FLAG = '0' +