From cf75416107e7784a7bec5cb750b6edd625490314 Mon Sep 17 00:00:00 2001 From: wangxianzhang Date: Tue, 8 Nov 2022 20:40:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=B8=E9=85=B8=E6=A3=80=E6=B5=8B=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=BC=82=E6=AD=A5=E5=A4=84?= =?UTF-8?q?=E7=90=86=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tools/exception/EpmetErrorCode.java | 2 +- .../main/java/com/epmet/UserApplication.java | 2 + .../main/java/com/epmet/dao/IcSyncJobDao.java | 8 +- .../processor/YanTaiNatSyncProcessor.java | 126 ++++++++++++++++++ .../epmet/service/DataSyncConfigService.java | 23 ++++ .../impl/DataSyncConfigServiceImpl.java | 114 ++++++++++------ .../main/resources/mapper/IcSyncJobDao.xml | 23 ++++ 7 files changed, 253 insertions(+), 45 deletions(-) create mode 100644 epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java index 8bea28a618..46e02f1d12 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java @@ -217,7 +217,7 @@ public enum EpmetErrorCode { CUSTOMER_CATEGORY(9101,"分类已使用,不允许删除"), CATEGORY_NAME(9102,"分类名称已存在,不允许重复"), - EXIST_SYNC_JOB_ERROR(9103,"存在同步任务"), + EXIST_SYNC_JOB_ERROR(9103,"存在等待或进行中的任务"), OTHER_SYNC_JOB_ERROR(9104,"同步任务已提交,请稍后查看数据"), // open api异常 diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java index 5767fb4d33..dd147516e9 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java @@ -14,6 +14,7 @@ import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; /** * 管理后台 @@ -22,6 +23,7 @@ import org.springframework.scheduling.annotation.EnableAsync; * @since 1.0.0 */ +@EnableScheduling @SpringBootApplication @EnableDiscoveryClient @EnableFeignClients diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java index 5b4ad6684e..298b5fb6b1 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java @@ -3,6 +3,9 @@ package com.epmet.dao; import com.epmet.commons.mybatis.dao.BaseDao; import com.epmet.entity.IcSyncJobEntity; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; /** * 同步任务表 @@ -12,5 +15,8 @@ import org.apache.ibatis.annotations.Mapper; */ @Mapper public interface IcSyncJobDao extends BaseDao { - + + List selectExecutableJobList(@Param("jobType") String jobType, + @Param("operationStatus") String operationStatus, + @Param("itemCount") int itemCount); } \ No newline at end of file diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java new file mode 100644 index 0000000000..fb8b4a2230 --- /dev/null +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java @@ -0,0 +1,126 @@ +package com.epmet.processor; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.constant.EpidemicConstant; +import com.epmet.dao.IcSyncJobDao; +import com.epmet.entity.IcSyncJobEntity; +import com.epmet.service.DataSyncConfigService; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT; + +// 烟台核酸检测数据同步处理器 +@Component +@Slf4j +public class YanTaiNatSyncProcessor { + + public static final int MAX_EXECUTING_COUNT = 3; + + @Autowired + private ExecutorService executorService; + + @Autowired + private IcSyncJobDao icSyncJobDao; + + @Autowired + private DataSyncConfigService dataSyncConfigService; + + @Autowired + private DistributedLock distributedLock; + + @Autowired + RedisUtils redisUtils; + + /** + * 定时扫描和执行同步任务 + * 10s扫一次库 + * + * @author wxz + * @date 2022/11/8 下午5:42 + */ + @Scheduled(cron = "0/10 * * * * ? ") + public void scanJobs() { + log.info("【异步数据更新】开始同步任务"); + + String dataSyncEnable = redisUtils.getString("data:sync:enable"); + if (StringUtils.isEmpty(dataSyncEnable)) { + return; + } + + LambdaQueryWrapper executingListQuery = new LambdaQueryWrapper<>(); + executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); + List executingJobList = icSyncJobDao.selectList(executingListQuery); + + if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) { + // 最多只允许同时3条线程运行 + return; + } + + int executingCount = executingJobList.size(); + // 还可以运行几条线程 + int leftCount = MAX_EXECUTING_COUNT - executingCount; + + RLock lock = null; + try { + lock = distributedLock.getLock("data:sync:" + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS); + // 查询可执行的任务列表,并且异步执行 + List icSyncJobToExec = icSyncJobDao.selectExecutableJobList( + EpidemicConstant.JOB_TYPE_NAT, + EpidemicConstant.OPERATION_STATUS_WAITING, + leftCount); + + if (!CollectionUtils.isEmpty(icSyncJobToExec)) { + // 异步提交任务 + for (IcSyncJobEntity jobEntity : icSyncJobToExec) { + + updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING); + executorService.submit(() -> { + // 将此任务状态修改为执行中 + + dataSyncConfigService.execSyncByJobProcessor(jobEntity); + + // 更新任务状态为结束 + updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH); + }); + } + } + } catch (Exception e) { + log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + /** + * 更新任务状态 + * @author wxz + * @date 2022/11/8 下午8:25 + * @param id + * @param status + + */ + private void updateJobStatus(String id, String status) { + LambdaQueryWrapper query = new LambdaQueryWrapper<>(); + query.eq(IcSyncJobEntity::getId, id); + + IcSyncJobEntity updateEntity = new IcSyncJobEntity(); + updateEntity.setOperationStatus(status); + icSyncJobDao.update(updateEntity, query); + } + +} diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java index dd1efb81b1..d7e44b58de 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java @@ -8,7 +8,11 @@ import com.epmet.dto.DataSyncConfigDTO; import com.epmet.dto.form.ConfigSwitchFormDTO; import com.epmet.dto.form.DataSyncTaskParam; import com.epmet.dto.form.ScopeSaveFormDTO; +import com.epmet.dto.result.NatUserInfoResultDTO; import com.epmet.entity.DataSyncConfigEntity; +import com.epmet.entity.IcSyncJobEntity; + +import java.util.List; /** * 数据更新配置表 @@ -110,4 +114,23 @@ public interface DataSyncConfigService extends BaseService void natInfoSyncButton(DataSyncTaskParam formDTO); + List getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize); + + /** + * 烟台核酸检测(视图方式获取数据) + * @param resiInfos + * @param customerId + * @param isSync + */ + void yantaiHsjcByDbView(List resiInfos, String customerId, String isSync); + + /** + * 更新居民核酸检测信息(通过任务处理器) + * @author wxz + * @date 2022/11/8 下午8:17 + * @param jobEntity + + */ + void execSyncByJobProcessor(IcSyncJobEntity jobEntity); + } diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java index 88e30607ba..3013c0bd79 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java @@ -399,6 +399,13 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl qw = new LambdaQueryWrapper<>(); - qw.eq(IcSyncJobEntity::getOrgId,formDTO.getAgencyId()).in(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING,OPERATION_STATUS_PROCESSING); + qw.eq(IcSyncJobEntity::getOrgId,formDTO.getAgencyId()) + .in(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING,OPERATION_STATUS_PROCESSING); List icSyncJobEntities = icSyncJobDao.selectList(qw); // 当前组织下存在同步任务 if (CollectionUtils.isNotEmpty(icSyncJobEntities)){ throw new EpmetException(EpmetErrorCode.EXIST_SYNC_JOB_ERROR.getCode()); } + // 不存在新增一条记录 IcSyncJobEntity e = new IcSyncJobEntity(); e.setCustomerId(formDTO.getCustomerId()); @@ -426,47 +437,39 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl qw2 = new LambdaQueryWrapper<>(); - qw2.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId()) - .eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_PROCESSING) - .eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT); - List processingList = icSyncJobDao.selectList(qw2); - if (CollectionUtils.isNotEmpty(processingList) && processingList.size() >= NumConstant.TWO){ - throw new EpmetException(EpmetErrorCode.OTHER_SYNC_JOB_ERROR.getCode()); - } - List waitList; - do { - LambdaQueryWrapper qw3 = new LambdaQueryWrapper<>(); - qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId()) - .eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING) - .eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT) - .orderByAsc(BaseEpmetEntity::getCreatedTime); - waitList = icSyncJobDao.selectList(qw3); - if (CollectionUtils.isNotEmpty(waitList)){ - for (IcSyncJobEntity entity : waitList) { - RLock lock = null; - try { - lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS); - updateSync(entity.getId(),OPERATION_STATUS_PROCESSING); - }catch (Exception ex){ - log.error(ex.getMessage()); - throw new EpmetException(ex.getMessage()); - }finally { - lock.unlock(); - } - formDTO.setAgencyId(entity.getOrgId()); - try { - natInfoScanTask(formDTO); - }catch (Exception ee){ - log.error(ee.getMessage()); - throw new EpmetException(ee.getMessage()); - }finally { - updateSync(entity.getId(),OPERATION_STATUS_FINISH); - } - } - } - }while (CollectionUtils.isNotEmpty(waitList)); + + //List waitList; + //do { + // LambdaQueryWrapper qw3 = new LambdaQueryWrapper<>(); + // qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId()) + // .eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING) + // .eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT) + // .orderByAsc(BaseEpmetEntity::getCreatedTime); + // waitList = icSyncJobDao.selectList(qw3); + // if (CollectionUtils.isNotEmpty(waitList)){ + // for (IcSyncJobEntity entity : waitList) { + // RLock lock = null; + // try { + // lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS); + // updateSync(entity.getId(),OPERATION_STATUS_PROCESSING); + // }catch (Exception ex){ + // log.error(ex.getMessage()); + // throw new EpmetException(ex.getMessage()); + // }finally { + // lock.unlock(); + // } + // formDTO.setAgencyId(entity.getOrgId()); + // try { + // natInfoScanTask(formDTO); + // }catch (Exception ee){ + // log.error(ee.getMessage()); + // throw new EpmetException(ee.getMessage()); + // }finally { + // updateSync(entity.getId(),OPERATION_STATUS_FINISH); + // } + // } + // } + //}while (CollectionUtils.isNotEmpty(waitList)); } @Transactional(rollbackFor = Exception.class) @@ -695,7 +698,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize) { + public List getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize) { //根据 组织 分页获取 居民数据 PageInfo pageInfo = PageHelper.startPage(pageNo, pageSize, false) .doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO)); @@ -1171,4 +1174,29 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl resis = null; + int pageNo = 1; + int pageSize = 1000; + do { + // 分页,一次查询1000居民,循环更新他们的核酸检测信息 + resis = getNatUserInfoFromDb(p, pageNo, pageSize); + yantaiHsjcByDbView(resis, jobEntity.getCustomerId(), NumConstant.ONE_STR); + pageNo++; + } while (org.springframework.util.CollectionUtils.isEmpty(resis)); + } } diff --git a/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml b/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml index 0b7ecc334d..9f712c8896 100644 --- a/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml +++ b/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml @@ -3,4 +3,27 @@ + + \ No newline at end of file