Browse Source

Merge remote-tracking branch 'origin/dev_compare_data'

master
yinzuomei 3 years ago
parent
commit
f85caf497e
  1. 2
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java
  2. 25
      epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/NatInfoScanTask.java
  3. 89
      epmet-user/epmet-user-client/src/main/java/com/epmet/dto/IcSyncJobDTO.java
  4. 2
      epmet-user/epmet-user-client/src/main/java/com/epmet/dto/form/DataSyncTaskParam.java
  5. 9
      epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/NatUserInfoResultDTO.java
  6. 13
      epmet-user/epmet-user-client/src/main/java/com/epmet/feign/EpmetUserOpenFeignClient.java
  7. 19
      epmet-user/epmet-user-client/src/main/java/com/epmet/feign/fallback/EpmetUserOpenFeignClientFallback.java
  8. 2
      epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java
  9. 21
      epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java
  10. 46
      epmet-user/epmet-user-server/src/main/java/com/epmet/controller/DataSyncConfigController.java
  11. 4
      epmet-user/epmet-user-server/src/main/java/com/epmet/dao/DataSyncConfigDao.java
  12. 21
      epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java
  13. 59
      epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java
  14. 125
      epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcNatCompareRecordExcelImportListener.java
  15. 6
      epmet-user/epmet-user-server/src/main/java/com/epmet/jdbc/config/JdbcTemplateConfig.java
  16. 128
      epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java
  17. 50
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java
  18. 78
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcSyncJobService.java
  19. 413
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java
  20. 13
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcNatCompareRecordServiceImpl.java
  21. 83
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcSyncJobServiceImpl.java
  22. 6
      epmet-user/epmet-user-server/src/main/resources/bootstrap.yml
  23. 11
      epmet-user/epmet-user-server/src/main/resources/mapper/DataSyncConfigDao.xml
  24. 3
      epmet-user/epmet-user-server/src/main/resources/mapper/IcNatDao.xml
  25. 29
      epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml

2
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java

@ -217,6 +217,8 @@ public enum EpmetErrorCode {
CUSTOMER_CATEGORY(9101,"分类已使用,不允许删除"),
CATEGORY_NAME(9102,"分类名称已存在,不允许重复"),
EXIST_SYNC_JOB_ERROR(9103,"存在等待或进行中的任务"),
OTHER_SYNC_JOB_ERROR(9104,"同步任务已提交,请稍后查看数据"),
// open api异常
OPEN_API_UNAUTHENTICATED(10100, "请求未认证"),

25
epmet-module/epmet-job/epmet-job-server/src/main/java/com/epmet/task/NatInfoScanTask.java

@ -30,11 +30,34 @@ public class NatInfoScanTask implements ITask {
if (StringUtils.isNotBlank(params)) {
formDTO = JSON.parseObject(params, DataSyncTaskParam.class);
}
Result result = userOpenFeignClient.natInfoScanTask(formDTO);
/*Result result = userOpenFeignClient.dataSyncForYanTaiTask(formDTO);
if (result.success()) {
log.info("NatInfoScanTask定时任务执行成功");
} else {
log.error("NatInfoScanTask定时任务执行失败:" + result.getMsg());
}*/
// 核酸检测
Result natResult = userOpenFeignClient.natInfoScanTask(formDTO);
if (natResult.success()) {
log.info("NatInfoScanTask定时任务执行成功");
} else {
log.error("NatInfoScanTask定时任务执行失败:" + natResult.getMsg());
}
// 死亡
Result deathResult = userOpenFeignClient.deathInfoScanTask(formDTO);
if (deathResult.success()) {
log.info("deathInfoScanTask定时任务执行成功");
} else {
log.error("deathInfoScanTask定时任务执行失败:" + deathResult.getMsg());
}
// 残疾
Result disabilityResult = userOpenFeignClient.disabilityInfoScanTask(formDTO);
if (disabilityResult.success()) {
log.info("disabilityInfoScanTask定时任务执行成功");
} else {
log.error("disabilityInfoScanTask定时任务执行失败:" + disabilityResult.getMsg());
}
}
}

89
epmet-user/epmet-user-client/src/main/java/com/epmet/dto/IcSyncJobDTO.java

@ -0,0 +1,89 @@
package com.epmet.dto;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
/**
* 同步任务表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2022-11-08
*/
@Data
public class IcSyncJobDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* ID
*/
private String id;
/**
* 客户ID
*/
private String customerId;
/**
* 组织ID
*/
private String orgId;
/**
* 组织ID的上级
*/
private String pid;
/**
* 组织ID的所有上级包括org_id
*/
private String orgIdPath;
/**
* 任务类型残疾disability死亡death核酸nat
*/
private String jobType;
/**
* 操作员IDstaffId
*/
private String operatorId;
/**
* 操作状态,等待中waiting进行中processing结束finish
*/
private String operationStatus;
/**
*
*/
private Integer delFlag;
/**
* 乐观锁
*/
private Integer revision;
/**
* 创建人
*/
private String createdBy;
/**
* 创建时间
*/
private Date createdTime;
/**
* 更新人
*/
private String updatedBy;
/**
* 更新时间
*/
private Date updatedTime;
}

2
epmet-user/epmet-user-client/src/main/java/com/epmet/dto/form/DataSyncTaskParam.java

@ -47,4 +47,6 @@ public class DataSyncTaskParam implements Serializable {
*/
private String agencyId = null;
private String dataCode;
private String staffId;
}

9
epmet-user/epmet-user-client/src/main/java/com/epmet/dto/result/NatUserInfoResultDTO.java

@ -3,6 +3,7 @@ package com.epmet.dto.result;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* @Author zxc
@ -37,4 +38,12 @@ public class NatUserInfoResultDTO implements Serializable {
* ic_nat表ID
*/
private String id;
/**
* 采样时间
* @author wxz
* @date 2022/11/9 上午12:22
* @param null
*/
private Date sampleTime;
}

13
epmet-user/epmet-user-client/src/main/java/com/epmet/feign/EpmetUserOpenFeignClient.java

@ -922,8 +922,8 @@ public interface EpmetUserOpenFeignClient {
@PostMapping("/epmetuser/userbaseinfo/dingResiLogin")
Result<DingLoginResiResDTO> dingResiLogin(@RequestBody DingLoginResiFormDTO formDTO);
@PostMapping("/epmetuser/dataSyncConfig/natInfoScanTask")
Result natInfoScanTask(@RequestBody DataSyncTaskParam formDTO);
@PostMapping("/epmetuser/dataSyncConfig/dataSyncForYanTaiTask")
Result dataSyncForYanTaiTask(@RequestBody DataSyncTaskParam formDTO);
/**
* Desc: 客户下所有工作人员放缓存
@ -943,4 +943,13 @@ public interface EpmetUserOpenFeignClient {
*/
@PostMapping("/epmetuser/registerrelation/getAllResiByGrid")
Result<Integer> getAllResiByGrid(@RequestBody AllResiByGridFormDTO formDTO);
@PostMapping("/epmetuser/dataSyncConfig/natInfoScanTask")
Result natInfoScanTask(@RequestBody DataSyncTaskParam formDTO);
@PostMapping("/epmetuser/dataSyncConfig/deathInfoScanTask")
Result deathInfoScanTask(@RequestBody DataSyncTaskParam formDTO);
@PostMapping("/epmetuser/dataSyncConfig/disabilityInfoScanTask")
Result disabilityInfoScanTask(@RequestBody DataSyncTaskParam formDTO);
}

