Browse Source

抽象数据同步模块,封装部分逻辑

dev
wangxianzhang 3 years ago
parent
commit
43cace1816
  1. 6
      epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java
  2. 9
      epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java
  3. 213
      epmet-user/epmet-user-server/src/main/java/com/epmet/processor/AbstractDataSyncJobProcessor.java
  4. 113
      epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java
  5. 95
      epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiResiComparisonSyncProcessor.java
  6. 103
      epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java
  7. 15
      epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml

6
epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java

@ -16,6 +16,8 @@ import java.util.List;
@Mapper
public interface IcSyncJobDao extends BaseDao<IcSyncJobEntity> {
List<IcSyncJobEntity> selectExecutableJobList(@Param("jobType") String jobType,
@Param("itemCount") int itemCount);
List<IcSyncJobEntity> selectJobListByStatus(@Param("status") String status,
@Param("jobType") String jobType,
@Param("processorIp") String processorIp,
@Param("itemCount") Integer itemCount);
}

9
epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java

@ -56,4 +56,13 @@ public class IcSyncJobEntity extends BaseEpmetEntity {
*/
private String operationStatus;
/**
* 处理实例的ip
* @author wxz
* @date 2022/11/19 下午11:02
* * @param null
*
*/
private String processorIp;
}

213
epmet-user/epmet-user-server/src/main/java/com/epmet/processor/AbstractDataSyncJobProcessor.java

@ -0,0 +1,213 @@
package com.epmet.processor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.entity.IcSyncJobEntity;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @ClassName BaseSyncProcessor
* @Description 抽象数据同步处理器基类所有基于该模式的同步器都应当继承该类自定义Scheduler之后调用基类的scanAndExeJobs()方法
* 实现需要实现的方法(定义分布式锁的key定义同步数据的类型)有特殊逻辑的可以重写相关方法
* @Author wangxianzhang
* @Date 2022/11/19 下午11:16
*/
@Slf4j
public abstract class AbstractDataSyncJobProcessor {
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobDao icSyncJobDao;
@Autowired
private DistributedLock distributedLock;
/**
* 最多能运行几条线程如果有特殊情况可以通过抽象方法每个子类单独定义
*/
private int MAX_EXECUTING_NUMBER = 3;
/**
* 本机ip
*/
private String localIp;
{
try {
// 获取本机ip
InetAddress localHost = Inet4Address.getLocalHost();
localIp = localHost.getHostAddress();
log.info("【抽象定时数据同步器】获取本机ip为:{}", localIp);
} catch (UnknownHostException e) {
log.error("【抽象定时数据同步器】获取本机ip失败");
}
}
/**
* 扫描并执行
* @author wxz
* @date 2022/11/20 上午12:10
*
*
*/
void scanAndExecWaitingJobs() {
int executingTaskNumber = getExecutingTaskNumber();
if (executingTaskNumber >= MAX_EXECUTING_NUMBER) {
// 单个实例下,最多只允许同时3条线程运行
return;
}
// 还可以运行几条线程
int leftCount = MAX_EXECUTING_NUMBER - executingTaskNumber;
RLock lock = null;
try {
lock = distributedLock.getLock(getDistributeLockKey(), getDistributeLockLeaseTime(), getDistributeLockWaitTime(), TimeUnit.SECONDS);
// 查询可执行的任务列表,并且异步执行
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectJobListByStatus("waiting", getJobType(), null, leftCount);
if (!CollectionUtils.isEmpty(icSyncJobToExec)) {
// 异步提交任务
submitAsyncJob(icSyncJobToExec);
}
} catch (Exception e) {
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e));
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 扫描并且执行被中断的任务
* @author wxz
* @date 2022/11/20 下午10:29
*
*
*/
@PostConstruct
void scanAndContinueInteruptedJobs() {
// 此处不需要加锁,因为已经按照ip查询了,查到的一定是本机之前执行过的,其他机器查不到该条
List<IcSyncJobEntity> interuptedJobs = icSyncJobDao.selectJobListByStatus("processing", getJobType(), localIp,null);
if (!CollectionUtils.isEmpty(interuptedJobs)) {
submitAsyncJob(interuptedJobs);
}
}
/**
* 提交异步任务
* @author wxz
* @date 2022/11/20 下午10:37
* * @param icSyncJobToExec
*
*/
void submitAsyncJob(List<IcSyncJobEntity> icSyncJobToExec) {
for (IcSyncJobEntity jobEntity : icSyncJobToExec) {
// 将此任务状态修改为执行中
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING, localIp);
executorService.submit(() -> {
try {
execJobTask(jobEntity);
} finally {
// 更新任务状态为结束
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH, null);
}
});
}
}
/**
* 先检查一下有几个任务需要执行
* @author wxz
* @date 2022/11/19 下午10:57
* @return int
*/
private int getExecutingTaskNumber() {
// 查询本机正在处理中的任务列表
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>();
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING);
executingListQuery.eq(IcSyncJobEntity::getProcessorIp, localIp);
return icSyncJobDao.selectCount(executingListQuery);
}
/**
* 更新任务状态
* @author wxz
* @date 2022/11/8 下午8:25
* @param id
* @param status
*/
private void updateJobStatus(String id, String status, String processorIp) {
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>();
query.eq(IcSyncJobEntity::getId, id);
IcSyncJobEntity updateEntity = new IcSyncJobEntity();
updateEntity.setOperationStatus(status);
updateEntity.setProcessorIp(processorIp);
icSyncJobDao.update(updateEntity, query);
}
/**
* 执行任务
* @author wxz
* @date 2022/11/20 上午12:06
*
*
*/
protected abstract void execJobTask(IcSyncJobEntity jobEntity);
/**
* 获取分布式锁的key
* @author wxz
* @date 2022/11/20 上午12:21
*
*
*/
protected abstract String getDistributeLockKey();
/**
* 任务类型
* @author wxz
* @date 2022/11/20 上午12:25
*
* * @return String
*/
protected abstract String getJobType();
/**
* 分布式锁的续期时间
* @author wxz
* @date 2022/11/20 下午10:52
*
* * @return long
*/
protected abstract long getDistributeLockLeaseTime();
/**
* 分布式锁的等待时间
* @author wxz
* @date 2022/11/20 下午10:52
*
* * @return long
*/
protected abstract long getDistributeLockWaitTime();
}

