@ -1,28 +1,37 @@
package com.epmet.service.impl ;
import java.util.Date ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSONObject ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper ;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper ;
import com.epmet.commons.mybatis.entity.BaseEpmetEntity ;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl ;
import com.epmet.commons.tools.constant.NumConstant ;
import com.epmet.commons.tools.constant.StrConstant ;
import com.epmet.commons.tools.distributedlock.DistributedLock ;
import com.epmet.commons.tools.dto.form.PageFormDTO ;
import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult ;
import com.epmet.commons.tools.dto.result.YtDataSyncResDTO ;
import com.epmet.commons.tools.dto.result.YtHscyResDTO ;
import com.epmet.commons.tools.dto.result.YtHsjcResDTO ;
import com.epmet.commons.tools.enums.GenderEnum ;
import com.epmet.commons.tools.exception.EpmetErrorCode ;
import com.epmet.commons.tools.exception.EpmetException ;
import com.epmet.commons.tools.exception.ExceptionUtils ;
import com.epmet.commons.tools.page.PageData ;
import com.epmet.commons.tools.redis.common.CustomerOrgRedis ;
import com.epmet.commons.tools.redis.common.CustomerStaffRedis ;
import com.epmet.commons.tools.redis.common.bean.AgencyInfoCache ;
import com.epmet.commons.tools.redis.common.bean.GridInfoCache ;
import com.epmet.commons.tools.security.dto.TokenDto ;
import com.epmet.commons.tools.utils.ConvertUtils ;
import com.epmet.commons.tools.utils.DateUtils ;
import com.epmet.commons.tools.utils.YtHsResUtils ;
import com.epmet.constant.EpidemicConstant ;
import com.epmet.dao.DataSyncConfigDao ;
import com.epmet.dao.IcNatDao ;
import com.epmet.dao.IcSyncJobDao ;
import com.epmet.dto.DataSyncConfigDTO ;
import com.epmet.dto.DataSyncRecordDeathDTO ;
import com.epmet.dto.DataSyncRecordDisabilityDTO ;
@ -39,8 +48,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils ;
import org.apache.commons.collections4.ListUtils ;
import org.apache.commons.lang3.StringUtils ;
import org.redisson.api.RLock ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.jdbc.core.JdbcTemplate ;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate ;
import org.springframework.stereotype.Service ;
import org.springframework.transaction.annotation.Transactional ;
@ -48,9 +59,12 @@ import javax.annotation.Resource;
import java.util.* ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.TimeUnit ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
import static com.epmet.constant.EpidemicConstant.* ;
/ * *
* 数据更新配置表
*
@ -75,6 +89,15 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
private DataSyncRecordDisabilityService dataSyncRecordDisabilityService ;
@Autowired
private ExecutorService executorService ;
@Autowired
private IcSyncJobService icSyncJobService ;
@Autowired
private IcSyncJobDao icSyncJobDao ;
@Autowired
private DistributedLock distributedLock ;
@Resource ( name = "yantaiNamedParamLantuJdbcTemplate" )
private NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate ;
@Resource ( name = "yantaiLantuJdbcTemplate" )
private JdbcTemplate yantaiJdbcTemplate ;
@ -130,7 +153,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
public PageData list ( TokenDto tokenDto , PageFormDTO formDTO ) {
PageData < DataSyncConfigDTO > result = new PageData < > ( new ArrayList < > ( ) , NumConstant . ZERO_L ) ;
PageInfo < DataSyncConfigDTO > pageInfo = PageHelper . startPage ( formDTO . getPageNo ( ) , formDTO . getPageSize ( ) )
. doSelectPageInfo ( ( ) - > baseDao . list ( tokenDto . getCustomerId ( ) , null ) ) ;
. doSelectPageInfo ( ( ) - > baseDao . list ( tokenDto . getCustomerId ( ) , null , null ) ) ;
if ( CollectionUtils . isNotEmpty ( pageInfo . getList ( ) ) ) {
result . setList ( pageInfo . getList ( ) ) ;
result . setTotal ( Integer . parseInt ( String . valueOf ( pageInfo . getTotal ( ) ) ) ) ;
@ -185,7 +208,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* /
@Override
public void dataSyncForYanTaiTask ( DataSyncTaskParam formDTO ) {
List < DataSyncConfigDTO > allConfigList = baseDao . list ( formDTO . getCustomerId ( ) , "open" ) ;
List < DataSyncConfigDTO > allConfigList = baseDao . list ( formDTO . getCustomerId ( ) , "open" , null ) ;
if ( CollectionUtils . isEmpty ( allConfigList ) ) {
return ;
}
@ -221,6 +244,259 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
}
}
/ * *
* @Description 死亡信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022 / 11 / 8 09 : 01
* /
@Override
public void deathInfoScanTask ( DataSyncTaskParam formDTO ) {
List < DataSyncConfigDTO > configData = getConfigData ( null , EpidemicConstant . DATA_CODE_DEATH ) ;
if ( CollectionUtils . isEmpty ( configData ) ) {
log . warn ( "deathInfoScanTask not exists config data,customer is " + formDTO . getCustomerId ( ) ) ;
return ;
}
long count = configData . stream ( ) . filter ( o - > CollectionUtils . isNotEmpty ( o . getScopeList ( ) ) ) . count ( ) ;
if ( count < 1 ) {
log . warn ( "deathInfoScanTask scopeList is null" ) ;
return ;
}
int pageNo = NumConstant . ONE ;
int pageSize = NumConstant . ONE_THOUSAND ;
List < NatUserInfoResultDTO > dbResiList = null ;
do {
for ( DataSyncConfigDTO config : configData ) {
// 设置查询数据范围
formDTO . setOrgList ( config . getScopeList ( ) ) ;
DataSyncEnum anEnum = DataSyncEnum . getEnum ( config . getDataCode ( ) ) ;
dbResiList = getNatUserInfoFromDb ( formDTO , pageNo , pageSize ) ;
if ( CollectionUtils . isEmpty ( dbResiList ) ) {
continue ;
}
switch ( anEnum ) {
case SI_WANG :
try {
//查询正常状态的居民
siWang ( dbResiList ) ;
log . info ( "======siWang信息拉取结束======" ) ;
} catch ( Exception e ) {
log . error ( "death thread execute exception" , e ) ;
}
default :
log . warn ( "没有要处理的数据" ) ;
}
}
pageNo + + ;
} while ( dbResiList ! = null & & dbResiList . size ( ) = = pageSize ) ;
}
/ * *
* @Description 残疾信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022 / 11 / 8 09 : 01
* /
@Override
public void disabilityInfoScanTask ( DataSyncTaskParam formDTO ) {
List < DataSyncConfigDTO > configData = getConfigData ( null , EpidemicConstant . DATA_CODE_DISABILITY ) ;
if ( CollectionUtils . isEmpty ( configData ) ) {
log . warn ( "disabilityInfoScanTask not exists config data,customer is " + formDTO . getCustomerId ( ) ) ;
return ;
}
long count = configData . stream ( ) . filter ( o - > CollectionUtils . isNotEmpty ( o . getScopeList ( ) ) ) . count ( ) ;
if ( count < 1 ) {
log . warn ( "disabilityInfoScanTask scopeList is null" ) ;
return ;
}
int pageNo = NumConstant . ONE ;
int pageSize = NumConstant . ONE_THOUSAND ;
List < NatUserInfoResultDTO > dbResiList = null ;
do {
for ( DataSyncConfigDTO config : configData ) {
// 设置查询数据范围
formDTO . setOrgList ( config . getScopeList ( ) ) ;
formDTO . setCategoryColumn ( "IS_CJ" ) ;
DataSyncEnum anEnum = DataSyncEnum . getEnum ( config . getDataCode ( ) ) ;
dbResiList = getNatUserInfoFromDb ( formDTO , pageNo , pageSize ) ;
if ( CollectionUtils . isEmpty ( dbResiList ) ) {
continue ;
}
switch ( anEnum ) {
case CAN_JI :
try {
//查询正常状态的居民
canJi ( dbResiList ) ;
log . info ( "======canJi信息拉取结束======" ) ;
} catch ( Exception e ) {
log . error ( "disability thread execute exception" , e ) ;
}
break ;
default :
log . warn ( "没有要处理的数据" ) ;
}
}
pageNo + + ;
} while ( dbResiList ! = null & & dbResiList . size ( ) = = pageSize ) ;
}
/ * *
* @Description 核酸检测信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022 / 11 / 8 10 : 37
* /
@Override
public void natInfoScanTask ( DataSyncTaskParam formDTO ) {
List < DataSyncConfigDTO > configData = new ArrayList < > ( ) ;
if ( StringUtils . isBlank ( formDTO . getAgencyId ( ) ) ) {
configData = getConfigData ( null , EpidemicConstant . DATA_CODE_NAT ) ;
if ( CollectionUtils . isEmpty ( configData ) ) {
log . warn ( "natInfoScanTask not exists config data,customer is " + formDTO . getCustomerId ( ) ) ;
return ;
}
long count = configData . stream ( ) . filter ( o - > CollectionUtils . isNotEmpty ( o . getScopeList ( ) ) ) . count ( ) ;
if ( count < 1 ) {
log . warn ( "natInfoScanTask scopeList is null" ) ;
return ;
}
}
int pageNo = NumConstant . ONE ;
int pageSize = NumConstant . ONE_THOUSAND ;
List < NatUserInfoResultDTO > dbResiList = null ;
do {
if ( StringUtils . isBlank ( formDTO . getAgencyId ( ) ) ) {
for ( DataSyncConfigDTO config : configData ) {
// 设置查询数据范围
formDTO . setOrgList ( config . getScopeList ( ) ) ;
DataSyncEnum anEnum = DataSyncEnum . getEnum ( config . getDataCode ( ) ) ;
dbResiList = getNatUserInfoFromDb ( formDTO , pageNo , pageSize ) ;
if ( CollectionUtils . isEmpty ( dbResiList ) ) {
continue ;
}
switch ( anEnum ) {
case HE_SUAN :
try {
//查询正常状态的居民
yantaiHsjcByDbView ( dbResiList , config . getCustomerId ( ) , formDTO . getIsSync ( ) ) ;
log . info ( "======核酸检测信息拉取结束======" ) ;
} catch ( Exception e ) {
log . error ( "nat thread execute exception" , e ) ;
}
break ;
default :
log . warn ( "没有要处理的数据" ) ;
}
}
} else {
dbResiList = getNatUserInfoFromDb ( formDTO , pageNo , pageSize ) ;
if ( CollectionUtils . isEmpty ( dbResiList ) ) {
return ;
}
yantaiHsjcByDbView ( dbResiList , dbResiList . get ( NumConstant . ZERO ) . getCustomerId ( ) , formDTO . getIsSync ( ) ) ;
}
pageNo + + ;
} while ( dbResiList ! = null & & dbResiList . size ( ) = = pageSize ) ;
}
/ * *
* 提交同步任务 , 放到任务池
* @author wxz
* @date 2022 / 11 / 8 下午5 : 32
* @param formDTO
* /
@Override
public void natInfoSyncButton ( DataSyncTaskParam formDTO ) {
CustomerStaffInfoCacheResult staffInfo = CustomerStaffRedis . getStaffInfo ( formDTO . getCustomerId ( ) , formDTO . getStaffId ( ) ) ;
if ( null = = staffInfo ) {
throw new EpmetException ( "未查询到工作人员信息:" + formDTO . getStaffId ( ) ) ;
}
AgencyInfoCache agencyInfo = CustomerOrgRedis . getAgencyInfo ( formDTO . getAgencyId ( ) ) ;
if ( null = = agencyInfo ) {
throw new EpmetException ( "未查询到组织信息:" + formDTO . getAgencyId ( ) ) ;
}
// 查询该组织是否存在等待中或者进行中的任务
LambdaQueryWrapper < IcSyncJobEntity > qw = new LambdaQueryWrapper < > ( ) ;
qw . eq ( IcSyncJobEntity : : getOrgId , formDTO . getAgencyId ( ) )
. in ( IcSyncJobEntity : : getOperationStatus , OPERATION_STATUS_WAITING , OPERATION_STATUS_PROCESSING ) ;
List < IcSyncJobEntity > icSyncJobEntities = icSyncJobDao . selectList ( qw ) ;
// 当前组织下存在同步任务
if ( CollectionUtils . isNotEmpty ( icSyncJobEntities ) ) {
throw new EpmetException ( EpmetErrorCode . EXIST_SYNC_JOB_ERROR . getCode ( ) ) ;
}
// 不存在新增一条记录
IcSyncJobEntity e = new IcSyncJobEntity ( ) ;
e . setCustomerId ( formDTO . getCustomerId ( ) ) ;
e . setOrgId ( formDTO . getAgencyId ( ) ) ;
e . setPid ( agencyInfo . getPid ( ) ) ;
e . setOrgIdPath ( StringUtils . isBlank ( agencyInfo . getPids ( ) ) ? agencyInfo . getId ( ) : agencyInfo . getPids ( ) + ":" + agencyInfo . getId ( ) ) ;
e . setJobType ( JOB_TYPE_NAT ) ;
e . setOperatorId ( formDTO . getStaffId ( ) ) ;
e . setOperationStatus ( OPERATION_STATUS_WAITING ) ;
insertSync ( e ) ;
//List<IcSyncJobEntity> waitList;
//do {
// LambdaQueryWrapper<IcSyncJobEntity> qw3 = new LambdaQueryWrapper<>();
// qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId())
// .eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING)
// .eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT)
// .orderByAsc(BaseEpmetEntity::getCreatedTime);
// waitList = icSyncJobDao.selectList(qw3);
// if (CollectionUtils.isNotEmpty(waitList)){
// for (IcSyncJobEntity entity : waitList) {
// RLock lock = null;
// try {
// lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
// updateSync(entity.getId(),OPERATION_STATUS_PROCESSING);
// }catch (Exception ex){
// log.error(ex.getMessage());
// throw new EpmetException(ex.getMessage());
// }finally {
// lock.unlock();
// }
// formDTO.setAgencyId(entity.getOrgId());
// try {
// natInfoScanTask(formDTO);
// }catch (Exception ee){
// log.error(ee.getMessage());
// throw new EpmetException(ee.getMessage());
// }finally {
// updateSync(entity.getId(),OPERATION_STATUS_FINISH);
// }
// }
// }
//}while (CollectionUtils.isNotEmpty(waitList));
}
@Transactional ( rollbackFor = Exception . class )
public void updateSync ( String id , String status ) {
LambdaUpdateWrapper < IcSyncJobEntity > qwUpdate = new LambdaUpdateWrapper < > ( ) ;
qwUpdate . eq ( BaseEpmetEntity : : getId , id )
. set ( IcSyncJobEntity : : getOperationStatus , status )
. set ( BaseEpmetEntity : : getUpdatedTime , new Date ( ) ) ;
icSyncJobDao . update ( null , qwUpdate ) ;
}
@Transactional ( rollbackFor = Exception . class )
public void insertSync ( IcSyncJobEntity e ) {
icSyncJobService . insert ( e ) ;
}
/ * *
* @Description 配置信息查询
* @param customerId
* @param dataCode
* @Author zxc
* @Date 2022 / 11 / 8 10 : 41
* /
public List < DataSyncConfigDTO > getConfigData ( String customerId , String dataCode ) {
List < DataSyncConfigDTO > allConfigList = baseDao . list ( customerId , "open" , dataCode ) ;
return CollectionUtils . isEmpty ( allConfigList ) ? new ArrayList < > ( ) : allConfigList ;
}
private void dataSyncYanTaiParallel ( List < DataSyncConfigDTO > configList , DataSyncTaskParam formDTO ) {
if ( CollectionUtils . isEmpty ( configList ) ) {
log . warn ( "dataSyncYanTaiParallel configList is null" ) ;
@ -255,7 +531,8 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
executorService . submit ( ( ) - > {
try {
//查询正常状态的居民
hsjc ( finalDbResiList , config . getCustomerId ( ) , formDTO . getIsSync ( ) ) ;
//hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync());
yantaiHsjcByDbView ( finalDbResiList , config . getCustomerId ( ) , formDTO . getIsSync ( ) ) ;
log . info ( "======核酸检测信息拉取结束======" ) ;
} catch ( Exception e ) {
log . error ( "hsjc thread execute exception" , e ) ;
@ -421,7 +698,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* @param pageSize
* @return
* /
private List < NatUserInfoResultDTO > getNatUserInfoFromDb ( DataSyncTaskParam formDTO , int pageNo , int pageSize ) {
public List < NatUserInfoResultDTO > getNatUserInfoFromDb ( DataSyncTaskParam formDTO , int pageNo , int pageSize ) {
//根据 组织 分页获取 居民数据
PageInfo < NatUserInfoResultDTO > pageInfo = PageHelper . startPage ( pageNo , pageSize , false )
. doSelectPageInfo ( ( ) - > baseDao . getIdCardsByScope ( formDTO ) ) ;
@ -629,10 +906,13 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* 数据库采样时间 + idCard + userId 存在的 做更新
* 数据库采样时间 + idCard + userId 不存在的 新增
* /
entities . forEach ( e - > existInfo . stream ( ) . filter ( i - > i . getUserId ( ) . equals ( e . getUserId ( ) ) & & i . getIdCard ( ) . equals ( e . getIdCard ( ) ) ) . forEach ( i - > {
e . setExistStatus ( true ) ;
e . setId ( i . getId ( ) ) ;
} ) ) ;
entities . forEach ( e - > existInfo . stream ( )
. filter ( i - > i . getUserId ( ) . equals ( e . getUserId ( ) ) & & i . getIdCard ( ) . equals ( e . getIdCard ( ) ) & & i . getSampleTime ( ) . equals ( e . getSampleTime ( ) ) )
. forEach ( i - > {
e . setExistStatus ( true ) ;
e . setId ( i . getId ( ) ) ;
} ) ) ;
Map < Boolean , List < IcNatEntity > > groupByStatus = entities . stream ( ) . collect ( Collectors . groupingBy ( IcNatEntity : : getExistStatus ) ) ;
if ( CollectionUtils . isNotEmpty ( groupByStatus . get ( false ) ) ) {
for ( List < IcNatEntity > icNatEntities : ListUtils . partition ( groupByStatus . get ( false ) , NumConstant . FIVE_HUNDRED ) ) {
@ -786,35 +1066,43 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
public void yantaiHsjcByDbView ( List < NatUserInfoResultDTO > resiInfos , String customerId , String isSync ) {
List < List < NatUserInfoResultDTO > > resiInfobatchs = ListUtils . partition ( resiInfos , 50 ) ;
for ( List < NatUserInfoResultDTO > resibatch : resiInfobatchs ) {
// 50个一批,来处理他们的核酸信息,太多怕给数据库查崩了。
yantaiHsjcByDbViewBatch ( resibatch , customerId , isSync ) ;
// n个一批,来处理他们的核酸信息,太多怕给数据库查崩了。
try {
yantaiHsjcByDbViewPartition ( resibatch , customerId , isSync ) ;
} catch ( Exception e ) {
String errorMsg = ExceptionUtils . getErrorStackTrace ( e ) ;
log . error ( "【更新核酸检测信息(from 兰图)】失败,信息:{}" , errorMsg ) ;
}
}
}
/ * *
* 50个一批 , 来处理他们的核酸信息 , 太多怕给数据库查崩了 。
* n 个一批, 来处理他们的核酸信息 , 太多怕给数据库查崩了 。
* @param resiInfos
* @param customerId
* @param isSync
* /
public void yantaiHsjcByDbViewBatch ( List < NatUserInfoResultDTO > resiInfos , String customerId , String isSync ) {
public void yantaiHsjcByDbViewPartition ( List < NatUserInfoResultDTO > resiInfos , String customerId , String isSync ) {
// 将居民信息转化为<idCard,resiInfo>的map
Map < String , NatUserInfoResultDTO > idCardAndResiInfo = resiInfos . stream ( ) . collect ( Collectors . toMap ( resi - > resi . getIdCard ( ) , Function . identity ( ) ) ) ;
Set < String > idCards = idCardAndResiInfo . keySet ( ) ; //resiInfos.stream().map(resi -> resi.getIdCard()).collect(Collectors.toList());
// String idCardsStr = "''" + String.join("','", idCards) + "''";
List < String > idCards = new ArrayList < > ( idCardAndResiInfo . keySet ( ) ) ;
// 1.获取核酸采样信息
String sql = "select id, name,card_no, create_time, realname from hscyxxb where card_no in (:idCards) order by create_time desc" ;
HashMap < String , Object > args = new HashMap < > ( ) ;
args . put ( "idCards" , idCards ) ;
Map < String , Object > args = new HashMap < > ( ) ;
args . put ( "idcards" , idCards ) ;
//log.info("【更新核酸检测信息(from 兰图)】本批次身份证号为:{}", String.join(",", idCards));
// 2.=====================核酸采样=========================
// 这一批居民的核酸采样列表
List < Map < String , Object > > hscyList = yantaiJdbcTemplate . queryForList ( sql , args ) ;
List < Map < String , Object > > hscyList = yantaiNamedParamLantuJdbcTemplate . queryForList (
"select id, name,card_no, create_time, realname from hscyxxb where card_no in (:idcards)" , args ) ;
if ( CollectionUtils . isNotEmpty ( hscyList ) ) {
List < IcNatEntity > entities = new ArrayList < > ( ) ;
hscyList . forEach ( sample Info - > {
hscyList . forEach ( resiHscy Info - > {
// 从视图中获取到的核酸采样相关信息
String name = ( String ) sample Info. get ( "name" ) ;
String cardNo = ( String ) sample Info. get ( "card_no" ) ;
Date createTime = ( Date ) sample Info. get ( "create_time" ) ;
String name = ( String ) resiHscy Info. get ( "name" ) ;
String cardNo = ( String ) resiHscy Info. get ( "card_no" ) ;
Date createTime = ( Date ) resiHscy Info. get ( "create_time" ) ;
// 本地数据库中,居民信息
NatUserInfoResultDTO currentResiInfo = idCardAndResiInfo . get ( cardNo ) ;
@ -828,7 +1116,6 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
e . setName ( StringUtils . isNotBlank ( name ) ? name : "" ) ;
e . setIdCard ( StringUtils . isNotBlank ( cardNo ) ? cardNo : "" ) ;
e . setSampleTime ( createTime ) ;
// e.setSampleTime(DateUtils.parseDate(createTime, DateUtils.DATE_TIME_PATTERN));
e . setAgencyId ( currentResiInfo . getAgencyId ( ) ) ;
e . setPids ( currentResiInfo . getPids ( ) ) ;
e . setAttachmentType ( "" ) ;
@ -840,5 +1127,85 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
sampleAndNat ( existSampleInfo , entities , NumConstant . ONE_STR , customerId , isSync ) ;
}
}
// 2.=====================核酸采样=========================
// 这一批居民的核酸采样列表
List < Map < String , Object > > hsjcResultList = yantaiNamedParamLantuJdbcTemplate . queryForList (
"select name, telephone, card_no, test_time, SAMPLE_TIME, SAMPLE_RESULT_PCR, SAMPLING_ORG_PCR from hsjcxxb where card_no in (:idcards)" , args ) ;
if ( CollectionUtils . isNotEmpty ( hsjcResultList ) ) {
List < IcNatEntity > entities = new ArrayList < > ( ) ;
hsjcResultList . forEach ( natResult - > {
// 从视图中获取到的核酸采样相关信息
String name = ( String ) natResult . get ( "name" ) ;
String cardNo = ( String ) natResult . get ( "card_no" ) ;
Date testTime = ( Date ) natResult . get ( "test_time" ) ;
String telephone = ( String ) natResult . get ( "telephone" ) ;
Date sampleTime = ( Date ) natResult . get ( "SAMPLE_TIME" ) ;
String sampleResultPcr = ( String ) natResult . get ( "SAMPLE_RESULT_PCR" ) ;
String samplingOrgPcr = ( String ) natResult . get ( "SAMPLING_ORG_PCR" ) ;
// 本地数据库中,居民信息
NatUserInfoResultDTO currentResiInfo = idCardAndResiInfo . get ( cardNo ) ;
IcNatEntity e = new IcNatEntity ( ) ;
e . setCustomerId ( customerId ) ;
e . setIsResiUser ( StringUtils . isBlank ( currentResiInfo . getUserId ( ) ) ? NumConstant . ZERO_STR : NumConstant . ONE_STR ) ;
e . setUserId ( currentResiInfo . getUserId ( ) ) ;
e . setUserType ( isSync . equals ( NumConstant . ONE_STR ) ? "manualSync" : "sync" ) ;
e . setName ( StringUtils . isNotBlank ( name ) ? name : "" ) ;
e . setMobile ( StringUtils . isNotBlank ( telephone ) ? telephone : "" ) ;
e . setIdCard ( StringUtils . isNotBlank ( cardNo ) ? cardNo : "" ) ;
e . setNatTime ( testTime ) ;
e . setSampleTime ( sampleTime ) ;
String resultPcr = sampleResultPcr ;
//检测结果 转换 我们 0:阴性 1:阳性, 他们 :1:阳性,2:阴性
e . setNatResult ( NumConstant . ZERO_STR ) ;
if ( NumConstant . ONE_STR . equals ( resultPcr ) ) {
e . setNatResult ( NumConstant . ONE_STR ) ;
}
e . setNatAddress ( samplingOrgPcr ) ;
e . setAgencyId ( currentResiInfo . getAgencyId ( ) ) ;
e . setPids ( currentResiInfo . getPids ( ) ) ;
e . setAttachmentType ( "" ) ;
e . setAttachmentUrl ( "" ) ;
entities . add ( e ) ;
} ) ;
if ( CollectionUtils . isNotEmpty ( entities ) ) {
List < NatUserInfoResultDTO > existNatInfos = icNatDao . getExistNatInfo ( entities ) ;
sampleAndNat ( existNatInfos , entities , NumConstant . TWO_STR , customerId , isSync ) ;
}
}
}
/ * *
* 更新居民核酸检测信息 ( 通过任务处理器 )
* @author wxz
* @date 2022 / 11 / 8 下午8 : 17
* @param jobEntity
* /
public void execSyncByJobProcessor ( IcSyncJobEntity jobEntity ) {
DataSyncTaskParam p = new DataSyncTaskParam ( ) ;
// 正常状态
p . setResiStatus ( "0" ) ;
// 指定组织
p . setAgencyId ( jobEntity . getOrgIdPath ( ) ) ;
List < NatUserInfoResultDTO > resis = null ;
int pageNo = 1 ;
int pageSize = 1000 ;
int updatedResiCount = 0 ;
log . info ( "【任务处理器同步数据】组织Id:{},开始同步数据,同步类型:{}" , jobEntity . getOrgId ( ) , jobEntity . getJobType ( ) ) ;
do {
// 分页,一次查询1000居民,循环更新他们的核酸检测信息
resis = getNatUserInfoFromDb ( p , pageNo , pageSize ) ;
if ( CollectionUtils . isNotEmpty ( resis ) ) {
yantaiHsjcByDbView ( resis , jobEntity . getCustomerId ( ) , NumConstant . ONE_STR ) ;
pageNo + + ;
updatedResiCount + = resis . size ( ) ;
}
} while ( CollectionUtils . isNotEmpty ( resis ) ) ;
log . info ( "【任务处理器同步数据】组织Id:{},同步类型:{},已完成居民数:{}" , jobEntity . getOrgId ( ) , jobEntity . getJobType ( ) , updatedResiCount ) ;
}
}