Browse Source

疫苗接种拉取暂提

master
zxc 3 years ago
parent
commit
f282ab0914
  1. 6
      epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java
  2. 112
      epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java
  3. 3
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java
  4. 57
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

6
epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java

@ -12,7 +12,13 @@ public interface EpidemicConstant {
String DATA_CODE_DISABILITY = "canji"; String DATA_CODE_DISABILITY = "canji";
String DATA_CODE_DEATH = "siwang"; String DATA_CODE_DEATH = "siwang";
/**
* ic_sync_job 任务类型
* 核酸检测
* 疫苗接种
*/
String JOB_TYPE_NAT = "nat"; String JOB_TYPE_NAT = "nat";
String JOB_TYPE_VACCINE = "vaccine";
String OPERATION_STATUS_WAITING = "waiting"; String OPERATION_STATUS_WAITING = "waiting";
String OPERATION_STATUS_PROCESSING = "processing"; String OPERATION_STATUS_PROCESSING = "processing";

112
epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiVaccineSyncProcessor.java

@ -0,0 +1,112 @@
package com.epmet.processor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.entity.IcSyncJobEntity;
import com.epmet.service.DataSyncConfigService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT;
import static com.epmet.constant.EpidemicConstant.JOB_TYPE_VACCINE;
// 烟台核酸检测数据同步处理器
@Component
@Slf4j
public class YanTaiVaccineSyncProcessor {
public static final int MAX_EXECUTING_COUNT = 3;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobDao icSyncJobDao;
@Autowired
private DataSyncConfigService dataSyncConfigService;
@Autowired
private DistributedLock distributedLock;
@Autowired
RedisUtils redisUtils;
/**
* @Description 定时扫描和执行同步任务疫苗接种
* @Author zxc
* @Date 2022/11/11 10:32
*/
@Scheduled(cron = "0/10 * * * * ? ")
public void scanJobs() {
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>();
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> executingJobList = icSyncJobDao.selectList(executingListQuery);
if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) {
// 最多只允许同时3条线程运行
return;
}
int executingCount = executingJobList.size();
// 还可以运行几条线程
int leftCount = MAX_EXECUTING_COUNT - executingCount;
RLock lock = null;
try {
lock = distributedLock.getLock("data:sync:" + JOB_TYPE_VACCINE, 60L, 60L, TimeUnit.SECONDS);
// 查询可执行的任务列表,并且异步执行
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectExecutableJobList(
EpidemicConstant.JOB_TYPE_VACCINE,
leftCount);
if (!CollectionUtils.isEmpty(icSyncJobToExec)) {
// 异步提交任务
for (IcSyncJobEntity jobEntity : icSyncJobToExec) {
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING);
executorService.submit(() -> {
// 将此任务状态修改为执行中
try {
dataSyncConfigService.execSyncByJobProcessor(jobEntity);
} finally {
// 更新任务状态为结束
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH);
}
});
}
}
} catch (Exception e) {
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e));
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 更新任务状态
* @author wxz
* @date 2022/11/8 下午8:25
* @param id
* @param status
*/
private void updateJobStatus(String id, String status) {
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>();
query.eq(IcSyncJobEntity::getId, id);
IcSyncJobEntity updateEntity = new IcSyncJobEntity();
updateEntity.setOperationStatus(status);
icSyncJobDao.update(updateEntity, query);
}
}

3
epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java

