forked from rongchao/epmet-cloud-rizhao
9 changed files with 332 additions and 261 deletions
@ -0,0 +1,213 @@ |
|||
package com.epmet.processor; |
|||
|
|||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|||
import com.epmet.commons.tools.distributedlock.DistributedLock; |
|||
import com.epmet.commons.tools.exception.ExceptionUtils; |
|||
import com.epmet.constant.EpidemicConstant; |
|||
import com.epmet.dao.IcSyncJobDao; |
|||
import com.epmet.entity.IcSyncJobEntity; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.redisson.api.RLock; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.util.CollectionUtils; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import java.net.Inet4Address; |
|||
import java.net.InetAddress; |
|||
import java.net.UnknownHostException; |
|||
import java.util.List; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
/** |
|||
* @ClassName BaseSyncProcessor |
|||
* @Description 抽象数据同步处理器基类。所有基于该模式的同步器都应当继承该类。自定义Scheduler之后,调用基类的scanAndExeJobs()方法。 |
|||
* 实现需要实现的方法(定义分布式锁的key,定义同步数据的类型)。有特殊逻辑的可以重写相关方法 |
|||
* @Author wangxianzhang |
|||
* @Date 2022/11/19 下午11:16 |
|||
*/ |
|||
@Slf4j |
|||
public abstract class AbstractDataSyncJobProcessor { |
|||
|
|||
@Autowired |
|||
private ExecutorService executorService; |
|||
|
|||
@Autowired |
|||
private IcSyncJobDao icSyncJobDao; |
|||
|
|||
@Autowired |
|||
private DistributedLock distributedLock; |
|||
|
|||
/** |
|||
* 最多能运行几条线程。如果有特殊情况,可以通过抽象方法每个子类单独定义 |
|||
*/ |
|||
private int MAX_EXECUTING_NUMBER = 3; |
|||
|
|||
/** |
|||
* 本机ip |
|||
*/ |
|||
private String localIp; |
|||
|
|||
{ |
|||
try { |
|||
// 获取本机ip
|
|||
InetAddress localHost = Inet4Address.getLocalHost(); |
|||
localIp = localHost.getHostAddress(); |
|||
log.info("【抽象定时数据同步器】获取本机ip为:{}", localIp); |
|||
} catch (UnknownHostException e) { |
|||
log.error("【抽象定时数据同步器】获取本机ip失败"); |
|||
} |
|||
|
|||
} |
|||
|
|||
/** |
|||
* 扫描并执行 |
|||
* @author wxz |
|||
* @date 2022/11/20 上午12:10 |
|||
* |
|||
* |
|||
*/ |
|||
void scanAndExecWaitingJobs() { |
|||
|
|||
int executingTaskNumber = getExecutingTaskNumber(); |
|||
if (executingTaskNumber >= MAX_EXECUTING_NUMBER) { |
|||
// 单个实例下,最多只允许同时3条线程运行
|
|||
return; |
|||
} |
|||
|
|||
// 还可以运行几条线程
|
|||
int leftCount = MAX_EXECUTING_NUMBER - executingTaskNumber; |
|||
RLock lock = null; |
|||
try { |
|||
lock = distributedLock.getLock(getDistributeLockKey(), getDistributeLockLeaseTime(), getDistributeLockWaitTime(), TimeUnit.SECONDS); |
|||
// 查询可执行的任务列表,并且异步执行
|
|||
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectJobListByStatus("waiting", getJobType(), null, leftCount); |
|||
if (!CollectionUtils.isEmpty(icSyncJobToExec)) { |
|||
// 异步提交任务
|
|||
submitAsyncJob(icSyncJobToExec); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); |
|||
} finally { |
|||
if (lock != null) { |
|||
lock.unlock(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 扫描并且执行被中断的任务 |
|||
* @author wxz |
|||
* @date 2022/11/20 下午10:29 |
|||
* |
|||
* |
|||
*/ |
|||
@PostConstruct |
|||
void scanAndContinueInteruptedJobs() { |
|||
// 此处不需要加锁,因为已经按照ip查询了,查到的一定是本机之前执行过的,其他机器查不到该条
|
|||
List<IcSyncJobEntity> interuptedJobs = icSyncJobDao.selectJobListByStatus("processing", getJobType(), localIp,null); |
|||
if (!CollectionUtils.isEmpty(interuptedJobs)) { |
|||
submitAsyncJob(interuptedJobs); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 提交异步任务 |
|||
* @author wxz |
|||
* @date 2022/11/20 下午10:37 |
|||
* * @param icSyncJobToExec |
|||
* |
|||
*/ |
|||
void submitAsyncJob(List<IcSyncJobEntity> icSyncJobToExec) { |
|||
for (IcSyncJobEntity jobEntity : icSyncJobToExec) { |
|||
// 将此任务状态修改为执行中
|
|||
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING, localIp); |
|||
executorService.submit(() -> { |
|||
try { |
|||
execJobTask(jobEntity); |
|||
} finally { |
|||
// 更新任务状态为结束
|
|||
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH, null); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 先检查一下,有几个任务需要执行 |
|||
* @author wxz |
|||
* @date 2022/11/19 下午10:57 |
|||
* @return int |
|||
*/ |
|||
private int getExecutingTaskNumber() { |
|||
// 查询本机正在处理中的任务列表
|
|||
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>(); |
|||
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); |
|||
executingListQuery.eq(IcSyncJobEntity::getProcessorIp, localIp); |
|||
return icSyncJobDao.selectCount(executingListQuery); |
|||
} |
|||
|
|||
/** |
|||
* 更新任务状态 |
|||
* @author wxz |
|||
* @date 2022/11/8 下午8:25 |
|||
* @param id |
|||
* @param status |
|||
|
|||
*/ |
|||
private void updateJobStatus(String id, String status, String processorIp) { |
|||
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>(); |
|||
query.eq(IcSyncJobEntity::getId, id); |
|||
|
|||
IcSyncJobEntity updateEntity = new IcSyncJobEntity(); |
|||
updateEntity.setOperationStatus(status); |
|||
updateEntity.setProcessorIp(processorIp); |
|||
icSyncJobDao.update(updateEntity, query); |
|||
} |
|||
|
|||
/** |
|||
* 执行任务 |
|||
* @author wxz |
|||
* @date 2022/11/20 上午12:06 |
|||
* |
|||
* |
|||
*/ |
|||
protected abstract void execJobTask(IcSyncJobEntity jobEntity); |
|||
|
|||
/** |
|||
* 获取分布式锁的key |
|||
* @author wxz |
|||
* @date 2022/11/20 上午12:21 |
|||
* |
|||
* |
|||
*/ |
|||
protected abstract String getDistributeLockKey(); |
|||
|
|||
/** |
|||
* 任务类型 |
|||
* @author wxz |
|||
* @date 2022/11/20 上午12:25 |
|||
* |
|||
* * @return String |
|||
*/ |
|||
protected abstract String getJobType(); |
|||
|
|||
/** |
|||
* 分布式锁的续期时间 |
|||
* @author wxz |
|||
* @date 2022/11/20 下午10:52 |
|||
* |
|||
* * @return long |
|||
*/ |
|||
protected abstract long getDistributeLockLeaseTime(); |
|||
|
|||
/** |
|||
* 分布式锁的等待时间 |
|||
* @author wxz |
|||
* @date 2022/11/20 下午10:52 |
|||
* |
|||
* * @return long |
|||
*/ |
|||
protected abstract long getDistributeLockWaitTime(); |
|||
|
|||
} |
@ -0,0 +1,19 @@ |
|||
CREATE TABLE `ic_sync_job` ( |
|||
`ID` varchar(64) NOT NULL COMMENT 'ID', |
|||
`CUSTOMER_ID` varchar(64) NOT NULL COMMENT '客户ID', |
|||
`ORG_ID` varchar(64) NOT NULL COMMENT '组织ID', |
|||
`PID` varchar(255) NOT NULL COMMENT '组织ID的上级', |
|||
`ORG_ID_PATH` varchar(1024) NOT NULL COMMENT '组织ID的所有上级,包括org_id', |
|||
`JOB_TYPE` varchar(255) NOT NULL COMMENT '任务类型,残疾:disability;死亡:death;核酸:nat;', |
|||
`OPERATOR_ID` varchar(255) NOT NULL COMMENT '操作员ID【staffId】', |
|||
`OPERATION_STATUS` varchar(255) DEFAULT NULL COMMENT '操作状态,等待中:waiting;进行中:processing;结束:finish', |
|||
`PROCESSOR_IP` varchar(15) DEFAULT NULL COMMENT '处理器实例ip', |
|||
`DEL_FLAG` int(11) NOT NULL, |
|||
`REVISION` int(11) NOT NULL COMMENT '乐观锁', |
|||
`CREATED_BY` varchar(32) NOT NULL COMMENT '创建人', |
|||
`CREATED_TIME` datetime NOT NULL COMMENT '创建时间', |
|||
`UPDATED_BY` varchar(32) NOT NULL COMMENT '更新人', |
|||
`UPDATED_TIME` datetime NOT NULL COMMENT '更新时间', |
|||
PRIMARY KEY (`ID`), |
|||
KEY `ORG_ID` (`ORG_ID`,`OPERATION_STATUS`,`JOB_TYPE`) USING BTREE |
|||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='同步任务表' |
Loading…
Reference in new issue