|
|
|
@ -11,7 +11,6 @@ import com.epmet.commons.tools.constant.NumConstant; |
|
|
|
import com.epmet.commons.tools.constant.StrConstant; |
|
|
|
import com.epmet.commons.tools.distributedlock.DistributedLock; |
|
|
|
import com.epmet.commons.tools.dto.form.PageFormDTO; |
|
|
|
import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult; |
|
|
|
import com.epmet.commons.tools.dto.result.YtDataSyncResDTO; |
|
|
|
import com.epmet.commons.tools.dto.result.YtHscyResDTO; |
|
|
|
import com.epmet.commons.tools.dto.result.YtHsjcResDTO; |
|
|
|
@ -21,7 +20,6 @@ import com.epmet.commons.tools.exception.EpmetException; |
|
|
|
import com.epmet.commons.tools.exception.ExceptionUtils; |
|
|
|
import com.epmet.commons.tools.page.PageData; |
|
|
|
import com.epmet.commons.tools.redis.common.CustomerOrgRedis; |
|
|
|
import com.epmet.commons.tools.redis.common.CustomerStaffRedis; |
|
|
|
import com.epmet.commons.tools.redis.common.bean.AgencyInfoCache; |
|
|
|
import com.epmet.commons.tools.redis.common.bean.GridInfoCache; |
|
|
|
import com.epmet.commons.tools.security.dto.TokenDto; |
|
|
|
@ -32,9 +30,11 @@ import com.epmet.constant.EpidemicConstant; |
|
|
|
import com.epmet.dao.DataSyncConfigDao; |
|
|
|
import com.epmet.dao.IcNatDao; |
|
|
|
import com.epmet.dao.IcSyncJobDao; |
|
|
|
import com.epmet.dao.IcVaccineDao; |
|
|
|
import com.epmet.dto.DataSyncConfigDTO; |
|
|
|
import com.epmet.dto.DataSyncRecordDeathDTO; |
|
|
|
import com.epmet.dto.DataSyncRecordDisabilityDTO; |
|
|
|
import com.epmet.dto.YTVaccineListDTO; |
|
|
|
import com.epmet.dto.form.ConfigSwitchFormDTO; |
|
|
|
import com.epmet.dto.form.DataSyncTaskParam; |
|
|
|
import com.epmet.dto.form.ScopeSaveFormDTO; |
|
|
|
@ -48,7 +48,6 @@ import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.commons.collections4.CollectionUtils; |
|
|
|
import org.apache.commons.collections4.ListUtils; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.redisson.api.RLock; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.jdbc.core.JdbcTemplate; |
|
|
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; |
|
|
|
@ -59,7 +58,6 @@ import javax.annotation.Resource; |
|
|
|
import java.util.*; |
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.function.Function; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@ -95,6 +93,12 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
private IcSyncJobDao icSyncJobDao; |
|
|
|
@Autowired |
|
|
|
private DistributedLock distributedLock; |
|
|
|
@Autowired |
|
|
|
private IcVaccineService icVaccineService; |
|
|
|
@Autowired |
|
|
|
private IcVaccineRelationService icVaccineRelationService; |
|
|
|
@Autowired |
|
|
|
private IcVaccineDao icVaccineDao; |
|
|
|
|
|
|
|
@Resource(name = "yantaiNamedParamLantuJdbcTemplate") |
|
|
|
private NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate; |
|
|
|
@ -378,7 +382,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
case HE_SUAN: |
|
|
|
try { |
|
|
|
//查询正常状态的居民
|
|
|
|
yantaiHsjcByDbView(dbResiList, config.getCustomerId(), formDTO.getIsSync()); |
|
|
|
yanTaiDbViewByType(dbResiList, config.getCustomerId(), formDTO.getIsSync(), JOB_TYPE_NAT); |
|
|
|
log.info("======核酸检测信息拉取结束======"); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("nat thread execute exception", e); |
|
|
|
@ -393,7 +397,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
if (CollectionUtils.isEmpty(dbResiList)){ |
|
|
|
return; |
|
|
|
} |
|
|
|
yantaiHsjcByDbView(dbResiList, dbResiList.get(NumConstant.ZERO).getCustomerId(), formDTO.getIsSync()); |
|
|
|
yanTaiDbViewByType(dbResiList, dbResiList.get(NumConstant.ZERO).getCustomerId(), formDTO.getIsSync(),JOB_TYPE_NAT); |
|
|
|
} |
|
|
|
pageNo++; |
|
|
|
} while (dbResiList != null && dbResiList.size() == pageSize); |
|
|
|
@ -408,10 +412,6 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void natInfoSyncButton(DataSyncTaskParam formDTO) { |
|
|
|
CustomerStaffInfoCacheResult staffInfo = CustomerStaffRedis.getStaffInfo(formDTO.getCustomerId(), formDTO.getStaffId()); |
|
|
|
if (null == staffInfo){ |
|
|
|
throw new EpmetException("未查询到工作人员信息:"+formDTO.getStaffId()); |
|
|
|
} |
|
|
|
AgencyInfoCache agencyInfo = CustomerOrgRedis.getAgencyInfo(formDTO.getAgencyId()); |
|
|
|
if (null == agencyInfo){ |
|
|
|
throw new EpmetException("未查询到组织信息:"+formDTO.getAgencyId()); |
|
|
|
@ -420,6 +420,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
// 查询该组织是否存在等待中或者进行中的任务
|
|
|
|
LambdaQueryWrapper<IcSyncJobEntity> qw = new LambdaQueryWrapper<>(); |
|
|
|
qw.eq(IcSyncJobEntity::getOrgId,formDTO.getAgencyId()) |
|
|
|
.eq(IcSyncJobEntity::getJobType,formDTO.getJobType()) |
|
|
|
.in(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING,OPERATION_STATUS_PROCESSING); |
|
|
|
List<IcSyncJobEntity> icSyncJobEntities = icSyncJobDao.selectList(qw); |
|
|
|
// 当前组织下存在同步任务
|
|
|
|
@ -433,43 +434,10 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
e.setOrgId(formDTO.getAgencyId()); |
|
|
|
e.setPid(agencyInfo.getPid()); |
|
|
|
e.setOrgIdPath(StringUtils.isBlank(agencyInfo.getPids()) ? agencyInfo.getId() : agencyInfo.getPids()+":"+agencyInfo.getId()); |
|
|
|
e.setJobType(JOB_TYPE_NAT); |
|
|
|
e.setJobType(formDTO.getJobType()); |
|
|
|
e.setOperatorId(formDTO.getStaffId()); |
|
|
|
e.setOperationStatus(OPERATION_STATUS_WAITING); |
|
|
|
insertSync(e); |
|
|
|
|
|
|
|
//List<IcSyncJobEntity> waitList;
|
|
|
|
//do {
|
|
|
|
// LambdaQueryWrapper<IcSyncJobEntity> qw3 = new LambdaQueryWrapper<>();
|
|
|
|
// qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId())
|
|
|
|
// .eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING)
|
|
|
|
// .eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT)
|
|
|
|
// .orderByAsc(BaseEpmetEntity::getCreatedTime);
|
|
|
|
// waitList = icSyncJobDao.selectList(qw3);
|
|
|
|
// if (CollectionUtils.isNotEmpty(waitList)){
|
|
|
|
// for (IcSyncJobEntity entity : waitList) {
|
|
|
|
// RLock lock = null;
|
|
|
|
// try {
|
|
|
|
// lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
|
|
|
|
// updateSync(entity.getId(),OPERATION_STATUS_PROCESSING);
|
|
|
|
// }catch (Exception ex){
|
|
|
|
// log.error(ex.getMessage());
|
|
|
|
// throw new EpmetException(ex.getMessage());
|
|
|
|
// }finally {
|
|
|
|
// lock.unlock();
|
|
|
|
// }
|
|
|
|
// formDTO.setAgencyId(entity.getOrgId());
|
|
|
|
// try {
|
|
|
|
// natInfoScanTask(formDTO);
|
|
|
|
// }catch (Exception ee){
|
|
|
|
// log.error(ee.getMessage());
|
|
|
|
// throw new EpmetException(ee.getMessage());
|
|
|
|
// }finally {
|
|
|
|
// updateSync(entity.getId(),OPERATION_STATUS_FINISH);
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
//}while (CollectionUtils.isNotEmpty(waitList));
|
|
|
|
} |
|
|
|
|
|
|
|
@Transactional(rollbackFor = Exception.class) |
|
|
|
@ -532,7 +500,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
try { |
|
|
|
//查询正常状态的居民
|
|
|
|
//hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync());
|
|
|
|
yantaiHsjcByDbView(finalDbResiList, config.getCustomerId(), formDTO.getIsSync()); |
|
|
|
yanTaiDbViewByType(finalDbResiList, config.getCustomerId(), formDTO.getIsSync(), JOB_TYPE_NAT); |
|
|
|
log.info("======核酸检测信息拉取结束======"); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("hsjc thread execute exception", e); |
|
|
|
@ -1062,15 +1030,110 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
* @param customerId |
|
|
|
* @param isSync |
|
|
|
*/ |
|
|
|
public void yantaiHsjcByDbView(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync) { |
|
|
|
List<List<NatUserInfoResultDTO>> resiInfobatchs = ListUtils.partition(resiInfos, 50); |
|
|
|
for (List<NatUserInfoResultDTO> resibatch : resiInfobatchs) { |
|
|
|
public void yanTaiDbViewByType(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync, String jobType) { |
|
|
|
List<List<NatUserInfoResultDTO>> resiInfoBatchs = ListUtils.partition(resiInfos, 50); |
|
|
|
for (List<NatUserInfoResultDTO> resiBatch : resiInfoBatchs) { |
|
|
|
// n个一批,来处理他们的核酸信息,太多怕给数据库查崩了。
|
|
|
|
try { |
|
|
|
yantaiHsjcByDbViewPartition(resibatch, customerId, isSync); |
|
|
|
switch (jobType){ |
|
|
|
// 核酸检测
|
|
|
|
case JOB_TYPE_NAT: |
|
|
|
yantaiHsjcByDbViewPartition(resiBatch, customerId, isSync); |
|
|
|
break; |
|
|
|
// 疫苗接种
|
|
|
|
case JOB_TYPE_VACCINE: |
|
|
|
yanTaiVaccineByDbViewPartition(resiBatch, customerId, isSync); |
|
|
|
break; |
|
|
|
default: |
|
|
|
break; |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
String errorMsg = ExceptionUtils.getErrorStackTrace(e); |
|
|
|
log.error("【更新核酸检测信息(from 兰图)】失败,信息:{}", errorMsg); |
|
|
|
log.error("【更新{}信息(from 兰图)】失败,信息:{}", jobType, errorMsg); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* @Description 疫苗接种信息处理 |
|
|
|
* @param resiInfos |
|
|
|
* @param customerId |
|
|
|
* @param isSync |
|
|
|
* @Author zxc |
|
|
|
* @Date 2022/11/11 11:07 |
|
|
|
*/ |
|
|
|
public void yanTaiVaccineByDbViewPartition(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync){ |
|
|
|
// 将居民信息转化为<idCard,resiInfo>的map
|
|
|
|
Map<String, NatUserInfoResultDTO> idCardAndResiInfoMap = resiInfos.stream().collect(Collectors.toMap(resi -> resi.getIdCard(), Function.identity())); |
|
|
|
List<String> idCards = new ArrayList<>(idCardAndResiInfoMap.keySet()); |
|
|
|
// 1.获取核酸采样信息
|
|
|
|
Map<String, Object> args = new HashMap<>(); |
|
|
|
args.put("idcards", idCards); |
|
|
|
// todo 疫苗接种视图
|
|
|
|
String json = "[\n" + |
|
|
|
"{\n" + |
|
|
|
"\t\"idCard\":\"370785195001012558\",\n" + |
|
|
|
"\t\"inoculateDate\":\"2023-01-01 10:01\",\n" + |
|
|
|
"\t\"lastStationName\":\"北京\"\n" + |
|
|
|
"},\n" + |
|
|
|
"{\n" + |
|
|
|
"\t\"idCard\":\"370785195001012558\",\n" + |
|
|
|
"\t\"inoculateDate\":\"2022-01-01 10:01\",\n" + |
|
|
|
"\t\"lastStationName\":\"南京\"\n" + |
|
|
|
"}\n" + |
|
|
|
"]"; |
|
|
|
List<Map<String, Object>> vaccineList = new ArrayList<>();/*yantaiNamedParamLantuJdbcTemplate.queryForList( |
|
|
|
"select id, name,card_no, create_time from hscyxxb where card_no in (:idcards)", args);*/ |
|
|
|
Map<String, Object> m = new HashMap<>(); |
|
|
|
vaccineList.add(m); |
|
|
|
if (CollectionUtils.isNotEmpty(vaccineList)) { |
|
|
|
List<IcVaccineEntity> entities = new ArrayList<>(); |
|
|
|
// List<YTVaccineListDTO> ytVaccineListDTOS = ConvertUtils.sourceToTarget(vaccineList, YTVaccineListDTO.class);
|
|
|
|
List<YTVaccineListDTO> ytVaccineListDTOS = JSON.parseArray(json,YTVaccineListDTO.class); |
|
|
|
List<YTVaccineListDTO> existVaccine = icVaccineDao.getExistVaccine(ytVaccineListDTOS); |
|
|
|
if (CollectionUtils.isNotEmpty(existVaccine)){ |
|
|
|
for (YTVaccineListDTO e : existVaccine) { |
|
|
|
for (int i = 0; i < ytVaccineListDTOS.size(); i++) { |
|
|
|
if (ytVaccineListDTOS.get(i).getIdCard().equals(e.getIdCard()) && ytVaccineListDTOS.get(i).getInoculateDate().equals(e.getInoculateDate())){ |
|
|
|
ytVaccineListDTOS.remove(i); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (CollectionUtils.isNotEmpty(ytVaccineListDTOS)){ |
|
|
|
ytVaccineListDTOS.forEach(v -> { |
|
|
|
resiInfos.forEach(u -> { |
|
|
|
if (v.getIdCard().equals(u.getIdCard())){ |
|
|
|
IcVaccineEntity e = new IcVaccineEntity(); |
|
|
|
e.setCustomerId(customerId); |
|
|
|
e.setName(u.getName()); |
|
|
|
e.setMobile(u.getMobile()); |
|
|
|
e.setIdCard(u.getIdCard()); |
|
|
|
e.setIsResiUser(StringUtils.isBlank(u.getUserId()) ? NumConstant.ZERO_STR : NumConstant.ONE_STR); |
|
|
|
e.setUserId(StringUtils.isBlank(u.getUserId()) ? "" : u.getUserId()); |
|
|
|
e.setUserType("ytPull"); |
|
|
|
e.setInoculateTime(DateUtils.parseDate(v.getInoculateDate(),DateUtils.DATE_TIME_PATTERN_END_WITH_MINUTE)); |
|
|
|
e.setInoculateAddress(v.getLastStationName()); |
|
|
|
e.setAgencyId(u.getAgencyId()); |
|
|
|
e.setPids(u.getPids()); |
|
|
|
entities.add(e); |
|
|
|
} |
|
|
|
}); |
|
|
|
}); |
|
|
|
icVaccineService.insertBatch(entities,NumConstant.ONE_HUNDRED); |
|
|
|
List<IcVaccineRelationEntity> relationEntities = new ArrayList<>(); |
|
|
|
entities.forEach(e -> { |
|
|
|
IcVaccineRelationEntity re = new IcVaccineRelationEntity(); |
|
|
|
re.setIcVaccineId(e.getAgencyId()); |
|
|
|
re.setPids(e.getPids()); |
|
|
|
re.setCustomerId(customerId); |
|
|
|
re.setUserType("ytPull"); |
|
|
|
re.setAgencyId(e.getAgencyId()); |
|
|
|
// 拉取居民的疫苗接种信;因为查询的就是组织下的居民,所以都是本地居民。
|
|
|
|
re.setIsLocalResiUser(NumConstant.ONE_STR); |
|
|
|
relationEntities.add(re); |
|
|
|
}); |
|
|
|
icVaccineRelationService.insertBatch(relationEntities,NumConstant.ONE_HUNDRED); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -1197,14 +1260,62 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
|
int updatedResiCount = 0; |
|
|
|
log.info("【任务处理器同步数据】组织Id:{},开始同步数据,同步类型:{}", jobEntity.getOrgId(), jobEntity.getJobType()); |
|
|
|
do { |
|
|
|
// 分页,一次查询1000居民,循环更新他们的核酸检测信息
|
|
|
|
// 分页,一次查询 1000 居民,循环更新他们的核酸检测信息
|
|
|
|
resis = getNatUserInfoFromDb(p, pageNo, pageSize); |
|
|
|
if (CollectionUtils.isNotEmpty(resis)) { |
|
|
|
yantaiHsjcByDbView(resis, jobEntity.getCustomerId(), NumConstant.ONE_STR); |
|
|
|
yanTaiDbViewByType(resis, jobEntity.getCustomerId(), NumConstant.ONE_STR, jobEntity.getJobType()); |
|
|
|
pageNo++; |
|
|
|
updatedResiCount += resis.size(); |
|
|
|
} |
|
|
|
} while (CollectionUtils.isNotEmpty(resis)); |
|
|
|
log.info("【任务处理器同步数据】组织Id:{},同步类型:{},已完成居民数:{}", jobEntity.getOrgId(), jobEntity.getJobType(), updatedResiCount); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* @Description 疫苗接种信息定时拉取 |
|
|
|
* @param formDTO |
|
|
|
* @Author zxc |
|
|
|
* @Date 2022/11/11 16:56 |
|
|
|
*/ |
|
|
|
@Override |
|
|
|
public void vaccineInfoScanTask(DataSyncTaskParam formDTO) { |
|
|
|
List<DataSyncConfigDTO> configData = getConfigData(null, EpidemicConstant.DATA_CODE_VACCINE); |
|
|
|
if (CollectionUtils.isEmpty(configData)){ |
|
|
|
log.warn("vaccineInfoScanTask not exists config data "); |
|
|
|
return; |
|
|
|
} |
|
|
|
long count = configData.stream().filter(o -> CollectionUtils.isNotEmpty(o.getScopeList())).count(); |
|
|
|
if (count < 1) { |
|
|
|
log.warn("vaccineInfoScanTask scopeList is null"); |
|
|
|
return; |
|
|
|
} |
|
|
|
int pageNo = NumConstant.ONE; |
|
|
|
int pageSize = NumConstant.ONE_THOUSAND; |
|
|
|
List<NatUserInfoResultDTO> dbResiList = null; |
|
|
|
do { |
|
|
|
for (DataSyncConfigDTO config : configData) { |
|
|
|
// 设置查询数据范围
|
|
|
|
formDTO.setOrgList(config.getScopeList()); |
|
|
|
DataSyncEnum anEnum = DataSyncEnum.getEnum(config.getDataCode()); |
|
|
|
dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize); |
|
|
|
if (CollectionUtils.isEmpty(dbResiList)) { |
|
|
|
continue; |
|
|
|
} |
|
|
|
switch (anEnum) { |
|
|
|
case VACCINE: |
|
|
|
try { |
|
|
|
//查询正常状态的居民
|
|
|
|
yanTaiVaccineByDbViewPartition(dbResiList,config.getCustomerId(),NumConstant.ZERO_STR); |
|
|
|
log.info("======vaccine信息拉取结束======"); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("vaccine thread execute exception", e); |
|
|
|
} |
|
|
|
break; |
|
|
|
default: |
|
|
|
log.warn("没有要处理的数据"); |
|
|
|
} |
|
|
|
} |
|
|
|
pageNo++; |
|
|
|
} while (dbResiList != null && dbResiList.size() == pageSize); |
|
|
|
} |
|
|
|
} |
|
|
|
|