@ -121,8 +121,9 @@ public interface DataSyncConfigService extends BaseService<DataSyncConfigEntity>
* @param resiInfos * @param resiInfos
* @param customerId * @param customerId
* @param isSync * @param isSync
* @param jobType
*/ */
void yantaiHsjcByDbView(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync); void yanTaiDbViewByType(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync, String jobType);
/** /**
* 更新居民核酸检测信息(通过任务处理器) * 更新居民核酸检测信息(通过任务处理器)

57
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

@ -378,7 +378,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
case HE_SUAN: case HE_SUAN:
try { try {
//查询正常状态的居民 //查询正常状态的居民
yantaiHsjcByDbView(dbResiList, config.getCustomerId(), formDTO.getIsSync()); yanTaiDbViewByType(dbResiList, config.getCustomerId(), formDTO.getIsSync(), JOB_TYPE_NAT);
log.info("======核酸检测信息拉取结束======"); log.info("======核酸检测信息拉取结束======");
} catch (Exception e) { } catch (Exception e) {
log.error("nat thread execute exception", e); log.error("nat thread execute exception", e);
@ -393,7 +393,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
if (CollectionUtils.isEmpty(dbResiList)){ if (CollectionUtils.isEmpty(dbResiList)){
return; return;
} }
yantaiHsjcByDbView(dbResiList, dbResiList.get(NumConstant.ZERO).getCustomerId(), formDTO.getIsSync()); yanTaiDbViewByType(dbResiList, dbResiList.get(NumConstant.ZERO).getCustomerId(), formDTO.getIsSync(),JOB_TYPE_NAT);
} }
pageNo++; pageNo++;
} while (dbResiList != null && dbResiList.size() == pageSize); } while (dbResiList != null && dbResiList.size() == pageSize);
@ -532,7 +532,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
try { try {
//查询正常状态的居民 //查询正常状态的居民
//hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync()); //hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync());
yantaiHsjcByDbView(finalDbResiList, config.getCustomerId(), formDTO.getIsSync()); yanTaiDbViewByType(finalDbResiList, config.getCustomerId(), formDTO.getIsSync(), JOB_TYPE_NAT);
log.info("======核酸检测信息拉取结束======"); log.info("======核酸检测信息拉取结束======");
} catch (Exception e) { } catch (Exception e) {
log.error("hsjc thread execute exception", e); log.error("hsjc thread execute exception", e);
@ -1063,19 +1063,56 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* @param customerId * @param customerId
* @param isSync * @param isSync
*/ */
public void yantaiHsjcByDbView(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync) { public void yanTaiDbViewByType(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync, String jobType) {
List<List<NatUserInfoResultDTO>> resiInfobatchs = ListUtils.partition(resiInfos, 50); List<List<NatUserInfoResultDTO>> resiInfoBatchs = ListUtils.partition(resiInfos, 50);
for (List<NatUserInfoResultDTO> resibatch : resiInfobatchs) { for (List<NatUserInfoResultDTO> resiBatch : resiInfoBatchs) {
// n个一批,来处理他们的核酸信息,太多怕给数据库查崩了。 // n个一批,来处理他们的核酸信息,太多怕给数据库查崩了。
try { try {
yantaiHsjcByDbViewPartition(resibatch, customerId, isSync); switch (jobType){
// 核酸检测
case JOB_TYPE_NAT:
yantaiHsjcByDbViewPartition(resiBatch, customerId, isSync);
// 疫苗接种
case JOB_TYPE_VACCINE:
yanTaiVaccineByDbViewPartition(resiBatch, customerId, isSync);
default:
break;
}
} catch (Exception e) { } catch (Exception e) {
String errorMsg = ExceptionUtils.getErrorStackTrace(e); String errorMsg = ExceptionUtils.getErrorStackTrace(e);
log.error("【更新核酸检测信息(from 兰图)】失败,信息:{}", errorMsg); log.error("【更新{}信息(from 兰图)】失败,信息:{}", jobType, errorMsg);
} }
} }
} }
/**
* @Description 疫苗接种信息处理
* @param resiInfos
* @param customerId
* @param isSync
* @Author zxc
* @Date 2022/11/11 11:07
*/
public void yanTaiVaccineByDbViewPartition(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync){
// 将居民信息转化为<idCard,resiInfo>的map
Map<String, NatUserInfoResultDTO> idCardAndResiInfoMap = resiInfos.stream().collect(Collectors.toMap(resi -> resi.getIdCard(), Function.identity()));
List<String> idCards = new ArrayList<>(idCardAndResiInfoMap.keySet());
// 1.获取核酸采样信息
Map<String, Object> args = new HashMap<>();
args.put("idcards", idCards);
// todo 疫苗接种视图
List<Map<String, Object>> vaccineList = yantaiNamedParamLantuJdbcTemplate.queryForList(
"select id, name,card_no, create_time, realname from hscyxxb where card_no in (:idcards)", args);
if (CollectionUtils.isNotEmpty(vaccineList)) {
List<IcVaccineEntity> entities = new ArrayList<>();
vaccineList.forEach(resiVaccineInfo -> {
// 从视图中获取到的疫苗接种相关信息
});
}
}
/** /**
* n个一批来处理他们的核酸信息太多怕给数据库查崩了 * n个一批来处理他们的核酸信息太多怕给数据库查崩了
* @param resiInfos * @param resiInfos
@ -1198,10 +1235,10 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
int updatedResiCount = 0; int updatedResiCount = 0;
log.info("【任务处理器同步数据】组织Id:{},开始同步数据,同步类型:{}", jobEntity.getOrgId(), jobEntity.getJobType()); log.info("【任务处理器同步数据】组织Id:{},开始同步数据,同步类型:{}", jobEntity.getOrgId(), jobEntity.getJobType());
do { do {
// 分页,一次查询1000居民,循环更新他们的核酸检测信息 // 分页,一次查询 1000 居民,循环更新他们的核酸检测信息
resis = getNatUserInfoFromDb(p, pageNo, pageSize); resis = getNatUserInfoFromDb(p, pageNo, pageSize);
if (CollectionUtils.isNotEmpty(resis)) { if (CollectionUtils.isNotEmpty(resis)) {
yantaiHsjcByDbView(resis, jobEntity.getCustomerId(), NumConstant.ONE_STR); yanTaiDbViewByType(resis, jobEntity.getCustomerId(), NumConstant.ONE_STR, jobEntity.getJobType());
pageNo++; pageNo++;
updatedResiCount += resis.size(); updatedResiCount += resis.size();
} }

Loading…
Cancel
Save