Browse Source

Merge remote-tracking branch 'origin/dev_5big_coverage' into dev_5big_coverage

dev
wangxianzhang 3 years ago
parent
commit
78f2fd1471
  1. 24
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  2. 46
      epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/HouseIcResiUserCountResultDTO.java
  3. 9
      epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java
  4. 95
      epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java
  5. 9
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java
  6. 48
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java
  7. 3
      epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml

24
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -777,4 +777,28 @@ public class RedisKeys {
}
return rootPrefix.concat("lock:").concat(methodName);
}
/**
* desc:获取更新 房屋内有居民数量的key
* @param customerId
* @return
*/
public static String getUpdateHouseResiNumer(String customerId) {
if (StringUtils.isBlank(customerId)){
throw new EpmetException(EpmetErrorCode.EPMET_COMMON_OPERATION_FAIL.getCode(),"参数错误");
}
return rootPrefix.concat("updateHouseResiNumber:").concat(customerId);
}
/**
* desc:获取更新 房屋内有居民数量 线程的key
* @param customerId
* @return
*/
public static String updateIcHouseResiNumberThread(String customerId) {
if (StringUtils.isBlank(customerId)){
throw new EpmetException(EpmetErrorCode.EPMET_COMMON_OPERATION_FAIL.getCode(),"参数错误");
}
return rootPrefix.concat("updateHouseResiNumber:thread:").concat(customerId);
}
}

46
epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/HouseIcResiUserCountResultDTO.java

@ -0,0 +1,46 @@
/**
* Copyright 2018 人人开源 https://www.renren.io
* <p>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* <p>
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* <p>
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.epmet.dto.result;
import lombok.Data;
import java.io.Serializable;
/**
* 房屋内居民总数结果
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2021-10-26
*/
@Data
public class HouseIcResiUserCountResultDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 所属家庭Id
*/
private String homeId;
/**
* 房间内总人数
*/
private Integer resiNumber;
}

9
epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcResiUserDao.java

@ -363,11 +363,18 @@ public interface IcResiUserDao extends BaseDao<IcResiUserEntity> {
@Param("customerId") String customerId);
/**
* @return com.epmet.entity.IcResiUserEntity
* @describe: 通过身份证号查询实体类
* @author wangtong
* @date 2022/6/13 17:32
* @params [idCard, customerId]
* @return com.epmet.entity.IcResiUserEntity
*/
IcResiUserEntity selectResiUserEntityByIdCard(@Param("idCard") String idCard, @Param("customerId") String customerId);
/**
* desc:获取客户内 每个房屋内的居民数
* @param customerId
* @return
*/
List<HouseIcResiUserCountResultDTO> getResiUserCountGroupHomeId(@Param("customerId") String customerId);
}

95
epmet-user/epmet-user-server/src/main/java/com/epmet/mq/listener/ICWarnStatsEventListener.java