19
epmet-user/epmet-user-client/src/main/java/com/epmet/feign/fallback/EpmetUserOpenFeignClientFallback.java

@ -713,8 +713,8 @@ public class EpmetUserOpenFeignClientFallback implements EpmetUserOpenFeignClien
}
@Override
public Result natInfoScanTask(DataSyncTaskParam formDTO) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_USER_SERVER, "natInfoScanTask", formDTO);
public Result dataSyncForYanTaiTask(DataSyncTaskParam formDTO) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_USER_SERVER, "dataSyncForYanTaiTask", formDTO);
}
@Override
@ -726,4 +726,19 @@ public class EpmetUserOpenFeignClientFallback implements EpmetUserOpenFeignClien
public Result<Integer> getAllResiByGrid(AllResiByGridFormDTO formDTO) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_USER_SERVER, "getAllResiByGrid", formDTO);
}
@Override
public Result natInfoScanTask(DataSyncTaskParam formDTO) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_USER_SERVER, "natInfoScanTaskV2", formDTO);
}
@Override
public Result deathInfoScanTask(DataSyncTaskParam formDTO) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_USER_SERVER, "deathInfoScanTask", formDTO);
}
@Override
public Result disabilityInfoScanTask(DataSyncTaskParam formDTO) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_USER_SERVER, "disabilityInfoScanTask", formDTO);
}
}

2
epmet-user/epmet-user-server/src/main/java/com/epmet/UserApplication.java

@ -14,6 +14,7 @@ import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* 管理后台
@ -22,6 +23,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
* @since 1.0.0
*/
@EnableScheduling
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients

21
epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java

@ -0,0 +1,21 @@
package com.epmet.constant;
public interface EpidemicConstant {
/**
* 数据配置的dataCode
* 核酸检测
* 残疾人
* 死亡
*/
String DATA_CODE_NAT = "hesuan";
String DATA_CODE_DISABILITY = "canji";
String DATA_CODE_DEATH = "siwang";
String JOB_TYPE_NAT = "nat";
String OPERATION_STATUS_WAITING = "waiting";
String OPERATION_STATUS_PROCESSING = "processing";
String OPERATION_STATUS_FINISH = "finish";
}

46
epmet-user/epmet-user-server/src/main/java/com/epmet/controller/DataSyncConfigController.java

@ -106,9 +106,53 @@ public class DataSyncConfigController {
return new Result();
}
@PostMapping("dataSyncForYanTaiTask")
public Result dataSyncForYanTaiTask(@RequestBody DataSyncTaskParam formDTO){
dataSyncConfigService.dataSyncForYanTaiTask(formDTO);
return new Result();
}
/**
* @Description 核酸检测信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 10:37
*/
@PostMapping("natInfoScanTask")
public Result natInfoScanTask(@RequestBody DataSyncTaskParam formDTO){
dataSyncConfigService.dataSyncForYanTaiTask(formDTO);
dataSyncConfigService.natInfoScanTask(formDTO);
return new Result();
}
@PostMapping("natInfoSyncButton")
public Result natInfoSyncButton(@RequestBody DataSyncTaskParam formDTO, @LoginUser TokenDto tokenDto){
formDTO.setCustomerId(tokenDto.getCustomerId());
formDTO.setStaffId(tokenDto.getUserId());
dataSyncConfigService.natInfoSyncButton(formDTO);
return new Result();
}
/**
* @Description 死亡信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 09:01
*/
@PostMapping("deathInfoScanTask")
public Result deathInfoScanTask(@RequestBody DataSyncTaskParam formDTO){
dataSyncConfigService.deathInfoScanTask(formDTO);
return new Result();
}
/**
* @Description 残疾信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 09:01
*/
@PostMapping("disabilityInfoScanTask")
public Result disabilityInfoScanTask(@RequestBody DataSyncTaskParam formDTO){
dataSyncConfigService.disabilityInfoScanTask(formDTO);
return new Result();
}

4
epmet-user/epmet-user-server/src/main/java/com/epmet/dao/DataSyncConfigDao.java

