diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java index 0ad814d722..388b0d05a5 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java @@ -12,7 +12,13 @@ public interface EpidemicConstant { String DATA_CODE_DISABILITY = "canji"; String DATA_CODE_DEATH = "siwang"; + /** + * ic_sync_job 任务类型 + * 核酸检测 + * 疫苗接种 + */ String JOB_TYPE_NAT = "nat"; + String JOB_TYPE_VACCINE = "vaccine"; String OPERATION_STATUS_WAITING = "waiting"; String OPERATION_STATUS_PROCESSING = "processing"; diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java new file mode 100644 index 0000000000..e53a279a46 --- /dev/null +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java @@ -0,0 +1,112 @@ +package com.epmet.processor; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.constant.EpidemicConstant; +import com.epmet.dao.IcSyncJobDao; +import com.epmet.entity.IcSyncJobEntity; +import com.epmet.service.DataSyncConfigService; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT; +import static com.epmet.constant.EpidemicConstant.JOB_TYPE_VACCINE; + +// 烟台核酸检测数据同步处理器 +@Component +@Slf4j +public class YanTaiVaccineSyncProcessor { + + public static final int MAX_EXECUTING_COUNT = 3; + + @Autowired + private ExecutorService executorService; + + @Autowired + private IcSyncJobDao icSyncJobDao; + + @Autowired + private DataSyncConfigService dataSyncConfigService; + + @Autowired + private DistributedLock distributedLock; + + @Autowired + RedisUtils redisUtils; + + /** + * @Description 定时扫描和执行同步任务【疫苗接种】 + * @Author zxc + * @Date 2022/11/11 10:32 + */ + @Scheduled(cron = "0/10 * * * * ? ") + public void scanJobs() { + LambdaQueryWrapper executingListQuery = new LambdaQueryWrapper<>(); + executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); + List executingJobList = icSyncJobDao.selectList(executingListQuery); + if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) { + // 最多只允许同时3条线程运行 + return; + } + int executingCount = executingJobList.size(); + // 还可以运行几条线程 + int leftCount = MAX_EXECUTING_COUNT - executingCount; + RLock lock = null; + try { + lock = distributedLock.getLock("data:sync:" + JOB_TYPE_VACCINE, 60L, 60L, TimeUnit.SECONDS); + // 查询可执行的任务列表,并且异步执行 + List icSyncJobToExec = icSyncJobDao.selectExecutableJobList( + EpidemicConstant.JOB_TYPE_VACCINE, + leftCount); + if (!CollectionUtils.isEmpty(icSyncJobToExec)) { + // 异步提交任务 + for (IcSyncJobEntity jobEntity : icSyncJobToExec) { + updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING); + executorService.submit(() -> { + // 将此任务状态修改为执行中 + try { + dataSyncConfigService.execSyncByJobProcessor(jobEntity); + } finally { + // 更新任务状态为结束 + updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH); + } + }); + } + } + } catch (Exception e) { + log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + /** + * 更新任务状态 + * @author wxz + * @date 2022/11/8 下午8:25 + * @param id + * @param status + + */ + private void updateJobStatus(String id, String status) { + LambdaQueryWrapper query = new LambdaQueryWrapper<>(); + query.eq(IcSyncJobEntity::getId, id); + + IcSyncJobEntity updateEntity = new IcSyncJobEntity(); + updateEntity.setOperationStatus(status); + icSyncJobDao.update(updateEntity, query); + } + +} diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java index d7e44b58de..9d17dad7f9 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java @@ -121,8 +121,9 @@ public interface DataSyncConfigService extends BaseService * @param resiInfos * @param customerId * @param isSync + * @param jobType */ - void yantaiHsjcByDbView(List resiInfos, String customerId, String isSync); + void yanTaiDbViewByType(List resiInfos, String customerId, String isSync, String jobType); /** * 更新居民核酸检测信息(通过任务处理器) diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java index 4010a1c231..513561b057 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java @@ -378,7 +378,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl resiInfos, String customerId, String isSync) { - List> resiInfobatchs = ListUtils.partition(resiInfos, 50); - for (List resibatch : resiInfobatchs) { + public void yanTaiDbViewByType(List resiInfos, String customerId, String isSync, String jobType) { + List> resiInfoBatchs = ListUtils.partition(resiInfos, 50); + for (List resiBatch : resiInfoBatchs) { // n个一批,来处理他们的核酸信息,太多怕给数据库查崩了。 try { - yantaiHsjcByDbViewPartition(resibatch, customerId, isSync); + switch (jobType){ + // 核酸检测 + case JOB_TYPE_NAT: + yantaiHsjcByDbViewPartition(resiBatch, customerId, isSync); + // 疫苗接种 + case JOB_TYPE_VACCINE: + yanTaiVaccineByDbViewPartition(resiBatch, customerId, isSync); + default: + break; + } } catch (Exception e) { String errorMsg = ExceptionUtils.getErrorStackTrace(e); - log.error("【更新核酸检测信息(from 兰图)】失败,信息:{}", errorMsg); + log.error("【更新{}信息(from 兰图)】失败,信息:{}", jobType, errorMsg); } } } + /** + * @Description 疫苗接种信息处理 + * @param resiInfos + * @param customerId + * @param isSync + * @Author zxc + * @Date 2022/11/11 11:07 + */ + public void yanTaiVaccineByDbViewPartition(List resiInfos, String customerId, String isSync){ + // 将居民信息转化为的map + Map idCardAndResiInfoMap = resiInfos.stream().collect(Collectors.toMap(resi -> resi.getIdCard(), Function.identity())); + List idCards = new ArrayList<>(idCardAndResiInfoMap.keySet()); + // 1.获取核酸采样信息 + Map args = new HashMap<>(); + args.put("idcards", idCards); + // todo 疫苗接种视图 + + List> vaccineList = yantaiNamedParamLantuJdbcTemplate.queryForList( + "select id, name,card_no, create_time, realname from hscyxxb where card_no in (:idcards)", args); + if (CollectionUtils.isNotEmpty(vaccineList)) { + List entities = new ArrayList<>(); + vaccineList.forEach(resiVaccineInfo -> { + // 从视图中获取到的疫苗接种相关信息 + + }); + } + } + /** * n个一批,来处理他们的核酸信息,太多怕给数据库查崩了。 * @param resiInfos @@ -1198,10 +1235,10 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl