Browse Source

房屋内居民 总数统计

dev
jianjun 3 years ago
parent
commit
45170c05ff
  1. 24
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  2. 46
      epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/HouseIcResiUserCountResultDTO.java
  3. 11
      epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java
  4. 103
      epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java
  5. 9
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java
  6. 48
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java
  7. 3
      epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml

24
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -777,4 +777,28 @@ public class RedisKeys {
} }
return rootPrefix.concat("lock:").concat(methodName); return rootPrefix.concat("lock:").concat(methodName);
} }
/**
* desc:获取更新 房屋内有居民数量的key
* @param customerId
* @return
*/
public static String getUpdateHouseResiNumer(String customerId) {
if (StringUtils.isBlank(customerId)){
throw new EpmetException(EpmetErrorCode.EPMET_COMMON_OPERATION_FAIL.getCode(),"参数错误");
}
return rootPrefix.concat("updateHouseResiNumber:").concat(customerId);
}
/**
* desc:获取更新 房屋内有居民数量 线程的key
* @param customerId
* @return
*/
public static String updateIcHouseResiNumberThread(String customerId) {
if (StringUtils.isBlank(customerId)){
throw new EpmetException(EpmetErrorCode.EPMET_COMMON_OPERATION_FAIL.getCode(),"参数错误");
}
return rootPrefix.concat("updateHouseResiNumber:thread:").concat(customerId);
}
} }

46
epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/HouseIcResiUserCountResultDTO.java

@ -0,0 +1,46 @@
/**
* Copyright 2018 人人开源 https://www.renren.io
* <p>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.epmet.dto.result;
import lombok.Data;
import java.io.Serializable;
/**
* 房屋内居民总数结果
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2021-10-26
*/
@Data
public class HouseIcResiUserCountResultDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 所属家庭Id
*/
private String homeId;
/**
* 房间内总人数
*/
private Integer resiNumber;
}

11
epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java

@ -363,11 +363,18 @@ public interface IcResiUserDao extends BaseDao<IcResiUserEntity> {
@Param("customerId") String customerId); @Param("customerId") String customerId);
/** /**
* @return com.epmet.entity.IcResiUserEntity
* @describe: 通过身份证号查询实体类 * @describe: 通过身份证号查询实体类
* @author wangtong * @author wangtong
* @date 2022/6/13 17:32 * @date 2022/6/13 17:32
* @params [idCard, customerId] * @params [idCard, customerId]
* @return com.epmet.entity.IcResiUserEntity
*/ */
IcResiUserEntity selectResiUserEntityByIdCard(@Param("idCard") String idCard,@Param("customerId") String customerId); IcResiUserEntity selectResiUserEntityByIdCard(@Param("idCard") String idCard, @Param("customerId") String customerId);
/**
* desc:获取客户内 每个房屋内的居民数
* @param customerId
* @return
*/
List<HouseIcResiUserCountResultDTO> getResiUserCountGroupHomeId(@Param("customerId") String customerId);
} }

103
epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java