113
epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java

@ -1,49 +1,27 @@
package com.epmet.processor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.entity.IcSyncJobEntity;
import com.epmet.service.DataSyncConfigService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT;
// 烟台核酸检测数据同步处理器
/**
* @Description 烟台核酸检测数据同步处理器
* @Author wxz
* @Date 2022/11/20 上午11:43
*/
@Component
@Slf4j
public class YanTaiNatSyncProcessor {
public static final int MAX_EXECUTING_COUNT = 3;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobDao icSyncJobDao;
public class YanTaiNatSyncProcessor extends AbstractDataSyncJobProcessor {
@Autowired
private DataSyncConfigService dataSyncConfigService;
@Autowired
private DistributedLock distributedLock;
@Autowired
RedisUtils redisUtils;
/**
* 定时扫描和执行同步任务
* 10s扫一次库
@ -53,76 +31,31 @@ public class YanTaiNatSyncProcessor {
*/
@Scheduled(cron = "0/10 * * * * ? ")
public void scanJobs() {
//log.info("【异步数据更新】开始同步任务");
//String dataSyncEnable = redisUtils.getString("data:sync:enable");
//if (StringUtils.isEmpty(dataSyncEnable)) {
// return;
//}
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>();
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> executingJobList = icSyncJobDao.selectList(executingListQuery);
if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) {
// 最多只允许同时3条线程运行
return;
scanAndExecWaitingJobs();
}
int executingCount = executingJobList.size();
// 还可以运行几条线程
int leftCount = MAX_EXECUTING_COUNT - executingCount;
RLock lock = null;
try {
lock = distributedLock.getLock("data:sync:" + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
// 查询可执行的任务列表,并且异步执行
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectExecutableJobList(
EpidemicConstant.JOB_TYPE_NAT,
leftCount);
if (!CollectionUtils.isEmpty(icSyncJobToExec)) {
// 异步提交任务
for (IcSyncJobEntity jobEntity : icSyncJobToExec) {
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING);
executorService.submit(() -> {
// 将此任务状态修改为执行中
try {
@Override
protected void execJobTask(IcSyncJobEntity jobEntity) {
dataSyncConfigService.execSyncByJobProcessor(jobEntity);
} finally {
// 更新任务状态为结束
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH);
}
});
}
}
} catch (Exception e) {
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e));
} finally {
if (lock != null) {
lock.unlock();
}
@Override
protected String getDistributeLockKey() {
return "data:sync:" + JOB_TYPE_NAT;
}
}
/**
* 更新任务状态
* @author wxz
* @date 2022/11/8 下午8:25
* @param id
* @param status
*/
private void updateJobStatus(String id, String status) {
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>();
query.eq(IcSyncJobEntity::getId, id);
@Override
protected String getJobType() {
return EpidemicConstant.JOB_TYPE_NAT;
}
IcSyncJobEntity updateEntity = new IcSyncJobEntity();
updateEntity.setOperationStatus(status);
icSyncJobDao.update(updateEntity, query);
@Override
protected long getDistributeLockLeaseTime() {
return 60;
}
@Override
protected long getDistributeLockWaitTime() {
return 60;
}
}