@ -38,7 +38,7 @@ public interface DataSyncConfigDao extends BaseDao<DataSyncConfigEntity> {
* @author zxc
* @date 2022/9/26 15:04
*/
List<DataSyncConfigDTO> list(@Param("customerId") String customerId, @Param("switchStatus") String switchStatus);
List<DataSyncConfigDTO> list(@Param("customerId") String customerId, @Param("switchStatus") String switchStatus, @Param("dataCode")String dataCode);
List<DataSyncScopeDTO> scopeList(@Param("id") String id);
@ -60,4 +60,6 @@ public interface DataSyncConfigDao extends BaseDao<DataSyncConfigEntity> {
*/
List<NatUserInfoResultDTO> getIdCardsByScope(DataSyncTaskParam formDTO);
DataSyncConfigDTO getConfigInfoByType(@Param("customerId") String customerId, @Param("switchStatus") String switchStatus,@Param("dataCode")String dataCode);
}

21
epmet-user/epmet-user-server/src/main/java/com/epmet/dao/IcSyncJobDao.java

@ -0,0 +1,21 @@
package com.epmet.dao;
import com.epmet.commons.mybatis.dao.BaseDao;
import com.epmet.entity.IcSyncJobEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 同步任务表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2022-11-08
*/
@Mapper
public interface IcSyncJobDao extends BaseDao<IcSyncJobEntity> {
List<IcSyncJobEntity> selectExecutableJobList(@Param("jobType") String jobType,
@Param("itemCount") int itemCount);
}

59
epmet-user/epmet-user-server/src/main/java/com/epmet/entity/IcSyncJobEntity.java

@ -0,0 +1,59 @@
package com.epmet.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.epmet.commons.mybatis.entity.BaseEpmetEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* 同步任务表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2022-11-08
*/
@Data
@EqualsAndHashCode(callSuper=false)
@TableName("ic_sync_job")
public class IcSyncJobEntity extends BaseEpmetEntity {
private static final long serialVersionUID = 1L;
/**
* 客户ID
*/
private String customerId;
/**
* 组织ID
*/
private String orgId;
/**
* 组织ID的上级
*/
private String pid;
/**
* 组织ID的所有上级包括org_id
*/
private String orgIdPath;
/**
* 任务类型残疾disability死亡death核酸nat
*/
private String jobType;
/**
* 操作员IDstaffId
*/
private String operatorId;
/**
* 操作状态,等待中waiting进行中processing结束finish
*/
private String operationStatus;
}

125
epmet-user/epmet-user-server/src/main/java/com/epmet/excel/handler/IcNatCompareRecordExcelImportListener.java

@ -5,9 +5,6 @@ import com.alibaba.excel.read.listener.ReadListener;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.constant.StrConstant;
import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult;
import com.epmet.commons.tools.dto.result.YtHscyResDTO;
import com.epmet.commons.tools.dto.result.YtHsjcResDTO;
import com.epmet.commons.tools.dto.result.YtHsjcResDetailDTO;
import com.epmet.commons.tools.enums.EnvEnum;
import com.epmet.commons.tools.exception.EpmetException;
import com.epmet.commons.tools.exception.ExceptionUtils;
@ -15,18 +12,17 @@ import com.epmet.commons.tools.exception.ValidateException;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.DateUtils;
import com.epmet.commons.tools.utils.ObjectUtil;
import com.epmet.commons.tools.utils.YtHsResUtils;
import com.epmet.commons.tools.validator.ValidatorUtils;
import com.epmet.entity.IcNatCompareRecordEntity;
import com.epmet.excel.data.IcNatCompareRecordExcelData;
import com.epmet.service.impl.IcNatCompareRecordServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;
/**
* @Description
@ -60,16 +56,22 @@ public class IcNatCompareRecordExcelImportListener implements ReadListener<IcNat
* 导入日期yyyyMMdd
*/
private String importDate;
public IcNatCompareRecordExcelImportListener(String customerId, CustomerStaffInfoCacheResult staffInfo,String importDate,Date importTime, IcNatCompareRecordServiceImpl icNatCompareRecordService) {
private NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate;
public IcNatCompareRecordExcelImportListener(String customerId, CustomerStaffInfoCacheResult staffInfo,String importDate,Date importTime,
IcNatCompareRecordServiceImpl icNatCompareRecordService,
NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate) {
this.customerId = customerId;
this.staffInfo = staffInfo;
this.icNatCompareRecordService = icNatCompareRecordService;
this.importDate=importDate;
this.importTime=importTime;
this.yantaiNamedParamLantuJdbcTemplate=yantaiNamedParamLantuJdbcTemplate;
}
@Override
/*@Override
public void invoke(IcNatCompareRecordExcelData data, AnalysisContext analysisContext) {
try {
// log.warn("有数据吗?"+JSON.toJSONString(data));
@ -137,6 +139,110 @@ public class IcNatCompareRecordExcelImportListener implements ReadListener<IcNat
}
datas.add(compareRecordEntity);
if (datas.size() == MAX_THRESHOLD) {
execPersist();
}
} catch (Exception e) {
String errorMsg = null;
if (e instanceof ValidateException) {
errorMsg = ((ValidateException) e).getMsg();
} else if (e instanceof EpmetException) {
errorMsg = ((EpmetException) e).getMsg();
} else {
errorMsg = "未知错误";
log.error("【未做核酸比对导入】出错:{}", ExceptionUtils.getErrorStackTrace(e));
}
IcNatCompareRecordExcelData.ErrorRow errorRow = new IcNatCompareRecordExcelData.ErrorRow();
errorRow.setIdCard(data.getIdCard());
errorRow.setName(data.getName());
errorRow.setMobile(data.getMobile());
errorRow.setErrorInfo(errorMsg);
errorRows.add(errorRow);
}
}*/
@Override
public void invoke(IcNatCompareRecordExcelData data, AnalysisContext analysisContext) {
try {
// log.warn("有数据吗?"+JSON.toJSONString(data));
// 不能为空先校验数据
ValidatorUtils.validateEntity(data);
// 去除空格
ObjectUtil.objectToTrim(data);
IcNatCompareRecordEntity compareRecordEntity = ConvertUtils.sourceToTarget(data, IcNatCompareRecordEntity.class);
compareRecordEntity.setCustomerId(customerId);
compareRecordEntity.setLatestNatTime(null);
compareRecordEntity.setNatAddress(StrConstant.EPMETY_STR);
compareRecordEntity.setNatResult(StrConstant.EPMETY_STR);
// 开发和测试没法测试,只能写死只有生产才去调用了 烟台客户id:1535072605621841922
EnvEnum currentEnv = EnvEnum.getCurrentEnv();
if (EnvEnum.PROD.getCode().equals(currentEnv.getCode()) && "1535072605621841922".equals(customerId)) {
// 先查询最后一次采样记录
Map<String, Object> args = new HashMap<>();
args.put("idcard", data.getIdCard());
List<Map<String, Object>> hscyList = yantaiNamedParamLantuJdbcTemplate.queryForList(
"select id, name,card_no, create_time, realname from hscyxxb where card_no =:idcard order by create_time desc limit 1", args);
if (CollectionUtils.isNotEmpty(hscyList) && MapUtils.isNotEmpty(hscyList.get(0))) {
// 存在 最近一条采样记录
Map<String, Object> latestCyMap = hscyList.get(0);
Date create_time = (Date) latestCyMap.get("create_time");
if (create_time == null) {
// 最近一次采样时间为空,说明这人好久不做核酸了
compareRecordEntity.setInternalRemark("采样时间create_time is null");
} else {
// 赋值采样时间
compareRecordEntity.setLatestCyTime(create_time);
// 查询最后一次检测结果
List<Map<String, Object>> hsjcResultList = yantaiNamedParamLantuJdbcTemplate.queryForList(
"select name, telephone, card_no, address, test_time, SAMPLE_TIME, SAMPLE_RESULT_PCR, SAMPLING_ORG_PCR from hsjcxxb where card_no =:idcard order by test_time desc limit 1", args);
if (CollectionUtils.isNotEmpty(hsjcResultList) && MapUtils.isNotEmpty(hsjcResultList.get(0))) {
Map<String, Object> latestJcMap = hsjcResultList.get(0);
// 采样时间
Date sample_time = (Date) latestJcMap.get("SAMPLE_TIME");
// 检测时间
Date test_time = (Date) latestJcMap.get("test_time");
// 核酸采样机构
String sampling_org_pcr = (String) latestJcMap.get("SAMPLING_ORG_PCR");
// 核酸检测结果 1:阳性,2:阴性
String sample_result_pcr = (String) latestJcMap.get("SAMPLE_RESULT_PCR");
// 联系地址
String address = (String) latestJcMap.get("address");
// 最近一次采样时间,与最近一次检测结果的采样时间相比较
if (compareRecordEntity.getLatestCyTime().equals(sample_time)) {
// 一致说明出结果了
if (test_time != null) {
// 赋值最近一次核酸时间
compareRecordEntity.setLatestNatTime(test_time);
}
// 赋值检测地点
compareRecordEntity.setNatAddress(StringUtils.isNotBlank(sampling_org_pcr) ? sampling_org_pcr : StrConstant.EPMETY_STR);
// 核酸检测结果 1:阳性,2:阴性
if (NumConstant.ONE_STR.equals(sample_result_pcr)) {
// 检测结果(0:阴性 1:阳性):接口填入
compareRecordEntity.setNatResult(NumConstant.ONE_STR);
} else if (NumConstant.TWO_STR.equals(sample_result_pcr)) {
compareRecordEntity.setNatResult(NumConstant.ZERO_STR);
}
compareRecordEntity.setContactAddress(StringUtils.isNotBlank(address) ? address : StrConstant.EPMETY_STR);
} else {
// 采样时间不一致,说明未出结果
compareRecordEntity.setInternalRemark(String.format("采样时间不一致,采样视图最近一次采样时间:%s,检测结果视图的最近一次采样时间:%s", compareRecordEntity.getLatestCyTime(), DateUtils.format(sample_time)));
}
} else {
compareRecordEntity.setInternalRemark("最近一次检测结果为空");
}
}
} else {
// 没有核酸采样记录
compareRecordEntity.setInternalRemark("最近一次采样结果为空");
}
}
datas.add(compareRecordEntity);
if (datas.size() == MAX_THRESHOLD) {
execPersist();
}
@ -159,6 +265,7 @@ public class IcNatCompareRecordExcelImportListener implements ReadListener<IcNat
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
// 最后几条达不到阈值,这里必须再调用一次

6
epmet-user/epmet-user-server/src/main/java/com/epmet/jdbc/config/JdbcTemplateConfig.java

@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import javax.sql.DataSource;
@ -27,4 +28,9 @@ public class JdbcTemplateConfig {
DataSource yantaiLantuDataSource = jdbcDataSourceConfig.createYantaiLantuDataSource();
return new JdbcTemplate(yantaiLantuDataSource);
}
@Bean
NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate() {
return new NamedParameterJdbcTemplate(yantaiLantuJdbcTemplate());
}
}

128
epmet-user/epmet-user-server/src/main/java/com/epmet/processor/YanTaiNatSyncProcessor.java

@ -0,0 +1,128 @@
package com.epmet.processor;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.entity.IcSyncJobEntity;
import com.epmet.service.DataSyncConfigService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT;
// 烟台核酸检测数据同步处理器
@Component
@Slf4j
public class YanTaiNatSyncProcessor {
public static final int MAX_EXECUTING_COUNT = 3;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobDao icSyncJobDao;
@Autowired
private DataSyncConfigService dataSyncConfigService;
@Autowired
private DistributedLock distributedLock;
@Autowired
RedisUtils redisUtils;
/**
* 定时扫描和执行同步任务
* 10s扫一次库
*
* @author wxz
* @date 2022/11/8 下午5:42
*/
@Scheduled(cron = "0/10 * * * * ? ")
public void scanJobs() {
//log.info("【异步数据更新】开始同步任务");
//String dataSyncEnable = redisUtils.getString("data:sync:enable");
//if (StringUtils.isEmpty(dataSyncEnable)) {
// return;
//}
LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>();
executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> executingJobList = icSyncJobDao.selectList(executingListQuery);
if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) {
// 最多只允许同时3条线程运行
return;
}
int executingCount = executingJobList.size();
// 还可以运行几条线程
int leftCount = MAX_EXECUTING_COUNT - executingCount;
RLock lock = null;
try {
lock = distributedLock.getLock("data:sync:" + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
// 查询可执行的任务列表,并且异步执行
List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectExecutableJobList(
EpidemicConstant.JOB_TYPE_NAT,
leftCount);
if (!CollectionUtils.isEmpty(icSyncJobToExec)) {
// 异步提交任务
for (IcSyncJobEntity jobEntity : icSyncJobToExec) {
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING);
executorService.submit(() -> {
// 将此任务状态修改为执行中
try {
dataSyncConfigService.execSyncByJobProcessor(jobEntity);
} finally {
// 更新任务状态为结束
updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH);
}
});
}
}
} catch (Exception e) {
log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e));
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* 更新任务状态
* @author wxz
* @date 2022/11/8 下午8:25
* @param id
* @param status
*/
private void updateJobStatus(String id, String status) {
LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>();
query.eq(IcSyncJobEntity::getId, id);
IcSyncJobEntity updateEntity = new IcSyncJobEntity();
updateEntity.setOperationStatus(status);
icSyncJobDao.update(updateEntity, query);
}
}

50
epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java

@ -8,7 +8,11 @@ import com.epmet.dto.DataSyncConfigDTO;
import com.epmet.dto.form.ConfigSwitchFormDTO;
import com.epmet.dto.form.DataSyncTaskParam;
import com.epmet.dto.form.ScopeSaveFormDTO;
import com.epmet.dto.result.NatUserInfoResultDTO;
import com.epmet.entity.DataSyncConfigEntity;
import com.epmet.entity.IcSyncJobEntity;
import java.util.List;
/**
* 数据更新配置表
@ -83,4 +87,50 @@ public interface DataSyncConfigService extends BaseService<DataSyncConfigEntity>
void scopeSave(ScopeSaveFormDTO formDTO);
void dataSyncForYanTaiTask(DataSyncTaskParam formDTO);
/**
* @Description 死亡信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 09:01
*/
void deathInfoScanTask(DataSyncTaskParam formDTO);
/**
* @Description 残疾信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 09:01
*/
void disabilityInfoScanTask(DataSyncTaskParam formDTO);
/**
* @Description 核酸检测信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 10:37
*/
void natInfoScanTask(DataSyncTaskParam formDTO);
void natInfoSyncButton(DataSyncTaskParam formDTO);
List<NatUserInfoResultDTO> getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize);
/**
* 烟台核酸检测(视图方式获取数据)
* @param resiInfos
* @param customerId
* @param isSync
*/
void yantaiHsjcByDbView(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync);
/**
* 更新居民核酸检测信息(通过任务处理器)
* @author wxz
* @date 2022/11/8 下午8:17
* @param jobEntity
*/
void execSyncByJobProcessor(IcSyncJobEntity jobEntity);
}