@ -11,6 +11,7 @@ import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.SystemMessageType; import com.epmet.constant.SystemMessageType;
import com.epmet.service.IcNatService; import com.epmet.service.IcNatService;
import com.epmet.service.IcResiUserService;
import com.epmet.service.StatsResiWarnService; import com.epmet.service.StatsResiWarnService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -26,10 +27,10 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* @Description 负能平台-客户居民信息变动监听器
* @author wxz * @author wxz
* @Description 负能平台-居民信息变动监听器
* @date 2021.10.13 15:21:48 * @date 2021.10.13 15:21:48
*/ */
@Slf4j @Slf4j
public class ICWarnStatsEventListener implements MessageListenerConcurrently { public class ICWarnStatsEventListener implements MessageListenerConcurrently {
@ -37,6 +38,7 @@ public class ICWarnStatsEventListener implements MessageListenerConcurrently {
private RedisUtils redisUtils; private RedisUtils redisUtils;
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@ -57,70 +59,127 @@ public class ICWarnStatsEventListener implements MessageListenerConcurrently {
// msg即为消息体 // msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
log.info("msg is {}",msg); log.info("msg is {}", msg);
String topic = messageExt.getTopic(); String topic = messageExt.getTopic();
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【开放数据事件监听器】-居民信息变动-收到消息内容:{},操作:{}", msg, tags); logger.info("【开放数据事件监听器】-居民信息变动-收到消息内容:{},操作:{}", msg, tags);
IcResiUserAddMQMsg obj = JSON.parseObject(msg, IcResiUserAddMQMsg.class); IcResiUserAddMQMsg obj = JSON.parseObject(msg, IcResiUserAddMQMsg.class);
log.info("obj is {}",JSON.toJSONString(obj)); log.info("obj is {}", JSON.toJSONString(obj));
try {
if (!resiWarnStats(obj)||!updateIcNatResiFlag(tags, obj)||!updateIcHouseResiNumber(obj)){
}
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
private boolean resiWarnStats(IcResiUserAddMQMsg obj) {
DistributedLock distributedLock = null; DistributedLock distributedLock = null;
RLock lock = null; RLock lock = null;
try { try {
if (StringUtils.isBlank(obj.getCustomerId())) {
log.warn("resiWarnStats param is error");
return true;
}
distributedLock = SpringContextUtils.getBean(DistributedLock.class); distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()), lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS); 30L, 30L, TimeUnit.SECONDS);
//待执行方法
SpringContextUtils.getBean(StatsResiWarnService.class).resiWarn(obj.getCustomerId()); SpringContextUtils.getBean(StatsResiWarnService.class).resiWarn(obj.getCustomerId());
//根据居民Id更新 核酸检测的是否居民状态 log.debug("【居民信息变动】resiWarnStats param customerId:{}", JSON.toJSONString(obj));
updateIcNatResiFlag(tags, obj);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) { } catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试 log.error("resiWarnStats exception", e);
logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); return false;
throw e;
} finally { } finally {
if (distributedLock != null){
distributedLock.unLock(lock); distributedLock.unLock(lock);
} }
}
return true;
}
if (StringUtils.isNotBlank(pendingMsgLabel)) { /**
* desc:根据客户id更新 每个房屋内居民的总数
* @param obj
* @return
* @remark 因为使用了线程 所以结果不用关心
*/
private boolean updateIcHouseResiNumber(IcResiUserAddMQMsg obj) {
DistributedLock distributedLock = null;
RLock lock = null;
try { try {
removePendingMqMsgCache(pendingMsgLabel); if (StringUtils.isBlank(obj.getCustomerId())) {
log.warn("updateIcHouseResiNumber param is error");
return true;
}
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:update_house_resi_number:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS);
SpringContextUtils.getBean(IcResiUserService.class).updateIcHouseResiNumber(obj.getCustomerId());
log.debug("【居民信息变动】updateIcHouseResiNumber param customerId & icResiUserId:{}", JSON.toJSONString(obj));
} catch (Exception e) { } catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); log.error("updateIcHouseResiNumber exception", e);
return false;
} finally {
if (distributedLock != null){
distributedLock.unLock(lock);
} }
} }
return true;
} }
/** /**
* desc:根据居民Id更新 核酸检测的是否居民状态(只有新增和删除时调用) * desc:根据居民Id更新 核酸检测的是否居民状态(只有新增和删除时调用)
* @param tags * @param tags
* @param obj * @param obj
* @return
*/ */
private void updateIcNatResiFlag(String tags, IcResiUserAddMQMsg obj) { private boolean updateIcNatResiFlag(String tags, IcResiUserAddMQMsg obj) {
DistributedLock distributedLock = null;
RLock lock = null;
try { try {
if (StringUtils.isBlank(obj.getCustomerId())||StringUtils.isBlank(obj.getIcResiUser())){ if (StringUtils.isBlank(obj.getCustomerId()) || StringUtils.isBlank(obj.getIcResiUser())) {
log.warn("updateIcNatResiFlag param is error"); log.warn("updateIcNatResiFlag param is error");
return; return true;
} }
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:ic_nat_resi_flag:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS);
if (SystemMessageType.IC_RESI_USER_ADD.equals(tags) || SystemMessageType.IC_RESI_USER_DEL.equals(tags)) { if (SystemMessageType.IC_RESI_USER_ADD.equals(tags) || SystemMessageType.IC_RESI_USER_DEL.equals(tags)) {
Integer effectRow = SpringContextUtils.getBean(IcNatService.class).updateIsResiFlag(obj.getCustomerId(), obj.getIcResiUser()); Integer effectRow = SpringContextUtils.getBean(IcNatService.class).updateIsResiFlag(obj.getCustomerId(), obj.getIcResiUser());
log.debug("updateIcNatResiFlag effectRow:{},param customerId & icResiUserId:{}", effectRow, JSON.toJSONString(obj)); log.debug("【居民信息变动】updateIcNatResiFlag effectRow:{},param customerId & icResiUserId:{}", effectRow, JSON.toJSONString(obj));
} }
} catch (Exception e) { } catch (Exception e) {
log.error("updateIcNatResiFlag exception", e); log.error("updateIcNatResiFlag exception", e);
return false;
} finally {
if (distributedLock != null){
distributedLock.unLock(lock);
} }
} }
return true;
}
/** /**
* @description
*
* @param pendingMsgLabel * @param pendingMsgLabel
* @return * @return
* @description
* @author wxz * @author wxz
* @date 2021.10.14 16:32:32 * @date 2021.10.14 16:32:32
*/ */

