Browse Source

核酸检测同步,增加异步处理任务

dev
wangxianzhang 3 years ago
parent
commit
cf75416107
  1. 2
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java
  2. 2
      epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java
  3. 8
      epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java
  4. 126
      epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java
  5. 23
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java
  6. 114
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java
  7. 23
      epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml

2
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java

@ -217,7 +217,7 @@ public enum EpmetErrorCode {
CUSTOMER_CATEGORY(9101,"分类已使用,不允许删除"), CUSTOMER_CATEGORY(9101,"分类已使用,不允许删除"),
CATEGORY_NAME(9102,"分类名称已存在,不允许重复"), CATEGORY_NAME(9102,"分类名称已存在,不允许重复"),
EXIST_SYNC_JOB_ERROR(9103,"存在同步任务"), EXIST_SYNC_JOB_ERROR(9103,"存在等待或进行中的任务"),
OTHER_SYNC_JOB_ERROR(9104,"同步任务已提交,请稍后查看数据"), OTHER_SYNC_JOB_ERROR(9104,"同步任务已提交,请稍后查看数据"),
// open api异常 // open api异常

2
epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java

@ -14,6 +14,7 @@ import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/** /**
* 管理后台 * 管理后台
@ -22,6 +23,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
* @since 1.0.0 * @since 1.0.0
*/ */
@EnableScheduling
@SpringBootApplication @SpringBootApplication
@EnableDiscoveryClient @EnableDiscoveryClient
@EnableFeignClients @EnableFeignClients

8
epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java

@ -3,6 +3,9 @@ package com.epmet.dao;
import com.epmet.commons.mybatis.dao.BaseDao; import com.epmet.commons.mybatis.dao.BaseDao;
import com.epmet.entity.IcSyncJobEntity; import com.epmet.entity.IcSyncJobEntity;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/** /**
* 同步任务表 * 同步任务表
@ -12,5 +15,8 @@ import org.apache.ibatis.annotations.Mapper;
*/ */
@Mapper @Mapper
public interface IcSyncJobDao extends BaseDao<IcSyncJobEntity> { public interface IcSyncJobDao extends BaseDao<IcSyncJobEntity> {
List<IcSyncJobEntity> selectExecutableJobList(@Param("jobType") String jobType,
@Param("operationStatus") String operationStatus,
@Param("itemCount") int itemCount);
} }

126
epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java

@ -0,0 +1,126 @@
package com.epmet.processor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.entity.IcSyncJobEntity;
import com.epmet.service.DataSyncConfigService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT;
// 烟台核酸检测数据同步处理器
@Component
@Slf4j
public class YanTaiNatSyncProcessor {
public static final int MAX_EXECUTING_COUNT = 3;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobDao icSyncJobDao;
@Autowired
private DataSyncConfigService dataSyncConfigService;
@Autowired
private DistributedLock distributedLock;
@Autowired
RedisUtils redisUtils;
/**
* 定时扫描和执行同步任务
* 10s扫一次库
*
* @author wxz
* @date 2022/11/8 下午5:42
*/
@Scheduled(cron = "0/10 * * * * ? ")
public void scanJobs() {
log.info("【异步数据更新】开始同步任务");
String dataSyncEnable = redisUtils.getString("data:sync:enable");
if (StringUtils.isEmpty(dataSyncEnable)) {
return;
}
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>();
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> executingJobList = icSyncJobDao.selectList(executingListQuery);
if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) {
// 最多只允许同时3条线程运行
return;
}
int executingCount = executingJobList.size();
// 还可以运行几条线程
int leftCount = MAX_EXECUTING_COUNT - executingCount;
RLock lock = null;
try {
lock = distributedLock.getLock("data:sync:" + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
// 查询可执行的任务列表,并且异步执行
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectExecutableJobList(
EpidemicConstant.JOB_TYPE_NAT,
EpidemicConstant.OPERATION_STATUS_WAITING,
leftCount);
if (!CollectionUtils.isEmpty(icSyncJobToExec)) {
// 异步提交任务
for (IcSyncJobEntity jobEntity : icSyncJobToExec) {
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING);
executorService.submit(() -> {
// 将此任务状态修改为执行中
dataSyncConfigService.execSyncByJobProcessor(jobEntity);
// 更新任务状态为结束
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH);
});
}
}
} catch (Exception e) {
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e));
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 更新任务状态
* @author wxz
* @date 2022/11/8 下午8:25
* @param id
* @param status
*/
private void updateJobStatus(String id, String status) {
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>();
query.eq(IcSyncJobEntity::getId, id);
IcSyncJobEntity updateEntity = new IcSyncJobEntity();
updateEntity.setOperationStatus(status);
icSyncJobDao.update(updateEntity, query);
}
}