95
epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiResiComparisonSyncProcessor.java

@ -1,23 +1,13 @@
package com.epmet.processor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.entity.IcSyncJobEntity;
import com.epmet.service.IcResiComparisonRecordService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Description 居民一致性对比任务处理器
@ -26,22 +16,11 @@ import java.util.concurrent.TimeUnit;
*/
@Component
@Slf4j
public class YanTaiResiComparisonSyncProcessor {
public static final int MAX_EXECUTING_COUNT = 3;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobDao icSyncJobDao;
public class YanTaiResiComparisonSyncProcessor extends AbstractDataSyncJobProcessor {
@Autowired
private IcResiComparisonRecordService icResiComparisonRecordService;
@Autowired
private DistributedLock distributedLock;
@Autowired
RedisUtils redisUtils;
@ -51,69 +30,31 @@ public class YanTaiResiComparisonSyncProcessor {
*/
@Scheduled(cron = "0/10 * * * * ? ")
public void scanJobs() {
//log.info("【异步数据更新】开始同步任务");
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>();
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> executingJobList = icSyncJobDao.selectList(executingListQuery);
if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) {
// 最多只允许同时3条线程运行
return;
scanAndExecWaitingJobs();
}
int executingCount = executingJobList.size();
// 还可以运行几条线程
int leftCount = MAX_EXECUTING_COUNT - executingCount;
RLock lock = null;
try {
lock = distributedLock.getLock("data:sync:comparison:resi", 60L, 60L, TimeUnit.SECONDS);
// 查询可执行的任务列表,并且异步执行
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectExecutableJobList(
EpidemicConstant.JOB_TYPE_COMPARISON_RESI,
leftCount);
if (!CollectionUtils.isEmpty(icSyncJobToExec)) {
// 异步提交任务
for (IcSyncJobEntity jobEntity : icSyncJobToExec) {
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING);
executorService.submit(() -> {
// 将此任务状态修改为执行中
try {
@Override
protected void execJobTask(IcSyncJobEntity jobEntity) {
icResiComparisonRecordService.comparisonUserData(jobEntity);
} finally {
// 更新任务状态为结束
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH);
}
});
}
}
} catch (Exception e) {
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e));
} finally {
if (lock != null) {
lock.unlock();
}
}
@Override
protected String getDistributeLockKey() {
return "data:sync:comparison:resi";
}
/**
* 更新任务状态
* @param id
* @param status
*/
private void updateJobStatus(String id, String status) {
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>();
query.eq(IcSyncJobEntity::getId, id);
@Override
protected String getJobType() {
return EpidemicConstant.JOB_TYPE_COMPARISON_RESI;
}
IcSyncJobEntity updateEntity = new IcSyncJobEntity();
updateEntity.setOperationStatus(status);
icSyncJobDao.update(updateEntity, query);
@Override
protected long getDistributeLockLeaseTime() {
return 60;
}
@Override
protected long getDistributeLockWaitTime() {
return 60;
}
}

103
epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java

