|
@ -43,6 +43,8 @@ import org.springframework.stereotype.Service; |
|
|
import org.springframework.transaction.annotation.Transactional; |
|
|
import org.springframework.transaction.annotation.Transactional; |
|
|
|
|
|
|
|
|
import java.util.*; |
|
|
import java.util.*; |
|
|
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
import java.util.stream.Collectors; |
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
@ -67,6 +69,8 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
private DataSyncRecordDeathService dataSyncRecordDeathService; |
|
|
private DataSyncRecordDeathService dataSyncRecordDeathService; |
|
|
@Autowired |
|
|
@Autowired |
|
|
private DataSyncRecordDisabilityService dataSyncRecordDisabilityService; |
|
|
private DataSyncRecordDisabilityService dataSyncRecordDisabilityService; |
|
|
|
|
|
@Autowired |
|
|
|
|
|
private ExecutorService executorService; |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public DataSyncConfigDTO get(String id) { |
|
|
public DataSyncConfigDTO get(String id) { |
|
@ -184,21 +188,115 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
if (StringUtils.isNotBlank(formDTO.getDataCode())){ |
|
|
if (StringUtils.isNotBlank(formDTO.getDataCode())){ |
|
|
|
|
|
List<DataSyncConfigDTO> newConfigList = new ArrayList<>(); |
|
|
allConfigList.forEach(c -> { |
|
|
allConfigList.forEach(c -> { |
|
|
if (c.getDataCode().equals(formDTO.getDataCode())){ |
|
|
if (c.getDataCode().equals(formDTO.getDataCode())){ |
|
|
disAllData(c,formDTO); |
|
|
//串行执行
|
|
|
return; |
|
|
//disAllData(c,formDTO);
|
|
|
|
|
|
|
|
|
|
|
|
//并行执行
|
|
|
|
|
|
newConfigList.add(c); |
|
|
} |
|
|
} |
|
|
}); |
|
|
}); |
|
|
|
|
|
//死亡 残疾等接口 并行执行
|
|
|
|
|
|
dataSyncYanTaiParallel(newConfigList,formDTO); |
|
|
}else { |
|
|
}else { |
|
|
for (DataSyncConfigDTO config : allConfigList) { |
|
|
//死亡 残疾等接口 并行执行
|
|
|
//没有配置 数据拉取范围 继续下次循环
|
|
|
dataSyncYanTaiParallel(allConfigList,formDTO); |
|
|
if (CollectionUtils.isEmpty(config.getScopeList())) { |
|
|
//串行执行
|
|
|
|
|
|
// for (DataSyncConfigDTO config : allConfigList) {
|
|
|
|
|
|
// //没有配置 数据拉取范围 继续下次循环
|
|
|
|
|
|
// if (CollectionUtils.isEmpty(config.getScopeList())) {
|
|
|
|
|
|
// continue;
|
|
|
|
|
|
// }
|
|
|
|
|
|
// disAllData(config,formDTO);
|
|
|
|
|
|
// }
|
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void dataSyncYanTaiParallel(List<DataSyncConfigDTO> configList, DataSyncTaskParam formDTO) { |
|
|
|
|
|
if (CollectionUtils.isEmpty(configList)) { |
|
|
|
|
|
log.warn("dataSyncYanTaiParallel configList is null"); |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
long count = configList.stream().filter(o -> CollectionUtils.isNotEmpty(o.getScopeList())).count(); |
|
|
|
|
|
if (count < 1) { |
|
|
|
|
|
log.warn("dataSyncYanTaiParallel scopeList is null"); |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
//没传具体参数 则 按照
|
|
|
|
|
|
int pageNo = NumConstant.ONE; |
|
|
|
|
|
int pageSize = NumConstant.ONE_THOUSAND; |
|
|
|
|
|
|
|
|
|
|
|
List<NatUserInfoResultDTO> dbResiList = null; |
|
|
|
|
|
CountDownLatch countDownLatch = new CountDownLatch((int) count); |
|
|
|
|
|
|
|
|
|
|
|
do { |
|
|
|
|
|
for (DataSyncConfigDTO config : configList) {//设置查询数据范围
|
|
|
|
|
|
formDTO.setOrgList(config.getScopeList()); |
|
|
|
|
|
DataSyncEnum anEnum = DataSyncEnum.getEnum(config.getDataCode()); |
|
|
|
|
|
|
|
|
|
|
|
//查询正常状态的居民 并回显 残疾状态
|
|
|
|
|
|
formDTO.setCategoryColumn("IS_CJ"); |
|
|
|
|
|
dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize); |
|
|
|
|
|
if (CollectionUtils.isEmpty(dbResiList)) { |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
|
disAllData(config,formDTO); |
|
|
List<NatUserInfoResultDTO> finalDbResiList = dbResiList; |
|
|
|
|
|
switch (anEnum) { |
|
|
|
|
|
case HE_SUAN: |
|
|
|
|
|
executorService.submit(() -> { |
|
|
|
|
|
try { |
|
|
|
|
|
//查询正常状态的居民
|
|
|
|
|
|
hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync()); |
|
|
|
|
|
log.info("======核酸检测信息拉取结束======"); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.error("hsjc thread execute exception", e); |
|
|
|
|
|
} finally { |
|
|
|
|
|
countDownLatch.countDown(); |
|
|
|
|
|
} |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
break; |
|
|
|
|
|
case CAN_JI: |
|
|
|
|
|
executorService.submit(() -> { |
|
|
|
|
|
try { |
|
|
|
|
|
//查询正常状态的居民
|
|
|
|
|
|
canJi(finalDbResiList); |
|
|
|
|
|
log.info("======canJi信息拉取结束======"); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.error("hsjc thread execute exception", e); |
|
|
|
|
|
} finally { |
|
|
|
|
|
countDownLatch.countDown(); |
|
|
|
|
|
} |
|
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
break; |
|
|
|
|
|
case SI_WANG: |
|
|
|
|
|
try { |
|
|
|
|
|
//查询正常状态的居民
|
|
|
|
|
|
siWang(finalDbResiList); |
|
|
|
|
|
log.info("======siWang信息拉取结束======"); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.error("hsjc thread execute exception", e); |
|
|
|
|
|
} finally { |
|
|
|
|
|
countDownLatch.countDown(); |
|
|
|
|
|
} |
|
|
|
|
|
break; |
|
|
|
|
|
default: |
|
|
|
|
|
log.warn("没有要处理的数据"); |
|
|
|
|
|
} |
|
|
|
|
|
try { |
|
|
|
|
|
//等这一批居民 各个方法都执行完后 再继续下一批
|
|
|
|
|
|
countDownLatch.await(); |
|
|
|
|
|
} catch (InterruptedException e) { |
|
|
|
|
|
log.error("dataSyncYanTaiParallel现成被中断"); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
pageNo++; |
|
|
|
|
|
} while (dbResiList != null && dbResiList.size() == pageSize); |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private void disAllData(DataSyncConfigDTO config,DataSyncTaskParam formDTO){ |
|
|
private void disAllData(DataSyncConfigDTO config,DataSyncTaskParam formDTO){ |
|
@ -312,7 +410,6 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
* desc:从数据库获取居民信息 |
|
|
* desc:从数据库获取居民信息 |
|
|
* |
|
|
* |
|
|
* @param formDTO |
|
|
* @param formDTO |
|
|
* @param anEnum |
|
|
|
|
|
* @param pageNo |
|
|
* @param pageNo |
|
|
* @param pageSize |
|
|
* @param pageSize |
|
|
* @return |
|
|
* @return |
|
@ -321,23 +418,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao |
|
|
//根据 组织 分页获取 居民数据
|
|
|
//根据 组织 分页获取 居民数据
|
|
|
PageInfo<NatUserInfoResultDTO> pageInfo = PageHelper.startPage(pageNo, pageSize, false) |
|
|
PageInfo<NatUserInfoResultDTO> pageInfo = PageHelper.startPage(pageNo, pageSize, false) |
|
|
.doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO)); |
|
|
.doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO)); |
|
|
List<NatUserInfoResultDTO> dbResiList; |
|
|
return pageInfo.getList(); |
|
|
dbResiList = pageInfo.getList(); |
|
|
|
|
|
/* //如果传了身份证号 则按照身份证号查询 并同步记录, userId如果为空则是 手动录入的 此人没有录入居民库 但是也可以同步
|
|
|
|
|
|
if (CollectionUtils.isNotEmpty(formDTO.getIdCards()) && DataSyncEnum.HE_SUAN.getCode().equals(anEnum.getCode())) { |
|
|
|
|
|
List<NatUserInfoResultDTO> collect = formDTO.getIdCards().stream().map(id -> { |
|
|
|
|
|
NatUserInfoResultDTO e = new NatUserInfoResultDTO(); |
|
|
|
|
|
e.setIdCard(id); |
|
|
|
|
|
e.setUserId(""); |
|
|
|
|
|
return e; |
|
|
|
|
|
}).collect(Collectors.toList()); |
|
|
|
|
|
|
|
|
|
|
|
for (NatUserInfoResultDTO c : collect) { |
|
|
|
|
|
dbResiList.stream().filter(u -> u.getIdCard().equals(c.getIdCard())).forEach(u -> c.setUserId(u.getUserId())); |
|
|
|
|
|
} |
|
|
|
|
|
dbResiList = collect; |
|
|
|
|
|
}*/ |
|
|
|
|
|
return dbResiList; |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private void canJi(List<NatUserInfoResultDTO> resiList) { |
|
|
private void canJi(List<NatUserInfoResultDTO> resiList) { |
|
|