23
epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java

@ -8,7 +8,11 @@ import com.epmet.dto.DataSyncConfigDTO;
import com.epmet.dto.form.ConfigSwitchFormDTO; import com.epmet.dto.form.ConfigSwitchFormDTO;
import com.epmet.dto.form.DataSyncTaskParam; import com.epmet.dto.form.DataSyncTaskParam;
import com.epmet.dto.form.ScopeSaveFormDTO; import com.epmet.dto.form.ScopeSaveFormDTO;
import com.epmet.dto.result.NatUserInfoResultDTO;
import com.epmet.entity.DataSyncConfigEntity; import com.epmet.entity.DataSyncConfigEntity;
import com.epmet.entity.IcSyncJobEntity;
import java.util.List;
/** /**
* 数据更新配置表 * 数据更新配置表
@ -110,4 +114,23 @@ public interface DataSyncConfigService extends BaseService<DataSyncConfigEntity>
void natInfoSyncButton(DataSyncTaskParam formDTO); void natInfoSyncButton(DataSyncTaskParam formDTO);
List<NatUserInfoResultDTO> getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize);
/**
* 烟台核酸检测(视图方式获取数据)
* @param resiInfos
* @param customerId
* @param isSync
*/
void yantaiHsjcByDbView(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync);
/**
* 更新居民核酸检测信息(通过任务处理器)
* @author wxz
* @date 2022/11/8 下午8:17
* @param jobEntity
*/
void execSyncByJobProcessor(IcSyncJobEntity jobEntity);
} }

114
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