@ -1,49 +1,27 @@
package com.epmet.processor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.entity.IcSyncJobEntity;
import com.epmet.service.DataSyncConfigService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT;
import static com.epmet.constant.EpidemicConstant.JOB_TYPE_VACCINE;
// 烟台核酸检测数据同步处理器
/**
* @Description 烟台核酸检测数据同步处理器
* @Author wxz
* @Date 2022/11/20 上午11:55
*/
@Component
@Slf4j
public class YanTaiVaccineSyncProcessor {
public static final int MAX_EXECUTING_COUNT = 3;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobDao icSyncJobDao;
public class YanTaiVaccineSyncProcessor extends AbstractDataSyncJobProcessor {
@Autowired
private DataSyncConfigService dataSyncConfigService;
@Autowired
private DistributedLock distributedLock;
@Autowired
RedisUtils redisUtils;
/**
* @Description 定时扫描和执行同步任务疫苗接种
* @Author zxc
@ -51,62 +29,31 @@ public class YanTaiVaccineSyncProcessor {
*/
@Scheduled(cron = "0/10 * * * * ? ")
public void scanJobs() {
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>();
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> executingJobList = icSyncJobDao.selectList(executingListQuery);
if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) {
// 最多只允许同时3条线程运行
return;
scanAndExecWaitingJobs();
}
int executingCount = executingJobList.size();
// 还可以运行几条线程
int leftCount = MAX_EXECUTING_COUNT - executingCount;
RLock lock = null;
try {
lock = distributedLock.getLock("data:sync:" + JOB_TYPE_VACCINE, 60L, 60L, TimeUnit.SECONDS);
// 查询可执行的任务列表,并且异步执行
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectExecutableJobList(
EpidemicConstant.JOB_TYPE_VACCINE,
leftCount);
if (!CollectionUtils.isEmpty(icSyncJobToExec)) {
// 异步提交任务
for (IcSyncJobEntity jobEntity : icSyncJobToExec) {
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING);
executorService.submit(() -> {
// 将此任务状态修改为执行中
try {
@Override
protected void execJobTask(IcSyncJobEntity jobEntity) {
dataSyncConfigService.execSyncByJobProcessor(jobEntity);
} finally {
// 更新任务状态为结束
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH);
}
});
}
}
} catch (Exception e) {
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e));
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 更新任务状态
* @author wxz
* @date 2022/11/8 下午8:25
* @param id
* @param status
@Override
protected String getDistributeLockKey() {
return "data:sync:" + JOB_TYPE_VACCINE;
}
*/
private void updateJobStatus(String id, String status) {
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>();
query.eq(IcSyncJobEntity::getId, id);
@Override
protected String getJobType() {
return EpidemicConstant.JOB_TYPE_VACCINE;
}
IcSyncJobEntity updateEntity = new IcSyncJobEntity();
updateEntity.setOperationStatus(status);
icSyncJobDao.update(updateEntity, query);
@Override
protected long getDistributeLockLeaseTime() {
return 60;
}
@Override
protected long getDistributeLockWaitTime() {
return 60;
}
}

15
epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml

@ -3,8 +3,8 @@
<mapper namespace="com.epmet.dao.IcSyncJobDao">
<!--查询可执行任务列表-->
<select id="selectExecutableJobList" resultType="com.epmet.entity.IcSyncJobEntity">
<!--查询指定状态的任务列表-->
<select id="selectJobListByStatus" resultType="com.epmet.entity.IcSyncJobEntity">
select id,
customer_id,
org_id,
@ -13,6 +13,7 @@
job_type,
operator_id,
operation_status,
processor_ip,
del_flag,
revision,
created_by,
@ -20,10 +21,16 @@
updated_by,
updated_time
from ic_sync_job
where OPERATION_STATUS = 'waiting'
where
DEL_FLAG = 0
and OPERATION_STATUS = #{status}
and JOB_TYPE = #{jobType}
and DEL_FLAG = 0
<if test="processorIp != null">
and PROCESSOR_IP = #{processorIp}
</if>
order by CREATED_TIME asc
<if test="itemCount != null">
limit #{itemCount}
</if>
</select>
</mapper>
Loading…
Cancel
Save