@ -11,6 +11,7 @@ import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.SystemMessageType;
import com.epmet.service.IcNatService;
import com.epmet.service.IcResiUserService;
import com.epmet.service.StatsResiWarnService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
@ -26,8 +27,8 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @Description 负能平台-客户居民信息变动监听器
* @author wxz
* @Description 负能平台-居民信息变动监听器
* @date 2021.10.13 15:21:48
*/
@Slf4j
@ -37,6 +38,7 @@ public class ICWarnStatsEventListener implements MessageListenerConcurrently {
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@ -65,62 +67,119 @@ public class ICWarnStatsEventListener implements MessageListenerConcurrently {
logger.info("【开放数据事件监听器】-居民信息变动-收到消息内容:{},操作:{}", msg, tags);
IcResiUserAddMQMsg obj = JSON.parseObject(msg, IcResiUserAddMQMsg.class);
log.info("obj is {}", JSON.toJSONString(obj));
try {
if (!resiWarnStats(obj)||!updateIcNatResiFlag(tags, obj)||!updateIcHouseResiNumber(obj)){
}
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
private boolean resiWarnStats(IcResiUserAddMQMsg obj) {
DistributedLock distributedLock = null;
RLock lock = null;
try {
if (StringUtils.isBlank(obj.getCustomerId())) {
log.warn("resiWarnStats param is error");
return true;
}
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS);
//待执行方法
SpringContextUtils.getBean(StatsResiWarnService.class).resiWarn(obj.getCustomerId());
//根据居民Id更新 核酸检测的是否居民状态
updateIcNatResiFlag(tags, obj);
log.debug("【居民信息变动】resiWarnStats param customerId:{}", JSON.toJSONString(obj));
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-客户居民信息变动MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
log.error("resiWarnStats exception", e);
return false;
} finally {
if (distributedLock != null){
distributedLock.unLock(lock);
}
}
return true;
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
/**
* desc:根据客户id更新 每个房屋内居民的总数
* @param obj
* @return
* @remark 因为使用了线程 所以结果不用关心
*/
private boolean updateIcHouseResiNumber(IcResiUserAddMQMsg obj) {
DistributedLock distributedLock = null;
RLock lock = null;
try {
removePendingMqMsgCache(pendingMsgLabel);
if (StringUtils.isBlank(obj.getCustomerId())) {
log.warn("updateIcHouseResiNumber param is error");
return true;
}
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:update_house_resi_number:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS);
SpringContextUtils.getBean(IcResiUserService.class).updateIcHouseResiNumber(obj.getCustomerId());
log.debug("【居民信息变动】updateIcHouseResiNumber param customerId & icResiUserId:{}", JSON.toJSONString(obj));
} catch (Exception e) {
logger.error("【开放数据事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
log.error("updateIcHouseResiNumber exception", e);
return false;
} finally {
if (distributedLock != null){
distributedLock.unLock(lock);
}
}
return true;
}
/**
* desc:根据居民Id更新 核酸检测的是否居民状态(只有新增和删除时调用)
* @param tags
* @param obj
* @return
*/
private void updateIcNatResiFlag(String tags, IcResiUserAddMQMsg obj) {
private boolean updateIcNatResiFlag(String tags, IcResiUserAddMQMsg obj) {
DistributedLock distributedLock = null;
RLock lock = null;
try {
if (StringUtils.isBlank(obj.getCustomerId()) || StringUtils.isBlank(obj.getIcResiUser())) {
log.warn("updateIcNatResiFlag param is error");
return;
return true;
}
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:ic_nat_resi_flag:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS);
if (SystemMessageType.IC_RESI_USER_ADD.equals(tags) || SystemMessageType.IC_RESI_USER_DEL.equals(tags)) {
Integer effectRow = SpringContextUtils.getBean(IcNatService.class).updateIsResiFlag(obj.getCustomerId(), obj.getIcResiUser());
log.debug("updateIcNatResiFlag effectRow:{},param customerId & icResiUserId:{}", effectRow, JSON.toJSONString(obj));
log.debug("【居民信息变动】updateIcNatResiFlag effectRow:{},param customerId & icResiUserId:{}", effectRow, JSON.toJSONString(obj));
}
} catch (Exception e) {
log.error("updateIcNatResiFlag exception", e);
return false;
} finally {
if (distributedLock != null){
distributedLock.unLock(lock);
}
}
return true;
}
/**
* @description
*
* @param pendingMsgLabel
* @return
* @description
* @author wxz
* @date 2021.10.14 16:32:32
*/

9
epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcResiUserService.java

@ -335,7 +335,7 @@ public interface IcResiUserService extends BaseService<IcResiUserEntity> {
SyncResiResDTO checkUser(String customerId, String idCard, String agencyId);
/**
* desc:条件获取居民基础信息
* desc:条件获取居民基础信息 按照房屋分组
*
* @param formDTO
* @return
@ -445,4 +445,11 @@ public interface IcResiUserService extends BaseService<IcResiUserEntity> {
* @return
*/
List<ResiVolunteerSelectDTO> queryVolunteerList(String customerId, String userId);
/**
* desc:根据客户Id 更新房屋的 居民人数
* @param customerId
* @return
*/
void updateIcHouseResiNumber(String customerId);
}

48
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcResiUserServiceImpl.java

@ -24,6 +24,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.epmet.bean.ResiExportBaseInfoData;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.tools.constant.*;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.dto.form.DictListFormDTO;
import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult;
import com.epmet.commons.tools.dto.result.DictListResultDTO;
@ -56,12 +57,12 @@ import com.epmet.dto.form.*;
import com.epmet.dto.form.demand.UserDemandNameQueryFormDTO;
import com.epmet.dto.result.*;
import com.epmet.dto.result.demand.IcResiDemandDictDTO;
import com.epmet.dto.result.demand.OptionDTO;
import com.epmet.entity.*;
import com.epmet.excel.support.ExportResiUserItemDTO;
import com.epmet.feign.*;
import com.epmet.resi.partymember.feign.ResiPartyMemberOpenFeignClient;
import com.epmet.service.*;
import com.epmet.dto.result.demand.OptionDTO;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
@ -85,6 +86,8 @@ import java.sql.Timestamp;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@ -129,10 +132,8 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
private IcUserTransferRecordService icUserTransferRecordService;
@Autowired
private RedisTemplate redisTemplate;
//@Resource
//private IcNatService icNatService;
//@Resource
//private IcVaccineService icVaccineService;
@Autowired
private ExecutorService executorService;
@Resource
private IcTripReportRecordService icTripReportRecordService;
@Resource
@ -145,6 +146,8 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
private EpmetHeartOpenFeignClient epmetHeartOpenFeignClient;
@Resource
private UserBaseInfoDao userBaseInfoDao;
@Autowired
private DistributedLock distributedLock;
private QueryWrapper<IcResiUserEntity> getWrapper(Map<String, Object> params) {
String id = (String) params.get(FieldConstant.ID_HUMP);
@ -2576,6 +2579,41 @@ public class IcResiUserServiceImpl extends BaseServiceImpl<IcResiUserDao, IcResi
}).collect(Collectors.toList());
}
@Override
public void updateIcHouseResiNumber(String customerId) {
String threadKey = RedisKeys.updateIcHouseResiNumberThread(customerId);
/*RLock lock = distributedLock.getLock(threadKey);
if (lock!= null && lock.isLocked()){
Future<Integer> o = (Future<Integer>) redisUtils.get(threadKey);
if (o != null){
//如果是自己的 则取消就行了
//o.cancel(true);
log.info("updateIcHouseResiNumber 任务取消");
}
}*/
Future submit = executorService.submit(() -> {
int pageNo = 1;
int pageSize = NumConstant.TEN_THOUSAND;
Page<HouseIcResiUserCountResultDTO> houseReisCountListPage = null;
String key = RedisKeys.getUpdateHouseResiNumer(customerId);
redisUtils.delete(key);
do {
houseReisCountListPage = PageHelper.startPage(pageNo++, pageSize, false)
.doSelectPage(() -> baseDao.getResiUserCountGroupHomeId(customerId));
System.out.println(houseReisCountListPage.getResult().size());
List<HouseIcResiUserCountResultDTO> result = houseReisCountListPage.getResult();
for (HouseIcResiUserCountResultDTO o: result){
redisUtils.leftPush(key,o);
}
} while (houseReisCountListPage.getResult().size() == pageSize);
log.info("updateIcHouseResiNumber 执行完毕");
//distributedLock.unLock(lock);
});
redisUtils.set(threadKey,submit);
}
/**
* 根据身份证获取居民角色目前只有是否是志愿者
*

3
epmet-user/epmet-user-server/src/main/resources/mapper/IcResiUserDao.xml

@ -1132,4 +1132,7 @@
AND customer_id = #{customerId}
AND DEL_FLAG = '0'
</select>
<select id="getResiUserCountGroupHomeId" resultType="com.epmet.dto.result.HouseIcResiUserCountResultDTO">
select HOME_ID,count(1) resiNumber from ic_resi_user WHERE CUSTOMER_ID = #{customerId} and DEL_FLAG = '0' AND `status` = '0' group by HOME_ID
</select>
</mapper>

Loading…
Cancel
Save