@ -399,6 +399,13 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
} while (dbResiList != null && dbResiList.size() == pageSize); } while (dbResiList != null && dbResiList.size() == pageSize);
} }
/**
* 提交同步任务放到任务池
* @author wxz
* @date 2022/11/8 下午5:32
* @param formDTO
*/
@Override @Override
public void natInfoSyncButton(DataSyncTaskParam formDTO) { public void natInfoSyncButton(DataSyncTaskParam formDTO) {
CustomerStaffInfoCacheResult staffInfo = CustomerStaffRedis.getStaffInfo(formDTO.getCustomerId(), formDTO.getStaffId()); CustomerStaffInfoCacheResult staffInfo = CustomerStaffRedis.getStaffInfo(formDTO.getCustomerId(), formDTO.getStaffId());
@ -409,13 +416,17 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
if (null == agencyInfo){ if (null == agencyInfo){
throw new EpmetException("未查询到组织信息:"+formDTO.getAgencyId()); throw new EpmetException("未查询到组织信息:"+formDTO.getAgencyId());
} }
// 查询该组织是否存在等待中或者进行中的任务
LambdaQueryWrapper<IcSyncJobEntity> qw = new LambdaQueryWrapper<>(); LambdaQueryWrapper<IcSyncJobEntity> qw = new LambdaQueryWrapper<>();
qw.eq(IcSyncJobEntity::getOrgId,formDTO.getAgencyId()).in(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING,OPERATION_STATUS_PROCESSING); qw.eq(IcSyncJobEntity::getOrgId,formDTO.getAgencyId())
.in(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING,OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> icSyncJobEntities = icSyncJobDao.selectList(qw); List<IcSyncJobEntity> icSyncJobEntities = icSyncJobDao.selectList(qw);
// 当前组织下存在同步任务 // 当前组织下存在同步任务
if (CollectionUtils.isNotEmpty(icSyncJobEntities)){ if (CollectionUtils.isNotEmpty(icSyncJobEntities)){
throw new EpmetException(EpmetErrorCode.EXIST_SYNC_JOB_ERROR.getCode()); throw new EpmetException(EpmetErrorCode.EXIST_SYNC_JOB_ERROR.getCode());
} }
// 不存在新增一条记录 // 不存在新增一条记录
IcSyncJobEntity e = new IcSyncJobEntity(); IcSyncJobEntity e = new IcSyncJobEntity();
e.setCustomerId(formDTO.getCustomerId()); e.setCustomerId(formDTO.getCustomerId());
@ -426,47 +437,39 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
e.setOperatorId(formDTO.getStaffId()); e.setOperatorId(formDTO.getStaffId());
e.setOperationStatus(OPERATION_STATUS_WAITING); e.setOperationStatus(OPERATION_STATUS_WAITING);
insertSync(e); insertSync(e);
// 客户下,type=nat 存在两条正在进行中的同步任务
LambdaQueryWrapper<IcSyncJobEntity> qw2 = new LambdaQueryWrapper<>(); //List<IcSyncJobEntity> waitList;
qw2.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId()) //do {
.eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_PROCESSING) // LambdaQueryWrapper<IcSyncJobEntity> qw3 = new LambdaQueryWrapper<>();
.eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT); // qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId())
List<IcSyncJobEntity> processingList = icSyncJobDao.selectList(qw2); // .eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING)
if (CollectionUtils.isNotEmpty(processingList) && processingList.size() >= NumConstant.TWO){ // .eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT)
throw new EpmetException(EpmetErrorCode.OTHER_SYNC_JOB_ERROR.getCode()); // .orderByAsc(BaseEpmetEntity::getCreatedTime);
} // waitList = icSyncJobDao.selectList(qw3);
List<IcSyncJobEntity> waitList; // if (CollectionUtils.isNotEmpty(waitList)){
do { // for (IcSyncJobEntity entity : waitList) {
LambdaQueryWrapper<IcSyncJobEntity> qw3 = new LambdaQueryWrapper<>(); // RLock lock = null;
qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId()) // try {
.eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING) // lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
.eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT) // updateSync(entity.getId(),OPERATION_STATUS_PROCESSING);
.orderByAsc(BaseEpmetEntity::getCreatedTime); // }catch (Exception ex){
waitList = icSyncJobDao.selectList(qw3); // log.error(ex.getMessage());
if (CollectionUtils.isNotEmpty(waitList)){ // throw new EpmetException(ex.getMessage());
for (IcSyncJobEntity entity : waitList) { // }finally {
RLock lock = null; // lock.unlock();
try { // }
lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS); // formDTO.setAgencyId(entity.getOrgId());
updateSync(entity.getId(),OPERATION_STATUS_PROCESSING); // try {
}catch (Exception ex){ // natInfoScanTask(formDTO);
log.error(ex.getMessage()); // }catch (Exception ee){
throw new EpmetException(ex.getMessage()); // log.error(ee.getMessage());
}finally { // throw new EpmetException(ee.getMessage());
lock.unlock(); // }finally {
} // updateSync(entity.getId(),OPERATION_STATUS_FINISH);
formDTO.setAgencyId(entity.getOrgId()); // }
try { // }
natInfoScanTask(formDTO); // }
}catch (Exception ee){ //}while (CollectionUtils.isNotEmpty(waitList));
log.error(ee.getMessage());
throw new EpmetException(ee.getMessage());
}finally {
updateSync(entity.getId(),OPERATION_STATUS_FINISH);
}
}
}
}while (CollectionUtils.isNotEmpty(waitList));
} }
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@ -695,7 +698,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* @param pageSize * @param pageSize
* @return * @return
*/ */
private List<NatUserInfoResultDTO> getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize) { public List<NatUserInfoResultDTO> getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize) {
//根据 组织 分页获取 居民数据 //根据 组织 分页获取 居民数据
PageInfo<NatUserInfoResultDTO> pageInfo = PageHelper.startPage(pageNo, pageSize, false) PageInfo<NatUserInfoResultDTO> pageInfo = PageHelper.startPage(pageNo, pageSize, false)
.doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO)); .doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO));
@ -1171,4 +1174,29 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
} }
} }
} }
/**
* 更新居民核酸检测信息(通过任务处理器)
* @author wxz
* @date 2022/11/8 下午8:17
* @param jobEntity
*/
public void execSyncByJobProcessor(IcSyncJobEntity jobEntity) {
DataSyncTaskParam p = new DataSyncTaskParam();
// 正常状态
p.setResiStatus("0");
// 指定组织
p.setAgencyId(jobEntity.getOrgId());
List<NatUserInfoResultDTO> resis = null;
int pageNo = 1;
int pageSize = 1000;
do {
// 分页,一次查询1000居民,循环更新他们的核酸检测信息
resis = getNatUserInfoFromDb(p, pageNo, pageSize);
yantaiHsjcByDbView(resis, jobEntity.getCustomerId(), NumConstant.ONE_STR);
pageNo++;
} while (org.springframework.util.CollectionUtils.isEmpty(resis));
}
} }

23
epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml

@ -3,4 +3,27 @@
<mapper namespace="com.epmet.dao.IcSyncJobDao"> <mapper namespace="com.epmet.dao.IcSyncJobDao">
<!--查询可执行任务列表-->
<select id="selectExecutableJobList" resultType="com.epmet.entity.IcSyncJobEntity">
select id,
customer_id,
org_id,
pid,
org_id_path,
job_type,
operator_id,
operation_status,
del_flag,
revision,
created_by,
created_time,
updated_by,
updated_time
from ic_sync_job
where OPERATION_STATUS = 'wating'
and JOB_TYPE = #{jobType}
and DEL_FLAG = 0
order by CREATED_TIME asc
limit #{itemCount}
</select>
</mapper> </mapper>
Loading…
Cancel
Save