9
epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java

@ -335,7 +335,7 @@ public interface IcResiUserService extends BaseService<IcResiUserEntity> {
SyncResiResDTO checkUser(String customerId, String idCard, String agencyId); SyncResiResDTO checkUser(String customerId, String idCard, String agencyId);
/** /**
* desc:条件获取居民基础信息 * desc:条件获取居民基础信息 按照房屋分组
* *
* @param formDTO * @param formDTO
* @return * @return
@ -445,4 +445,11 @@ public interface IcResiUserService extends BaseService<IcResiUserEntity> {
* @return * @return
*/ */
List<ResiVolunteerSelectDTO> queryVolunteerList(String customerId, String userId); List<ResiVolunteerSelectDTO> queryVolunteerList(String customerId, String userId);
/**
* desc:根据客户Id 更新房屋的 居民人数
* @param customerId
* @return
*/
void updateIcHouseResiNumber(String customerId);
} }

48
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java

@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.epmet.bean.ResiExportBaseInfoData; import com.epmet.bean.ResiExportBaseInfoData;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.tools.constant.*; import com.epmet.commons.tools.constant.*;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.dto.form.DictListFormDTO; import com.epmet.commons.tools.dto.form.DictListFormDTO;
import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult; import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult;
import com.epmet.commons.tools.dto.result.DictListResultDTO; import com.epmet.commons.tools.dto.result.DictListResultDTO;
@ -56,12 +57,12 @@ import com.epmet.dto.form.*;
import com.epmet.dto.form.demand.UserDemandNameQueryFormDTO; import com.epmet.dto.form.demand.UserDemandNameQueryFormDTO;
import com.epmet.dto.result.*; import com.epmet.dto.result.*;
import com.epmet.dto.result.demand.IcResiDemandDictDTO; import com.epmet.dto.result.demand.IcResiDemandDictDTO;
import com.epmet.dto.result.demand.OptionDTO;
import com.epmet.entity.*; import com.epmet.entity.*;
import com.epmet.excel.support.ExportResiUserItemDTO; import com.epmet.excel.support.ExportResiUserItemDTO;
import com.epmet.feign.*; import com.epmet.feign.*;
import com.epmet.resi.partymember.feign.ResiPartyMemberOpenFeignClient; import com.epmet.resi.partymember.feign.ResiPartyMemberOpenFeignClient;
import com.epmet.service.*; import com.epmet.service.*;
import com.epmet.dto.result.demand.OptionDTO;
import com.github.pagehelper.Page; import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
@ -85,6 +86,8 @@ import java.sql.Timestamp;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
@ -129,10 +132,8 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
private IcUserTransferRecordService icUserTransferRecordService; private IcUserTransferRecordService icUserTransferRecordService;
@Autowired @Autowired
private RedisTemplate redisTemplate; private RedisTemplate redisTemplate;
//@Resource @Autowired
//private IcNatService icNatService; private ExecutorService executorService;
//@Resource
//private IcVaccineService icVaccineService;
@Resource @Resource
private IcTripReportRecordService icTripReportRecordService; private IcTripReportRecordService icTripReportRecordService;
@Resource @Resource
@ -145,6 +146,8 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
private EpmetHeartOpenFeignClient epmetHeartOpenFeignClient; private EpmetHeartOpenFeignClient epmetHeartOpenFeignClient;
@Resource @Resource
private UserBaseInfoDao userBaseInfoDao; private UserBaseInfoDao userBaseInfoDao;
@Autowired
private DistributedLock distributedLock;
private QueryWrapper<IcResiUserEntity> getWrapper(Map<String, Object> params) { private QueryWrapper<IcResiUserEntity> getWrapper(Map<String, Object> params) {
String id = (String) params.get(FieldConstant.ID_HUMP); String id = (String) params.get(FieldConstant.ID_HUMP);
@ -2576,6 +2579,41 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
@Override
public void updateIcHouseResiNumber(String customerId) {
String threadKey = RedisKeys.updateIcHouseResiNumberThread(customerId);
/*RLock lock = distributedLock.getLock(threadKey);
if (lock!= null && lock.isLocked()){
Future<Integer> o = (Future<Integer>) redisUtils.get(threadKey);
if (o != null){
//如果是自己的 则取消就行了
//o.cancel(true);
log.info("updateIcHouseResiNumber 任务取消");
}
}*/
Future submit = executorService.submit(() -> {
int pageNo = 1;
int pageSize = NumConstant.TEN_THOUSAND;
Page<HouseIcResiUserCountResultDTO> houseReisCountListPage = null;
String key = RedisKeys.getUpdateHouseResiNumer(customerId);
redisUtils.delete(key);
do {
houseReisCountListPage = PageHelper.startPage(pageNo++, pageSize, false)
.doSelectPage(() -> baseDao.getResiUserCountGroupHomeId(customerId));
System.out.println(houseReisCountListPage.getResult().size());
List<HouseIcResiUserCountResultDTO> result = houseReisCountListPage.getResult();
for (HouseIcResiUserCountResultDTO o: result){
redisUtils.leftPush(key,o);
}
} while (houseReisCountListPage.getResult().size() == pageSize);
log.info("updateIcHouseResiNumber 执行完毕");
//distributedLock.unLock(lock);
});
redisUtils.set(threadKey,submit);
}
/** /**
* 根据身份证获取居民角色目前只有是否是志愿者 * 根据身份证获取居民角色目前只有是否是志愿者
* *

3
epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml

@ -1132,4 +1132,7 @@
AND customer_id = #{customerId} AND customer_id = #{customerId}
AND DEL_FLAG = '0' AND DEL_FLAG = '0'
</select> </select>
<select id="getResiUserCountGroupHomeId" resultType="com.epmet.dto.result.HouseIcResiUserCountResultDTO">
select HOME_ID,count(1) resiNumber from ic_resi_user WHERE CUSTOMER_ID = #{customerId} and DEL_FLAG = '0' AND `status` = '0' group by HOME_ID
</select>
</mapper> </mapper>

Loading…
Cancel
Save