78
epmet-user/epmet-user-server/src/main/java/com/epmet/service/IcSyncJobService.java

@ -0,0 +1,78 @@
package com.epmet.service;
import com.epmet.commons.mybatis.service.BaseService;
import com.epmet.commons.tools.page.PageData;
import com.epmet.dto.IcSyncJobDTO;
import com.epmet.entity.IcSyncJobEntity;
import java.util.List;
import java.util.Map;
/**
* 同步任务表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2022-11-08
*/
public interface IcSyncJobService extends BaseService<IcSyncJobEntity> {
/**
* 默认分页
*
* @param params
* @return PageData<IcSyncJobDTO>
* @author generator
* @date 2022-11-08
*/
PageData<IcSyncJobDTO> page(Map<String, Object> params);
/**
* 默认查询
*
* @param params
* @return java.util.List<IcSyncJobDTO>
* @author generator
* @date 2022-11-08
*/
List<IcSyncJobDTO> list(Map<String, Object> params);
/**
* 单条查询
*
* @param id
* @return IcSyncJobDTO
* @author generator
* @date 2022-11-08
*/
IcSyncJobDTO get(String id);
/**
* 默认保存
*
* @param dto
* @return void
* @author generator
* @date 2022-11-08
*/
void save(IcSyncJobDTO dto);
/**
* 默认更新
*
* @param dto
* @return void
* @author generator
* @date 2022-11-08
*/
void update(IcSyncJobDTO dto);
/**
* 批量删除
*
* @param ids
* @return void
* @author generator
* @date 2022-11-08
*/
void delete(String[] ids);
}

413
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

