From 05d1ecf58df99f0a8666cbfadbca2ea2fcb89e12 Mon Sep 17 00:00:00 2001 From: zxc <1272811460@qq.com> Date: Fri, 18 Nov 2022 10:46:01 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=A0=B9=E7=BB=84=E7=BB=87pid=3D0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../epmet/controller/yantai/DataSyncUserAndOrgServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/epmet-module/epmet-third/epmet-third-server/src/main/java/com/epmet/controller/yantai/DataSyncUserAndOrgServiceImpl.java b/epmet-module/epmet-third/epmet-third-server/src/main/java/com/epmet/controller/yantai/DataSyncUserAndOrgServiceImpl.java index 287ff51e80..e49322cfe6 100644 --- a/epmet-module/epmet-third/epmet-third-server/src/main/java/com/epmet/controller/yantai/DataSyncUserAndOrgServiceImpl.java +++ b/epmet-module/epmet-third/epmet-third-server/src/main/java/com/epmet/controller/yantai/DataSyncUserAndOrgServiceImpl.java @@ -120,7 +120,7 @@ public class DataSyncUserAndOrgServiceImpl implements DataSyncUserAndOrgService log.error("未查询到根组织:"+organizationId); return true; } - rootOrg.setPid(""); + rootOrg.setPid(NumConstant.ZERO_STR); rootOrg.setPids(""); rootOrg.setNameOfOrganization(""); needInsert.add(rootOrg); From 43cace181653ebc9701fec868677dbbe3221e438 Mon Sep 17 00:00:00 2001 From: wangxianzhang Date: Sun, 20 Nov 2022 22:56:54 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=8A=BD=E8=B1=A1=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=A8=A1=E5=9D=97=EF=BC=8C=E5=B0=81=E8=A3=85?= =?UTF-8?q?=E9=83=A8=E5=88=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/epmet/dao/IcSyncJobDao.java | 6 +- .../com/epmet/entity/IcSyncJobEntity.java | 9 + .../AbstractDataSyncJobProcessor.java | 213 ++++++++++++++++++ .../processor/YanTaiNatSyncProcessor.java | 119 +++------- .../YanTaiResiComparisonSyncProcessor.java | 101 ++------- .../processor/YanTaiVaccineSyncProcessor.java | 107 +++------ .../main/resources/mapper/IcSyncJobDao.xml | 17 +- 7 files changed, 312 insertions(+), 260 deletions(-) create mode 100644 epmet-user/epmet-user-server/src/main/java/com/epmet/processor/AbstractDataSyncJobProcessor.java diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java index a8dfd6bee2..403ea9d1a8 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java @@ -16,6 +16,8 @@ import java.util.List; @Mapper public interface IcSyncJobDao extends BaseDao { - List selectExecutableJobList(@Param("jobType") String jobType, - @Param("itemCount") int itemCount); + List selectJobListByStatus(@Param("status") String status, + @Param("jobType") String jobType, + @Param("processorIp") String processorIp, + @Param("itemCount") Integer itemCount); } \ No newline at end of file diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java index 0a83b7aa56..e162ceeb4b 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java @@ -56,4 +56,13 @@ public class IcSyncJobEntity extends BaseEpmetEntity { */ private String operationStatus; + /** + * 处理实例的ip + * @author wxz + * @date 2022/11/19 下午11:02 + * * @param null + * + */ + private String processorIp; + } diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/AbstractDataSyncJobProcessor.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/AbstractDataSyncJobProcessor.java new file mode 100644 index 0000000000..950c397c2d --- /dev/null +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/AbstractDataSyncJobProcessor.java @@ -0,0 +1,213 @@ +package com.epmet.processor; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.constant.EpidemicConstant; +import com.epmet.dao.IcSyncJobDao; +import com.epmet.entity.IcSyncJobEntity; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RLock; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.CollectionUtils; + +import javax.annotation.PostConstruct; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @ClassName BaseSyncProcessor + * @Description 抽象数据同步处理器基类。所有基于该模式的同步器都应当继承该类。自定义Scheduler之后,调用基类的scanAndExeJobs()方法。 + * 实现需要实现的方法(定义分布式锁的key,定义同步数据的类型)。有特殊逻辑的可以重写相关方法 + * @Author wangxianzhang + * @Date 2022/11/19 下午11:16 + */ +@Slf4j +public abstract class AbstractDataSyncJobProcessor { + + @Autowired + private ExecutorService executorService; + + @Autowired + private IcSyncJobDao icSyncJobDao; + + @Autowired + private DistributedLock distributedLock; + + /** + * 最多能运行几条线程。如果有特殊情况,可以通过抽象方法每个子类单独定义 + */ + private int MAX_EXECUTING_NUMBER = 3; + + /** + * 本机ip + */ + private String localIp; + + { + try { + // 获取本机ip + InetAddress localHost = Inet4Address.getLocalHost(); + localIp = localHost.getHostAddress(); + log.info("【抽象定时数据同步器】获取本机ip为:{}", localIp); + } catch (UnknownHostException e) { + log.error("【抽象定时数据同步器】获取本机ip失败"); + } + + } + + /** + * 扫描并执行 + * @author wxz + * @date 2022/11/20 上午12:10 + * + * + */ + void scanAndExecWaitingJobs() { + + int executingTaskNumber = getExecutingTaskNumber(); + if (executingTaskNumber >= MAX_EXECUTING_NUMBER) { + // 单个实例下,最多只允许同时3条线程运行 + return; + } + + // 还可以运行几条线程 + int leftCount = MAX_EXECUTING_NUMBER - executingTaskNumber; + RLock lock = null; + try { + lock = distributedLock.getLock(getDistributeLockKey(), getDistributeLockLeaseTime(), getDistributeLockWaitTime(), TimeUnit.SECONDS); + // 查询可执行的任务列表,并且异步执行 + List icSyncJobToExec = icSyncJobDao.selectJobListByStatus("waiting", getJobType(), null, leftCount); + if (!CollectionUtils.isEmpty(icSyncJobToExec)) { + // 异步提交任务 + submitAsyncJob(icSyncJobToExec); + } + } catch (Exception e) { + log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + + /** + * 扫描并且执行被中断的任务 + * @author wxz + * @date 2022/11/20 下午10:29 + * + * + */ + @PostConstruct + void scanAndContinueInteruptedJobs() { + // 此处不需要加锁,因为已经按照ip查询了,查到的一定是本机之前执行过的,其他机器查不到该条 + List interuptedJobs = icSyncJobDao.selectJobListByStatus("processing", getJobType(), localIp,null); + if (!CollectionUtils.isEmpty(interuptedJobs)) { + submitAsyncJob(interuptedJobs); + } + } + + /** + * 提交异步任务 + * @author wxz + * @date 2022/11/20 下午10:37 + * * @param icSyncJobToExec + * + */ + void submitAsyncJob(List icSyncJobToExec) { + for (IcSyncJobEntity jobEntity : icSyncJobToExec) { + // 将此任务状态修改为执行中 + updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING, localIp); + executorService.submit(() -> { + try { + execJobTask(jobEntity); + } finally { + // 更新任务状态为结束 + updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH, null); + } + }); + } + } + + /** + * 先检查一下,有几个任务需要执行 + * @author wxz + * @date 2022/11/19 下午10:57 + * @return int + */ + private int getExecutingTaskNumber() { + // 查询本机正在处理中的任务列表 + LambdaQueryWrapper executingListQuery = new LambdaQueryWrapper<>(); + executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); + executingListQuery.eq(IcSyncJobEntity::getProcessorIp, localIp); + return icSyncJobDao.selectCount(executingListQuery); + } + + /** + * 更新任务状态 + * @author wxz + * @date 2022/11/8 下午8:25 + * @param id + * @param status + + */ + private void updateJobStatus(String id, String status, String processorIp) { + LambdaQueryWrapper query = new LambdaQueryWrapper<>(); + query.eq(IcSyncJobEntity::getId, id); + + IcSyncJobEntity updateEntity = new IcSyncJobEntity(); + updateEntity.setOperationStatus(status); + updateEntity.setProcessorIp(processorIp); + icSyncJobDao.update(updateEntity, query); + } + + /** + * 执行任务 + * @author wxz + * @date 2022/11/20 上午12:06 + * + * + */ + protected abstract void execJobTask(IcSyncJobEntity jobEntity); + + /** + * 获取分布式锁的key + * @author wxz + * @date 2022/11/20 上午12:21 + * + * + */ + protected abstract String getDistributeLockKey(); + + /** + * 任务类型 + * @author wxz + * @date 2022/11/20 上午12:25 + * + * * @return String + */ + protected abstract String getJobType(); + + /** + * 分布式锁的续期时间 + * @author wxz + * @date 2022/11/20 下午10:52 + * + * * @return long + */ + protected abstract long getDistributeLockLeaseTime(); + + /** + * 分布式锁的等待时间 + * @author wxz + * @date 2022/11/20 下午10:52 + * + * * @return long + */ + protected abstract long getDistributeLockWaitTime(); + +} \ No newline at end of file diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java index 1fbfb6a69f..97c0dec750 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java @@ -1,49 +1,27 @@ package com.epmet.processor; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.epmet.commons.tools.distributedlock.DistributedLock; -import com.epmet.commons.tools.exception.ExceptionUtils; -import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.constant.EpidemicConstant; -import com.epmet.dao.IcSyncJobDao; import com.epmet.entity.IcSyncJobEntity; import com.epmet.service.DataSyncConfigService; import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; - -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT; -// 烟台核酸检测数据同步处理器 +/** + * @Description 烟台核酸检测数据同步处理器 + * @Author wxz + * @Date 2022/11/20 上午11:43 + */ @Component @Slf4j -public class YanTaiNatSyncProcessor { - - public static final int MAX_EXECUTING_COUNT = 3; - - @Autowired - private ExecutorService executorService; - - @Autowired - private IcSyncJobDao icSyncJobDao; +public class YanTaiNatSyncProcessor extends AbstractDataSyncJobProcessor { @Autowired private DataSyncConfigService dataSyncConfigService; - @Autowired - private DistributedLock distributedLock; - - @Autowired - RedisUtils redisUtils; - /** * 定时扫描和执行同步任务 * 10s扫一次库 @@ -53,76 +31,31 @@ public class YanTaiNatSyncProcessor { */ @Scheduled(cron = "0/10 * * * * ? ") public void scanJobs() { - //log.info("【异步数据更新】开始同步任务"); - - //String dataSyncEnable = redisUtils.getString("data:sync:enable"); - //if (StringUtils.isEmpty(dataSyncEnable)) { - // return; - //} - - LambdaQueryWrapper executingListQuery = new LambdaQueryWrapper<>(); - executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); - List executingJobList = icSyncJobDao.selectList(executingListQuery); - - if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) { - // 最多只允许同时3条线程运行 - return; - } - - int executingCount = executingJobList.size(); - // 还可以运行几条线程 - int leftCount = MAX_EXECUTING_COUNT - executingCount; - - RLock lock = null; - try { - lock = distributedLock.getLock("data:sync:" + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS); - // 查询可执行的任务列表,并且异步执行 - List icSyncJobToExec = icSyncJobDao.selectExecutableJobList( - EpidemicConstant.JOB_TYPE_NAT, - leftCount); - - if (!CollectionUtils.isEmpty(icSyncJobToExec)) { - // 异步提交任务 - for (IcSyncJobEntity jobEntity : icSyncJobToExec) { - - updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING); - executorService.submit(() -> { - // 将此任务状态修改为执行中 - - try { - dataSyncConfigService.execSyncByJobProcessor(jobEntity); - } finally { - // 更新任务状态为结束 - updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH); - } + scanAndExecWaitingJobs(); + } - }); - } - } - } catch (Exception e) { - log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); - } finally { - if (lock != null) { - lock.unlock(); - } - } + @Override + protected void execJobTask(IcSyncJobEntity jobEntity) { + dataSyncConfigService.execSyncByJobProcessor(jobEntity); } - /** - * 更新任务状态 - * @author wxz - * @date 2022/11/8 下午8:25 - * @param id - * @param status + @Override + protected String getDistributeLockKey() { + return "data:sync:" + JOB_TYPE_NAT; + } - */ - private void updateJobStatus(String id, String status) { - LambdaQueryWrapper query = new LambdaQueryWrapper<>(); - query.eq(IcSyncJobEntity::getId, id); + @Override + protected String getJobType() { + return EpidemicConstant.JOB_TYPE_NAT; + } - IcSyncJobEntity updateEntity = new IcSyncJobEntity(); - updateEntity.setOperationStatus(status); - icSyncJobDao.update(updateEntity, query); + @Override + protected long getDistributeLockLeaseTime() { + return 60; } + @Override + protected long getDistributeLockWaitTime() { + return 60; + } } diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiResiComparisonSyncProcessor.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiResiComparisonSyncProcessor.java index 8e57474606..cbf0810580 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiResiComparisonSyncProcessor.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiResiComparisonSyncProcessor.java @@ -1,23 +1,13 @@ package com.epmet.processor; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.epmet.commons.tools.distributedlock.DistributedLock; -import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.constant.EpidemicConstant; -import com.epmet.dao.IcSyncJobDao; import com.epmet.entity.IcSyncJobEntity; import com.epmet.service.IcResiComparisonRecordService; import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * @Description 居民一致性对比任务处理器 @@ -26,22 +16,11 @@ import java.util.concurrent.TimeUnit; */ @Component @Slf4j -public class YanTaiResiComparisonSyncProcessor { - - public static final int MAX_EXECUTING_COUNT = 3; - - @Autowired - private ExecutorService executorService; - - @Autowired - private IcSyncJobDao icSyncJobDao; +public class YanTaiResiComparisonSyncProcessor extends AbstractDataSyncJobProcessor { @Autowired private IcResiComparisonRecordService icResiComparisonRecordService; - @Autowired - private DistributedLock distributedLock; - @Autowired RedisUtils redisUtils; @@ -51,69 +30,31 @@ public class YanTaiResiComparisonSyncProcessor { */ @Scheduled(cron = "0/10 * * * * ? ") public void scanJobs() { - //log.info("【异步数据更新】开始同步任务"); - - LambdaQueryWrapper executingListQuery = new LambdaQueryWrapper<>(); - executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); - List executingJobList = icSyncJobDao.selectList(executingListQuery); - - if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) { - // 最多只允许同时3条线程运行 - return; - } - - int executingCount = executingJobList.size(); - // 还可以运行几条线程 - int leftCount = MAX_EXECUTING_COUNT - executingCount; - - RLock lock = null; - try { - lock = distributedLock.getLock("data:sync:comparison:resi", 60L, 60L, TimeUnit.SECONDS); - // 查询可执行的任务列表,并且异步执行 - List icSyncJobToExec = icSyncJobDao.selectExecutableJobList( - EpidemicConstant.JOB_TYPE_COMPARISON_RESI, - leftCount); - - if (!CollectionUtils.isEmpty(icSyncJobToExec)) { - // 异步提交任务 - for (IcSyncJobEntity jobEntity : icSyncJobToExec) { - - updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING); - executorService.submit(() -> { - // 将此任务状态修改为执行中 - - try { - icResiComparisonRecordService.comparisonUserData(jobEntity); - } finally { - // 更新任务状态为结束 - updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH); - } + scanAndExecWaitingJobs(); + } - }); - } - } - } catch (Exception e) { - log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); - } finally { - if (lock != null) { - lock.unlock(); - } - } + @Override + protected void execJobTask(IcSyncJobEntity jobEntity) { + icResiComparisonRecordService.comparisonUserData(jobEntity); } - /** - * 更新任务状态 - * @param id - * @param status + @Override + protected String getDistributeLockKey() { + return "data:sync:comparison:resi"; + } - */ - private void updateJobStatus(String id, String status) { - LambdaQueryWrapper query = new LambdaQueryWrapper<>(); - query.eq(IcSyncJobEntity::getId, id); + @Override + protected String getJobType() { + return EpidemicConstant.JOB_TYPE_COMPARISON_RESI; + } - IcSyncJobEntity updateEntity = new IcSyncJobEntity(); - updateEntity.setOperationStatus(status); - icSyncJobDao.update(updateEntity, query); + @Override + protected long getDistributeLockLeaseTime() { + return 60; } + @Override + protected long getDistributeLockWaitTime() { + return 60; + } } diff --git a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java index e53a279a46..a8427d8c8e 100644 --- a/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java +++ b/epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java @@ -1,49 +1,27 @@ package com.epmet.processor; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.epmet.commons.tools.distributedlock.DistributedLock; -import com.epmet.commons.tools.exception.ExceptionUtils; -import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.constant.EpidemicConstant; -import com.epmet.dao.IcSyncJobDao; import com.epmet.entity.IcSyncJobEntity; import com.epmet.service.DataSyncConfigService; import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT; import static com.epmet.constant.EpidemicConstant.JOB_TYPE_VACCINE; -// 烟台核酸检测数据同步处理器 +/** + * @Description 烟台核酸检测数据同步处理器 + * @Author wxz + * @Date 2022/11/20 上午11:55 + */ @Component @Slf4j -public class YanTaiVaccineSyncProcessor { - - public static final int MAX_EXECUTING_COUNT = 3; - - @Autowired - private ExecutorService executorService; - - @Autowired - private IcSyncJobDao icSyncJobDao; +public class YanTaiVaccineSyncProcessor extends AbstractDataSyncJobProcessor { @Autowired private DataSyncConfigService dataSyncConfigService; - @Autowired - private DistributedLock distributedLock; - - @Autowired - RedisUtils redisUtils; - /** * @Description 定时扫描和执行同步任务【疫苗接种】 * @Author zxc @@ -51,62 +29,31 @@ public class YanTaiVaccineSyncProcessor { */ @Scheduled(cron = "0/10 * * * * ? ") public void scanJobs() { - LambdaQueryWrapper executingListQuery = new LambdaQueryWrapper<>(); - executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); - List executingJobList = icSyncJobDao.selectList(executingListQuery); - if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) { - // 最多只允许同时3条线程运行 - return; - } - int executingCount = executingJobList.size(); - // 还可以运行几条线程 - int leftCount = MAX_EXECUTING_COUNT - executingCount; - RLock lock = null; - try { - lock = distributedLock.getLock("data:sync:" + JOB_TYPE_VACCINE, 60L, 60L, TimeUnit.SECONDS); - // 查询可执行的任务列表,并且异步执行 - List icSyncJobToExec = icSyncJobDao.selectExecutableJobList( - EpidemicConstant.JOB_TYPE_VACCINE, - leftCount); - if (!CollectionUtils.isEmpty(icSyncJobToExec)) { - // 异步提交任务 - for (IcSyncJobEntity jobEntity : icSyncJobToExec) { - updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING); - executorService.submit(() -> { - // 将此任务状态修改为执行中 - try { - dataSyncConfigService.execSyncByJobProcessor(jobEntity); - } finally { - // 更新任务状态为结束 - updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH); - } - }); - } - } - } catch (Exception e) { - log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); - } finally { - if (lock != null) { - lock.unlock(); - } - } + scanAndExecWaitingJobs(); } - /** - * 更新任务状态 - * @author wxz - * @date 2022/11/8 下午8:25 - * @param id - * @param status + @Override + protected void execJobTask(IcSyncJobEntity jobEntity) { + dataSyncConfigService.execSyncByJobProcessor(jobEntity); + } - */ - private void updateJobStatus(String id, String status) { - LambdaQueryWrapper query = new LambdaQueryWrapper<>(); - query.eq(IcSyncJobEntity::getId, id); + @Override + protected String getDistributeLockKey() { + return "data:sync:" + JOB_TYPE_VACCINE; + } + + @Override + protected String getJobType() { + return EpidemicConstant.JOB_TYPE_VACCINE; + } - IcSyncJobEntity updateEntity = new IcSyncJobEntity(); - updateEntity.setOperationStatus(status); - icSyncJobDao.update(updateEntity, query); + @Override + protected long getDistributeLockLeaseTime() { + return 60; } + @Override + protected long getDistributeLockWaitTime() { + return 60; + } } diff --git a/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml b/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml index 6ed0708d43..4b266d56c2 100644 --- a/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml +++ b/epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml @@ -3,8 +3,8 @@ - - select id, customer_id, org_id, @@ -13,6 +13,7 @@ job_type, operator_id, operation_status, + processor_ip, del_flag, revision, created_by, @@ -20,10 +21,16 @@ updated_by, updated_time from ic_sync_job - where OPERATION_STATUS = 'waiting' + where + DEL_FLAG = 0 + and OPERATION_STATUS = #{status} and JOB_TYPE = #{jobType} - and DEL_FLAG = 0 + + and PROCESSOR_IP = #{processorIp} + order by CREATED_TIME asc - limit #{itemCount} + + limit #{itemCount} + \ No newline at end of file From ef4e64c8e3abdc41dd41f55ffffa18d510b9d4f3 Mon Sep 17 00:00:00 2001 From: wangxianzhang Date: Mon, 21 Nov 2022 09:37:49 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E8=A1=A5=E5=85=85sync=5Fjob=E8=A1=A8?= =?UTF-8?q?=E7=9A=84flyway=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../V0.0.89__data_sync_job_create.sql | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 epmet-user/epmet-user-server/src/main/resources/db/migration/V0.0.89__data_sync_job_create.sql diff --git a/epmet-user/epmet-user-server/src/main/resources/db/migration/V0.0.89__data_sync_job_create.sql b/epmet-user/epmet-user-server/src/main/resources/db/migration/V0.0.89__data_sync_job_create.sql new file mode 100644 index 0000000000..e7ff4e1179 --- /dev/null +++ b/epmet-user/epmet-user-server/src/main/resources/db/migration/V0.0.89__data_sync_job_create.sql @@ -0,0 +1,19 @@ +CREATE TABLE `ic_sync_job` ( + `ID` varchar(64) NOT NULL COMMENT 'ID', + `CUSTOMER_ID` varchar(64) NOT NULL COMMENT '客户ID', + `ORG_ID` varchar(64) NOT NULL COMMENT '组织ID', + `PID` varchar(255) NOT NULL COMMENT '组织ID的上级', + `ORG_ID_PATH` varchar(1024) NOT NULL COMMENT '组织ID的所有上级,包括org_id', + `JOB_TYPE` varchar(255) NOT NULL COMMENT '任务类型,残疾:disability;死亡:death;核酸:nat;', + `OPERATOR_ID` varchar(255) NOT NULL COMMENT '操作员ID【staffId】', + `OPERATION_STATUS` varchar(255) DEFAULT NULL COMMENT '操作状态,等待中:waiting;进行中:processing;结束:finish', + `PROCESSOR_IP` varchar(15) DEFAULT NULL COMMENT '处理器实例ip', + `DEL_FLAG` int(11) NOT NULL, + `REVISION` int(11) NOT NULL COMMENT '乐观锁', + `CREATED_BY` varchar(32) NOT NULL COMMENT '创建人', + `CREATED_TIME` datetime NOT NULL COMMENT '创建时间', + `UPDATED_BY` varchar(32) NOT NULL COMMENT '更新人', + `UPDATED_TIME` datetime NOT NULL COMMENT '更新时间', + PRIMARY KEY (`ID`), + KEY `ORG_ID` (`ORG_ID`,`OPERATION_STATUS`,`JOB_TYPE`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='同步任务表' \ No newline at end of file