Browse Source

同步数据改为并行处理

dev
jianjun 3 years ago
parent
commit
766e218e03
  1. 129
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

129
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

@ -43,6 +43,8 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/**
@ -67,6 +69,8 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
private DataSyncRecordDeathService dataSyncRecordDeathService;
@Autowired
private DataSyncRecordDisabilityService dataSyncRecordDisabilityService;
@Autowired
private ExecutorService executorService;
@Override
public DataSyncConfigDTO get(String id) {
@ -184,21 +188,115 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
return;
}
if (StringUtils.isNotBlank(formDTO.getDataCode())){
List<DataSyncConfigDTO> newConfigList = new ArrayList<>();
allConfigList.forEach(c -> {
if (c.getDataCode().equals(formDTO.getDataCode())){
disAllData(c,formDTO);
return;
//串行执行
//disAllData(c,formDTO);
//并行执行
newConfigList.add(c);
}
});
//死亡 残疾等接口 并行执行
dataSyncYanTaiParallel(newConfigList,formDTO);
}else {
for (DataSyncConfigDTO config : allConfigList) {
//没有配置 数据拉取范围 继续下次循环
if (CollectionUtils.isEmpty(config.getScopeList())) {
//死亡 残疾等接口 并行执行
dataSyncYanTaiParallel(allConfigList,formDTO);
//串行执行
// for (DataSyncConfigDTO config : allConfigList) {
// //没有配置 数据拉取范围 继续下次循环
// if (CollectionUtils.isEmpty(config.getScopeList())) {
// continue;
// }
// disAllData(config,formDTO);
// }
}
}
private void dataSyncYanTaiParallel(List<DataSyncConfigDTO> configList, DataSyncTaskParam formDTO) {
if (CollectionUtils.isEmpty(configList)) {
log.warn("dataSyncYanTaiParallel configList is null");
return;
}
long count = configList.stream().filter(o -> CollectionUtils.isNotEmpty(o.getScopeList())).count();
if (count < 1) {
log.warn("dataSyncYanTaiParallel scopeList is null");
return;
}
//没传具体参数 则 按照
int pageNo = NumConstant.ONE;
int pageSize = NumConstant.ONE_THOUSAND;
List<NatUserInfoResultDTO> dbResiList = null;
CountDownLatch countDownLatch = new CountDownLatch((int) count);
do {
for (DataSyncConfigDTO config : configList) {//设置查询数据范围
formDTO.setOrgList(config.getScopeList());
DataSyncEnum anEnum = DataSyncEnum.getEnum(config.getDataCode());
//查询正常状态的居民 并回显 残疾状态
formDTO.setCategoryColumn("IS_CJ");
dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize);
if (CollectionUtils.isEmpty(dbResiList)) {
continue;
}
disAllData(config,formDTO);
List<NatUserInfoResultDTO> finalDbResiList = dbResiList;
switch (anEnum) {
case HE_SUAN:
executorService.submit(() -> {
try {
//查询正常状态的居民
hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync());
log.info("======核酸检测信息拉取结束======");
} catch (Exception e) {
log.error("hsjc thread execute exception", e);
} finally {
countDownLatch.countDown();
}
});
break;
case CAN_JI:
executorService.submit(() -> {
try {
//查询正常状态的居民
canJi(finalDbResiList);
log.info("======canJi信息拉取结束======");
} catch (Exception e) {
log.error("hsjc thread execute exception", e);
} finally {
countDownLatch.countDown();
}
});
break;
case SI_WANG:
try {
//查询正常状态的居民
siWang(finalDbResiList);
log.info("======siWang信息拉取结束======");
} catch (Exception e) {
log.error("hsjc thread execute exception", e);
} finally {
countDownLatch.countDown();
}
break;
default:
log.warn("没有要处理的数据");
}
try {
//等这一批居民 各个方法都执行完后 再继续下一批
countDownLatch.await();
} catch (InterruptedException e) {
log.error("dataSyncYanTaiParallel现成被中断");
}
}
pageNo++;
} while (dbResiList != null && dbResiList.size() == pageSize);
}
private void disAllData(DataSyncConfigDTO config,DataSyncTaskParam formDTO){
@ -312,7 +410,6 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* desc从数据库获取居民信息
*
* @param formDTO
* @param anEnum
* @param pageNo
* @param pageSize
* @return
@ -321,23 +418,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
//根据 组织 分页获取 居民数据
PageInfo<NatUserInfoResultDTO> pageInfo = PageHelper.startPage(pageNo, pageSize, false)
.doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO));
List<NatUserInfoResultDTO> dbResiList;
dbResiList = pageInfo.getList();
/* //如果传了身份证号 则按照身份证号查询 并同步记录, userId如果为空则是 手动录入的 此人没有录入居民库 但是也可以同步
if (CollectionUtils.isNotEmpty(formDTO.getIdCards()) && DataSyncEnum.HE_SUAN.getCode().equals(anEnum.getCode())) {
List<NatUserInfoResultDTO> collect = formDTO.getIdCards().stream().map(id -> {
NatUserInfoResultDTO e = new NatUserInfoResultDTO();
e.setIdCard(id);
e.setUserId("");
return e;
}).collect(Collectors.toList());
for (NatUserInfoResultDTO c : collect) {
dbResiList.stream().filter(u -> u.getIdCard().equals(c.getIdCard())).forEach(u -> c.setUserId(u.getUserId()));
}
dbResiList = collect;
}*/
return dbResiList;
return pageInfo.getList();
}
private void canJi(List<NatUserInfoResultDTO> resiList) {

Loading…
Cancel
Save