@ -1,28 +1,37 @@
package com.epmet.service.impl;
import java.util.Date;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.epmet.commons.mybatis.entity.BaseEpmetEntity;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.constant.StrConstant;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.dto.form.PageFormDTO;
import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult;
import com.epmet.commons.tools.dto.result.YtDataSyncResDTO;
import com.epmet.commons.tools.dto.result.YtHscyResDTO;
import com.epmet.commons.tools.dto.result.YtHsjcResDTO;
import com.epmet.commons.tools.enums.GenderEnum;
import com.epmet.commons.tools.exception.EpmetErrorCode;
import com.epmet.commons.tools.exception.EpmetException;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.page.PageData;
import com.epmet.commons.tools.redis.common.CustomerOrgRedis;
import com.epmet.commons.tools.redis.common.CustomerStaffRedis;
import com.epmet.commons.tools.redis.common.bean.AgencyInfoCache;
import com.epmet.commons.tools.redis.common.bean.GridInfoCache;
import com.epmet.commons.tools.security.dto.TokenDto;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.utils.DateUtils;
import com.epmet.commons.tools.utils.YtHsResUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.DataSyncConfigDao;
import com.epmet.dao.IcNatDao;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.dto.DataSyncConfigDTO;
import com.epmet.dto.DataSyncRecordDeathDTO;
import com.epmet.dto.DataSyncRecordDisabilityDTO;
@ -39,8 +48,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -48,9 +59,12 @@ import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.epmet.constant.EpidemicConstant.*;
/**
* 数据更新配置表
*
@ -75,6 +89,15 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
private DataSyncRecordDisabilityService dataSyncRecordDisabilityService;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobService icSyncJobService;
@Autowired
private IcSyncJobDao icSyncJobDao;
@Autowired
private DistributedLock distributedLock;
@Resource(name = "yantaiNamedParamLantuJdbcTemplate")
private NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate;
@Resource(name = "yantaiLantuJdbcTemplate")
private JdbcTemplate yantaiJdbcTemplate;
@ -130,7 +153,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
public PageData list(TokenDto tokenDto, PageFormDTO formDTO) {
PageData<DataSyncConfigDTO> result = new PageData<>(new ArrayList<>(), NumConstant.ZERO_L);
PageInfo<DataSyncConfigDTO> pageInfo = PageHelper.startPage(formDTO.getPageNo(), formDTO.getPageSize())
.doSelectPageInfo(() -> baseDao.list(tokenDto.getCustomerId(), null));
.doSelectPageInfo(() -> baseDao.list(tokenDto.getCustomerId(), null,null));
if (CollectionUtils.isNotEmpty(pageInfo.getList())) {
result.setList(pageInfo.getList());
result.setTotal(Integer.parseInt(String.valueOf(pageInfo.getTotal())));
@ -185,7 +208,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
*/
@Override
public void dataSyncForYanTaiTask(DataSyncTaskParam formDTO) {
List<DataSyncConfigDTO> allConfigList = baseDao.list(formDTO.getCustomerId(), "open");
List<DataSyncConfigDTO> allConfigList = baseDao.list(formDTO.getCustomerId(), "open",null);
if (CollectionUtils.isEmpty(allConfigList)) {
return;
}
@ -221,6 +244,259 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
}
}
/**
* @Description 死亡信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 09:01
*/
@Override
public void deathInfoScanTask(DataSyncTaskParam formDTO) {
List<DataSyncConfigDTO> configData = getConfigData(null, EpidemicConstant.DATA_CODE_DEATH);
if (CollectionUtils.isEmpty(configData)){
log.warn("deathInfoScanTask not exists config data,customer is "+formDTO.getCustomerId());
return;
}
long count = configData.stream().filter(o -> CollectionUtils.isNotEmpty(o.getScopeList())).count();
if (count < 1) {
log.warn("deathInfoScanTask scopeList is null");
return;
}
int pageNo = NumConstant.ONE;
int pageSize = NumConstant.ONE_THOUSAND;
List<NatUserInfoResultDTO> dbResiList = null;
do {
for (DataSyncConfigDTO config : configData) {
// 设置查询数据范围
formDTO.setOrgList(config.getScopeList());
DataSyncEnum anEnum = DataSyncEnum.getEnum(config.getDataCode());
dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize);
if (CollectionUtils.isEmpty(dbResiList)) {
continue;
}
switch (anEnum) {
case SI_WANG:
try {
//查询正常状态的居民
siWang(dbResiList);
log.info("======siWang信息拉取结束======");
} catch (Exception e) {
log.error("death thread execute exception", e);
}
default:
log.warn("没有要处理的数据");
}
}
pageNo++;
} while (dbResiList != null && dbResiList.size() == pageSize);
}
/**
* @Description 残疾信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 09:01
*/
@Override
public void disabilityInfoScanTask(DataSyncTaskParam formDTO) {
List<DataSyncConfigDTO> configData = getConfigData(null, EpidemicConstant.DATA_CODE_DISABILITY);
if (CollectionUtils.isEmpty(configData)){
log.warn("disabilityInfoScanTask not exists config data,customer is "+formDTO.getCustomerId());
return;
}
long count = configData.stream().filter(o -> CollectionUtils.isNotEmpty(o.getScopeList())).count();
if (count < 1) {
log.warn("disabilityInfoScanTask scopeList is null");
return;
}
int pageNo = NumConstant.ONE;
int pageSize = NumConstant.ONE_THOUSAND;
List<NatUserInfoResultDTO> dbResiList = null;
do {
for (DataSyncConfigDTO config : configData) {
// 设置查询数据范围
formDTO.setOrgList(config.getScopeList());
formDTO.setCategoryColumn("IS_CJ");
DataSyncEnum anEnum = DataSyncEnum.getEnum(config.getDataCode());
dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize);
if (CollectionUtils.isEmpty(dbResiList)) {
continue;
}
switch (anEnum) {
case CAN_JI:
try {
//查询正常状态的居民
canJi(dbResiList);
log.info("======canJi信息拉取结束======");
} catch (Exception e) {
log.error("disability thread execute exception", e);
}
break;
default:
log.warn("没有要处理的数据");
}
}
pageNo++;
} while (dbResiList != null && dbResiList.size() == pageSize);
}
/**
* @Description 核酸检测信息定时拉取
* @param formDTO
* @Author zxc
* @Date 2022/11/8 10:37
*/
@Override
public void natInfoScanTask(DataSyncTaskParam formDTO) {
List<DataSyncConfigDTO> configData = new ArrayList<>();
if (StringUtils.isBlank(formDTO.getAgencyId())){
configData = getConfigData(null, EpidemicConstant.DATA_CODE_NAT);
if (CollectionUtils.isEmpty(configData)){
log.warn("natInfoScanTask not exists config data,customer is "+formDTO.getCustomerId());
return;
}
long count = configData.stream().filter(o -> CollectionUtils.isNotEmpty(o.getScopeList())).count();
if (count < 1) {
log.warn("natInfoScanTask scopeList is null");
return;
}
}
int pageNo = NumConstant.ONE;
int pageSize = NumConstant.ONE_THOUSAND;
List<NatUserInfoResultDTO> dbResiList = null;
do {
if (StringUtils.isBlank(formDTO.getAgencyId())){
for (DataSyncConfigDTO config : configData) {
// 设置查询数据范围
formDTO.setOrgList(config.getScopeList());
DataSyncEnum anEnum = DataSyncEnum.getEnum(config.getDataCode());
dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize);
if (CollectionUtils.isEmpty(dbResiList)) {
continue;
}
switch (anEnum) {
case HE_SUAN:
try {
//查询正常状态的居民
yantaiHsjcByDbView(dbResiList, config.getCustomerId(), formDTO.getIsSync());
log.info("======核酸检测信息拉取结束======");
} catch (Exception e) {
log.error("nat thread execute exception", e);
}
break;
default:
log.warn("没有要处理的数据");
}
}
}else {
dbResiList = getNatUserInfoFromDb(formDTO, pageNo, pageSize);
if (CollectionUtils.isEmpty(dbResiList)){
return;
}
yantaiHsjcByDbView(dbResiList, dbResiList.get(NumConstant.ZERO).getCustomerId(), formDTO.getIsSync());
}
pageNo++;
} while (dbResiList != null && dbResiList.size() == pageSize);
}
/**
* 提交同步任务放到任务池
* @author wxz
* @date 2022/11/8 下午5:32
* @param formDTO
*/
@Override
public void natInfoSyncButton(DataSyncTaskParam formDTO) {
CustomerStaffInfoCacheResult staffInfo = CustomerStaffRedis.getStaffInfo(formDTO.getCustomerId(), formDTO.getStaffId());
if (null == staffInfo){
throw new EpmetException("未查询到工作人员信息:"+formDTO.getStaffId());
}
AgencyInfoCache agencyInfo = CustomerOrgRedis.getAgencyInfo(formDTO.getAgencyId());
if (null == agencyInfo){
throw new EpmetException("未查询到组织信息:"+formDTO.getAgencyId());
}
// 查询该组织是否存在等待中或者进行中的任务
LambdaQueryWrapper<IcSyncJobEntity> qw = new LambdaQueryWrapper<>();
qw.eq(IcSyncJobEntity::getOrgId,formDTO.getAgencyId())
.in(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING,OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> icSyncJobEntities = icSyncJobDao.selectList(qw);
// 当前组织下存在同步任务
if (CollectionUtils.isNotEmpty(icSyncJobEntities)){
throw new EpmetException(EpmetErrorCode.EXIST_SYNC_JOB_ERROR.getCode());
}
// 不存在新增一条记录
IcSyncJobEntity e = new IcSyncJobEntity();
e.setCustomerId(formDTO.getCustomerId());
e.setOrgId(formDTO.getAgencyId());
e.setPid(agencyInfo.getPid());
e.setOrgIdPath(StringUtils.isBlank(agencyInfo.getPids()) ? agencyInfo.getId() : agencyInfo.getPids()+":"+agencyInfo.getId());
e.setJobType(JOB_TYPE_NAT);
e.setOperatorId(formDTO.getStaffId());
e.setOperationStatus(OPERATION_STATUS_WAITING);
insertSync(e);
//List<IcSyncJobEntity> waitList;
//do {
// LambdaQueryWrapper<IcSyncJobEntity> qw3 = new LambdaQueryWrapper<>();
// qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId())
// .eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING)
// .eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT)
// .orderByAsc(BaseEpmetEntity::getCreatedTime);
// waitList = icSyncJobDao.selectList(qw3);
// if (CollectionUtils.isNotEmpty(waitList)){
// for (IcSyncJobEntity entity : waitList) {
// RLock lock = null;
// try {
// lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
// updateSync(entity.getId(),OPERATION_STATUS_PROCESSING);
// }catch (Exception ex){
// log.error(ex.getMessage());
// throw new EpmetException(ex.getMessage());
// }finally {
// lock.unlock();
// }
// formDTO.setAgencyId(entity.getOrgId());
// try {
// natInfoScanTask(formDTO);
// }catch (Exception ee){
// log.error(ee.getMessage());
// throw new EpmetException(ee.getMessage());
// }finally {
// updateSync(entity.getId(),OPERATION_STATUS_FINISH);
// }
// }
// }
//}while (CollectionUtils.isNotEmpty(waitList));
}
@Transactional(rollbackFor = Exception.class)
public void updateSync(String id,String status){
LambdaUpdateWrapper<IcSyncJobEntity> qwUpdate = new LambdaUpdateWrapper<>();
qwUpdate.eq(BaseEpmetEntity::getId,id)
.set(IcSyncJobEntity::getOperationStatus,status)
.set(BaseEpmetEntity::getUpdatedTime, new Date());
icSyncJobDao.update(null,qwUpdate);
}
@Transactional(rollbackFor = Exception.class)
public void insertSync(IcSyncJobEntity e){
icSyncJobService.insert(e);
}
/**
* @Description 配置信息查询
* @param customerId
* @param dataCode
* @Author zxc
* @Date 2022/11/8 10:41
*/
public List<DataSyncConfigDTO> getConfigData(String customerId, String dataCode){
List<DataSyncConfigDTO> allConfigList = baseDao.list(customerId, "open",dataCode);
return CollectionUtils.isEmpty(allConfigList) ? new ArrayList<>() : allConfigList;
}
private void dataSyncYanTaiParallel(List<DataSyncConfigDTO> configList, DataSyncTaskParam formDTO) {
if (CollectionUtils.isEmpty(configList)) {
log.warn("dataSyncYanTaiParallel configList is null");
@ -255,7 +531,8 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
executorService.submit(() -> {
try {
//查询正常状态的居民
hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync());
//hsjc(finalDbResiList, config.getCustomerId(), formDTO.getIsSync());
yantaiHsjcByDbView(finalDbResiList, config.getCustomerId(), formDTO.getIsSync());
log.info("======核酸检测信息拉取结束======");
} catch (Exception e) {
log.error("hsjc thread execute exception", e);
@ -421,7 +698,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* @param pageSize
* @return
*/
private List<NatUserInfoResultDTO> getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize) {
public List<NatUserInfoResultDTO> getNatUserInfoFromDb(DataSyncTaskParam formDTO, int pageNo, int pageSize) {
//根据 组织 分页获取 居民数据
PageInfo<NatUserInfoResultDTO> pageInfo = PageHelper.startPage(pageNo, pageSize, false)
.doSelectPageInfo(() -> baseDao.getIdCardsByScope(formDTO));
@ -629,10 +906,13 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
* 数据库采样时间+idCard+userId 存在的 做更新
* 数据库采样时间+idCard+userId 不存在的 新增
*/
entities.forEach(e -> existInfo.stream().filter(i -> i.getUserId().equals(e.getUserId()) && i.getIdCard().equals(e.getIdCard())).forEach(i -> {
e.setExistStatus(true);
e.setId(i.getId());
}));
entities.forEach(e -> existInfo.stream()
.filter(i -> i.getUserId().equals(e.getUserId()) && i.getIdCard().equals(e.getIdCard()) && i.getSampleTime().equals(e.getSampleTime()))
.forEach(i -> {
e.setExistStatus(true);
e.setId(i.getId());
}));
Map<Boolean, List<IcNatEntity>> groupByStatus = entities.stream().collect(Collectors.groupingBy(IcNatEntity::getExistStatus));
if (CollectionUtils.isNotEmpty(groupByStatus.get(false))) {
for (List<IcNatEntity> icNatEntities : ListUtils.partition(groupByStatus.get(false), NumConstant.FIVE_HUNDRED)) {
@ -786,35 +1066,43 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
public void yantaiHsjcByDbView(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync) {
List<List<NatUserInfoResultDTO>> resiInfobatchs = ListUtils.partition(resiInfos, 50);
for (List<NatUserInfoResultDTO> resibatch : resiInfobatchs) {
// 50个一批,来处理他们的核酸信息,太多怕给数据库查崩了。
yantaiHsjcByDbViewBatch(resibatch, customerId, isSync);
// n个一批,来处理他们的核酸信息,太多怕给数据库查崩了。
try {
yantaiHsjcByDbViewPartition(resibatch, customerId, isSync);
} catch (Exception e) {
String errorMsg = ExceptionUtils.getErrorStackTrace(e);
log.error("【更新核酸检测信息(from 兰图)】失败,信息:{}", errorMsg);
}
}
}
/**
* 50个一批来处理他们的核酸信息太多怕给数据库查崩了
* n个一批来处理他们的核酸信息太多怕给数据库查崩了
* @param resiInfos
* @param customerId
* @param isSync
*/
public void yantaiHsjcByDbViewBatch(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync) {
public void yantaiHsjcByDbViewPartition(List<NatUserInfoResultDTO> resiInfos, String customerId, String isSync) {
// 将居民信息转化为<idCard,resiInfo>的map
Map<String, NatUserInfoResultDTO> idCardAndResiInfo = resiInfos.stream().collect(Collectors.toMap(resi -> resi.getIdCard(), Function.identity()));
Set<String> idCards = idCardAndResiInfo.keySet();//resiInfos.stream().map(resi -> resi.getIdCard()).collect(Collectors.toList());
// String idCardsStr = "''" + String.join("','", idCards) + "''";
List<String> idCards = new ArrayList<>(idCardAndResiInfo.keySet());
// 1.获取核酸采样信息
String sql = "select id, name,card_no, create_time, realname from hscyxxb where card_no in (:idCards) order by create_time desc";
HashMap<String, Object> args = new HashMap<>();
args.put("idCards", idCards);
Map<String, Object> args = new HashMap<>();
args.put("idcards", idCards);
//log.info("【更新核酸检测信息(from 兰图)】本批次身份证号为:{}", String.join(",", idCards));
// 2.=====================核酸采样=========================
// 这一批居民的核酸采样列表
List<Map<String, Object>> hscyList = yantaiJdbcTemplate.queryForList(sql, args);
List<Map<String, Object>> hscyList = yantaiNamedParamLantuJdbcTemplate.queryForList(
"select id, name,card_no, create_time, realname from hscyxxb where card_no in (:idcards)", args);
if (CollectionUtils.isNotEmpty(hscyList)) {
List<IcNatEntity> entities = new ArrayList<>();
hscyList.forEach(sampleInfo -> {
hscyList.forEach(resiHscyInfo -> {
// 从视图中获取到的核酸采样相关信息
String name = (String) sampleInfo.get("name");
String cardNo = (String) sampleInfo.get("card_no");
Date createTime = (Date) sampleInfo.get("create_time");
String name = (String) resiHscyInfo.get("name");
String cardNo = (String) resiHscyInfo.get("card_no");
Date createTime = (Date) resiHscyInfo.get("create_time");
// 本地数据库中,居民信息
NatUserInfoResultDTO currentResiInfo = idCardAndResiInfo.get(cardNo);
@ -828,7 +1116,6 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
e.setName(StringUtils.isNotBlank(name) ? name : "");
e.setIdCard(StringUtils.isNotBlank(cardNo) ? cardNo : "");
e.setSampleTime(createTime);
// e.setSampleTime(DateUtils.parseDate(createTime, DateUtils.DATE_TIME_PATTERN));
e.setAgencyId(currentResiInfo.getAgencyId());
e.setPids(currentResiInfo.getPids());
e.setAttachmentType("");
@ -840,5 +1127,85 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
sampleAndNat(existSampleInfo,entities,NumConstant.ONE_STR,customerId,isSync);
}
}
// 2.=====================核酸采样=========================
// 这一批居民的核酸采样列表
List<Map<String, Object>> hsjcResultList = yantaiNamedParamLantuJdbcTemplate.queryForList(
"select name, telephone, card_no, test_time, SAMPLE_TIME, SAMPLE_RESULT_PCR, SAMPLING_ORG_PCR from hsjcxxb where card_no in (:idcards)", args);
if (CollectionUtils.isNotEmpty(hsjcResultList)) {
List<IcNatEntity> entities = new ArrayList<>();
hsjcResultList.forEach(natResult -> {
// 从视图中获取到的核酸采样相关信息
String name = (String) natResult.get("name");
String cardNo = (String) natResult.get("card_no");
Date testTime = (Date) natResult.get("test_time");
String telephone = (String) natResult.get("telephone");
Date sampleTime = (Date) natResult.get("SAMPLE_TIME");
String sampleResultPcr = (String) natResult.get("SAMPLE_RESULT_PCR");
String samplingOrgPcr = (String) natResult.get("SAMPLING_ORG_PCR");
// 本地数据库中,居民信息
NatUserInfoResultDTO currentResiInfo = idCardAndResiInfo.get(cardNo);
IcNatEntity e = new IcNatEntity();
e.setCustomerId(customerId);
e.setIsResiUser(StringUtils.isBlank(currentResiInfo.getUserId()) ? NumConstant.ZERO_STR : NumConstant.ONE_STR);
e.setUserId(currentResiInfo.getUserId());
e.setUserType(isSync.equals(NumConstant.ONE_STR) ? "manualSync" : "sync");
e.setName(StringUtils.isNotBlank(name) ? name : "");
e.setMobile(StringUtils.isNotBlank(telephone) ? telephone : "");
e.setIdCard(StringUtils.isNotBlank(cardNo) ? cardNo : "");
e.setNatTime(testTime);
e.setSampleTime(sampleTime);
String resultPcr = sampleResultPcr;
//检测结果 转换 我们 0:阴性 1:阳性, 他们 :1:阳性,2:阴性
e.setNatResult(NumConstant.ZERO_STR);
if (NumConstant.ONE_STR.equals(resultPcr)) {
e.setNatResult(NumConstant.ONE_STR);
}
e.setNatAddress(samplingOrgPcr);
e.setAgencyId(currentResiInfo.getAgencyId());
e.setPids(currentResiInfo.getPids());
e.setAttachmentType("");
e.setAttachmentUrl("");
entities.add(e);
});
if (CollectionUtils.isNotEmpty(entities)) {
List<NatUserInfoResultDTO> existNatInfos = icNatDao.getExistNatInfo(entities);
sampleAndNat(existNatInfos,entities,NumConstant.TWO_STR,customerId,isSync);
}
}
}
/**
* 更新居民核酸检测信息(通过任务处理器)
* @author wxz
* @date 2022/11/8 下午8:17
* @param jobEntity
*/
public void execSyncByJobProcessor(IcSyncJobEntity jobEntity) {
DataSyncTaskParam p = new DataSyncTaskParam();
// 正常状态
p.setResiStatus("0");
// 指定组织
p.setAgencyId(jobEntity.getOrgIdPath());
List<NatUserInfoResultDTO> resis = null;
int pageNo = 1;
int pageSize = 1000;
int updatedResiCount = 0;
log.info("【任务处理器同步数据】组织Id:{},开始同步数据,同步类型:{}", jobEntity.getOrgId(), jobEntity.getJobType());
do {
// 分页,一次查询1000居民,循环更新他们的核酸检测信息
resis = getNatUserInfoFromDb(p, pageNo, pageSize);
if (CollectionUtils.isNotEmpty(resis)) {
yantaiHsjcByDbView(resis, jobEntity.getCustomerId(), NumConstant.ONE_STR);
pageNo++;
updatedResiCount += resis.size();
}
} while (CollectionUtils.isNotEmpty(resis));
log.info("【任务处理器同步数据】组织Id:{},同步类型:{},已完成居民数:{}", jobEntity.getOrgId(), jobEntity.getJobType(), updatedResiCount);
}
}

13
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcNatCompareRecordServiceImpl.java

@ -38,10 +38,12 @@ import org.apache.commons.fileupload.disk.DiskFileItemFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.entity.ContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.commons.CommonsMultipartFile;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
@ -66,6 +68,10 @@ public class IcNatCompareRecordServiceImpl extends BaseServiceImpl<IcNatCompareR
@Autowired
private IcNatCompareRecRelationDao icNatCompareRecRelationDao;
@Resource(name = "yantaiNamedParamLantuJdbcTemplate")
private NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate;
private CustomerStaffInfoCacheResult queryCurrentStaff(String customerId, String userId) {
CustomerStaffInfoCacheResult staffInfo = CustomerStaffRedis.getStaffInfo(customerId, userId);
if (null == staffInfo) {
@ -106,7 +112,12 @@ public class IcNatCompareRecordServiceImpl extends BaseServiceImpl<IcNatCompareR
CustomerStaffInfoCacheResult staffInfo= queryCurrentStaff(customerId,userId);
Date importTime=new Date();
String importDate= DateUtils.format(importTime,DateUtils.DATE_PATTERN_YYYYMMDD);
IcNatCompareRecordExcelImportListener listener = new IcNatCompareRecordExcelImportListener(customerId, staffInfo, importDate, importTime, this);
IcNatCompareRecordExcelImportListener listener = new IcNatCompareRecordExcelImportListener(customerId,
staffInfo,
importDate,
importTime,
this,
yantaiNamedParamLantuJdbcTemplate);
EasyExcel.read(filePath.toFile(), IcNatCompareRecordExcelData.class, listener).headRowNumber(1).sheet(0).doRead();

83
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/IcSyncJobServiceImpl.java

@ -0,0 +1,83 @@
package com.epmet.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.tools.page.PageData;
import com.epmet.commons.tools.utils.ConvertUtils;
import com.epmet.commons.tools.constant.FieldConstant;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.dto.IcSyncJobDTO;
import com.epmet.entity.IcSyncJobEntity;
import com.epmet.service.IcSyncJobService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* 同步任务表
*
* @author generator generator@elink-cn.com
* @since v1.0.0 2022-11-08
*/
@Service
public class IcSyncJobServiceImpl extends BaseServiceImpl<IcSyncJobDao, IcSyncJobEntity> implements IcSyncJobService {
@Override
public PageData<IcSyncJobDTO> page(Map<String, Object> params) {
IPage<IcSyncJobEntity> page = baseDao.selectPage(
getPage(params, FieldConstant.CREATED_TIME, false),
getWrapper(params)
);
return getPageData(page, IcSyncJobDTO.class);
}
@Override
public List<IcSyncJobDTO> list(Map<String, Object> params) {
List<IcSyncJobEntity> entityList = baseDao.selectList(getWrapper(params));
return ConvertUtils.sourceToTarget(entityList, IcSyncJobDTO.class);
}
private QueryWrapper<IcSyncJobEntity> getWrapper(Map<String, Object> params){
String id = (String)params.get(FieldConstant.ID_HUMP);
QueryWrapper<IcSyncJobEntity> wrapper = new QueryWrapper<>();
wrapper.eq(StringUtils.isNotBlank(id), FieldConstant.ID, id);
return wrapper;
}
@Override
public IcSyncJobDTO get(String id) {
IcSyncJobEntity entity = baseDao.selectById(id);
return ConvertUtils.sourceToTarget(entity, IcSyncJobDTO.class);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void save(IcSyncJobDTO dto) {
IcSyncJobEntity entity = ConvertUtils.sourceToTarget(dto, IcSyncJobEntity.class);
insert(entity);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void update(IcSyncJobDTO dto) {
IcSyncJobEntity entity = ConvertUtils.sourceToTarget(dto, IcSyncJobEntity.class);
updateById(entity);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void delete(String[] ids) {
// 逻辑删除(@TableLogic 注解)
baseDao.deleteBatchIds(Arrays.asList(ids));
}
}

6
epmet-user/epmet-user-server/src/main/resources/bootstrap.yml

@ -56,9 +56,9 @@ spring:
yantai:
lantu:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/epmet_gov_voice?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
url: jdbc:mysql://10.2.2.61:3367/sjzt?allowPublicKeyRetrieval=true&allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
username: yilian
password: 1qaz2wsx
initial-size: 10
max-active: 100
min-idle: 10

11
epmet-user/epmet-user-server/src/main/resources/mapper/DataSyncConfigDao.xml

@ -47,6 +47,9 @@
<if test='null != customerId and customerId != "" '>
AND CUSTOMER_ID = #{customerId}
</if>
<if test="dataCode != null and dataCode != ''">
AND data_code = #{dataCode}
</if>
order by sort
</select>
@ -103,4 +106,12 @@
</choose>
ORDER BY CREATED_TIME
</select>
<select id="getConfigInfoByType" resultType="com.epmet.dto.DataSyncConfigDTO">
select * FROM data_sync_config
WHERE DEL_FLAG = 0
AND switch_status = #{switchStatus}
AND CUSTOMER_ID = #{customerId}
AND data_code = #{dataCode}
</select>
</mapper>

3
epmet-user/epmet-user-server/src/main/resources/mapper/IcNatDao.xml

@ -161,7 +161,8 @@
SELECT
ID,
USER_ID,
ID_CARD
ID_CARD,
SAMPLE_TIME
FROM ic_nat
WHERE del_flag = '0'
AND USER_ID = #{l.userId}

29
epmet-user/epmet-user-server/src/main/resources/mapper/IcSyncJobDao.xml

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.epmet.dao.IcSyncJobDao">
<!--查询可执行任务列表-->
<select id="selectExecutableJobList" resultType="com.epmet.entity.IcSyncJobEntity">
select id,
customer_id,
org_id,
pid,
org_id_path,
job_type,
operator_id,
operation_status,
del_flag,
revision,
created_by,
created_time,
updated_by,
updated_time
from ic_sync_job
where OPERATION_STATUS = 'waiting'
and JOB_TYPE = #{jobType}
and DEL_FLAG = 0
order by CREATED_TIME asc
limit #{itemCount}
</select>
</mapper>
Loading…
Cancel
Save