From 766e218e03dfaef91785ddcf0d572730209010ec Mon Sep 17 00:00:00 2001 From: jianjun Date: Fri, 28 Oct 2022 13:08:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E5=B9=B6=E8=A1=8C=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/DataSyncConfigServiceImpl.java | 131 ++++++++++++++---- 1 file changed, 106 insertions(+), 25 deletions(-) diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java index 7fc6aa2bb6..bb72f987a6 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java @@ -43,6 +43,8 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; /** @@ -67,6 +69,8 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl newConfigList = new ArrayList<>(); allConfigList.forEach(c -> { if (c.getDataCode().equals(formDTO.getDataCode())){ - disAllData(c,formDTO); - return; + //串行执行 + //disAllData(c,formDTO); + + //并行执行 + newConfigList.add(c); } }); + //死亡 残疾等接口 并行执行 + dataSyncYanTaiParallel(newConfigList,formDTO); }else { - for (DataSyncConfigDTO config : allConfigList) { - //没有配置 数据拉取范围 继续下次循环 - if (CollectionUtils.isEmpty(config.getScopeList())) { + //死亡 残疾等接口 并行执行 + dataSyncYanTaiParallel(allConfigList,formDTO); + //串行执行 +// for (DataSyncConfigDTO config : allConfigList) { +// //没有配置 数据拉取范围 继续下次循环 +// if (CollectionUtils.isEmpty(config.getScopeList())) { +// continue; +// } +// disAllData(config,formDTO); +// } + } + } + + private void dataSyncYanTaiParallel(List configList, DataSyncTaskParam formDTO) { + if (CollectionUtils.isEmpty(configList)) { + log.warn("dataSyncYanTaiParallel configList is null"); + return; + } + long count = configList.stream().filter(o -> CollectionUtils.isNotEmpty(o.getScopeList())).count(); + if (count < 1) { + log.warn("dataSyncYanTaiParallel scopeList is null"); + return; + } + //没传具体参数 则 按照 + int pageNo = NumConstant.ONE; + int pageSize = NumConstant.ONE_THOUSAND; + + List dbResiList = null; + CountDownLatch countDownLatch = new CountDownLatch((int) count); + + do { + for (DataSyncConfigDTO config : configList) {//设置查询数据范围 + formDTO.setOrgList(config.getScopeList()); + DataSyncEnum anEnum = DataSyncEnum.getEnum(config.getDataCode()); + + //查询正常状态的居民 并回显 残疾状态 + formDTO.setCategoryColumn("IS_CJ"); + dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize); + if (CollectionUtils.isEmpty(dbResiList)) { continue; } - disAllData(config,formDTO); + List finalDbResiList = dbResiList; + switch (anEnum) { + case HE_SUAN: + executorService.submit(() -> { + try { + //查询正常状态的居民 + hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync()); + log.info("======核酸检测信息拉取结束======"); + } catch (Exception e) { + log.error("hsjc thread execute exception", e); + } finally { + countDownLatch.countDown(); + } + }); + + break; + case CAN_JI: + executorService.submit(() -> { + try { + //查询正常状态的居民 + canJi(finalDbResiList); + log.info("======canJi信息拉取结束======"); + } catch (Exception e) { + log.error("hsjc thread execute exception", e); + } finally { + countDownLatch.countDown(); + } + }); + + break; + case SI_WANG: + try { + //查询正常状态的居民 + siWang(finalDbResiList); + log.info("======siWang信息拉取结束======"); + } catch (Exception e) { + log.error("hsjc thread execute exception", e); + } finally { + countDownLatch.countDown(); + } + break; + default: + log.warn("没有要处理的数据"); + } + try { + //等这一批居民 各个方法都执行完后 再继续下一批 + countDownLatch.await(); + } catch (InterruptedException e) { + log.error("dataSyncYanTaiParallel现成被中断"); + } } - } + + pageNo++; + } while (dbResiList != null && dbResiList.size() == pageSize); + } private void disAllData(DataSyncConfigDTO config,DataSyncTaskParam formDTO){ @@ -312,7 +410,6 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl pageInfo = PageHelper.startPage(pageNo, pageSize, false) .doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO)); - List dbResiList; - dbResiList = pageInfo.getList(); - /* //如果传了身份证号 则按照身份证号查询 并同步记录, userId如果为空则是 手动录入的 此人没有录入居民库 但是也可以同步 - if (CollectionUtils.isNotEmpty(formDTO.getIdCards()) && DataSyncEnum.HE_SUAN.getCode().equals(anEnum.getCode())) { - List collect = formDTO.getIdCards().stream().map(id -> { - NatUserInfoResultDTO e = new NatUserInfoResultDTO(); - e.setIdCard(id); - e.setUserId(""); - return e; - }).collect(Collectors.toList()); - - for (NatUserInfoResultDTO c : collect) { - dbResiList.stream().filter(u -> u.getIdCard().equals(c.getIdCard())).forEach(u -> c.setUserId(u.getUserId())); - } - dbResiList = collect; - }*/ - return dbResiList; + return pageInfo.getList(); } private void canJi(List resiList) {