Browse Source

同步

master
zxc 3 years ago
parent
commit
f7769f3307
  1. 2
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java
  2. 2
      epmet-user/epmet-user-client/src/main/java/com/epmet/dto/form/DataSyncTaskParam.java
  3. 6
      epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java
  4. 8
      epmet-user/epmet-user-server/src/main/java/com/epmet/controller/DataSyncConfigController.java
  5. 2
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java
  6. 103
      epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

2
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/exception/EpmetErrorCode.java

@ -217,6 +217,8 @@ public enum EpmetErrorCode {
CUSTOMER_CATEGORY(9101,"分类已使用,不允许删除"),
CATEGORY_NAME(9102,"分类名称已存在,不允许重复"),
EXIST_SYNC_JOB_ERROR(9103,"存在同步任务"),
OTHER_SYNC_JOB_ERROR(9104,"同步任务已提交,请稍后查看数据"),
// open api异常
OPEN_API_UNAUTHENTICATED(10100, "请求未认证"),

2
epmet-user/epmet-user-client/src/main/java/com/epmet/dto/form/DataSyncTaskParam.java

@ -47,4 +47,6 @@ public class DataSyncTaskParam implements Serializable {
*/
private String agencyId = null;
private String dataCode;
private String staffId;
}

6
epmet-user/epmet-user-server/src/main/java/com/epmet/constant/EpidemicConstant.java

@ -12,4 +12,10 @@ public interface EpidemicConstant {
String DATA_CODE_DISABILITY = "canji";
String DATA_CODE_DEATH = "siwang";
String JOB_TYPE_NAT = "nat";
String OPERATION_STATUS_WAITING = "waiting";
String OPERATION_STATUS_PROCESSING = "processing";
String OPERATION_STATUS_FINISH = "finish";
}

8
epmet-user/epmet-user-server/src/main/java/com/epmet/controller/DataSyncConfigController.java

@ -124,6 +124,14 @@ public class DataSyncConfigController {
return new Result();
}
@PostMapping("natInfoSyncButton")
public Result natInfoSyncButton(@RequestBody DataSyncTaskParam formDTO, @LoginUser TokenDto tokenDto){
formDTO.setCustomerId(tokenDto.getCustomerId());
formDTO.setStaffId(tokenDto.getUserId());
dataSyncConfigService.natInfoSyncButton(formDTO);
return new Result();
}
/**
* @Description 死亡信息定时拉取
* @param formDTO

2
epmet-user/epmet-user-server/src/main/java/com/epmet/service/DataSyncConfigService.java

@ -108,4 +108,6 @@ public interface DataSyncConfigService extends BaseService<DataSyncConfigEntity>
*/
void natInfoScanTask(DataSyncTaskParam formDTO);
void natInfoSyncButton(DataSyncTaskParam formDTO);
}

103
epmet-user/epmet-user-server/src/main/java/com/epmet/service/impl/DataSyncConfigServiceImpl.java

@ -1,20 +1,27 @@
package com.epmet.service.impl;
import java.util.Date;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.epmet.commons.mybatis.entity.BaseEpmetEntity;
import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.constant.StrConstant;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.dto.form.PageFormDTO;
import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult;
import com.epmet.commons.tools.dto.result.YtDataSyncResDTO;
import com.epmet.commons.tools.dto.result.YtHscyResDTO;
import com.epmet.commons.tools.dto.result.YtHsjcResDTO;
import com.epmet.commons.tools.enums.GenderEnum;
import com.epmet.commons.tools.exception.EpmetErrorCode;
import com.epmet.commons.tools.exception.EpmetException;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.page.PageData;
import com.epmet.commons.tools.redis.common.CustomerOrgRedis;
import com.epmet.commons.tools.redis.common.CustomerStaffRedis;
import com.epmet.commons.tools.redis.common.bean.AgencyInfoCache;
import com.epmet.commons.tools.redis.common.bean.GridInfoCache;
import com.epmet.commons.tools.security.dto.TokenDto;
@ -24,6 +31,7 @@ import com.epmet.commons.tools.utils.YtHsResUtils;
import com.epmet.constant.EpidemicConstant;
import com.epmet.dao.DataSyncConfigDao;
import com.epmet.dao.IcNatDao;
import com.epmet.dao.IcSyncJobDao;
import com.epmet.dto.DataSyncConfigDTO;
import com.epmet.dto.DataSyncRecordDeathDTO;
import com.epmet.dto.DataSyncRecordDisabilityDTO;
@ -40,6 +48,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
@ -50,9 +59,12 @@ import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.epmet.constant.EpidemicConstant.*;
/**
* 数据更新配置表
*
@ -77,6 +89,12 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
private DataSyncRecordDisabilityService dataSyncRecordDisabilityService;
@Autowired
private ExecutorService executorService;
@Autowired
private IcSyncJobService icSyncJobService;
@Autowired
private IcSyncJobDao icSyncJobDao;
@Autowired
private DistributedLock distributedLock;
@Resource(name = "yantaiNamedParamLantuJdbcTemplate")
private NamedParameterJdbcTemplate yantaiNamedParamLantuJdbcTemplate;
@ -332,7 +350,7 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
public void natInfoScanTask(DataSyncTaskParam formDTO) {
List<DataSyncConfigDTO> configData = new ArrayList<>();
if (StringUtils.isBlank(formDTO.getAgencyId())){
configData = getConfigData(null, EpidemicConstant.DATA_CODE_DISABILITY);
configData = getConfigData(null, EpidemicConstant.DATA_CODE_NAT);
if (CollectionUtils.isEmpty(configData)){
log.warn("natInfoScanTask not exists config data,customer is "+formDTO.getCustomerId());
return;
@ -381,6 +399,89 @@ public class DataSyncConfigServiceImpl extends BaseServiceImpl<DataSyncConfigDao
} while (dbResiList != null && dbResiList.size() == pageSize);
}
@Override
public void natInfoSyncButton(DataSyncTaskParam formDTO) {
CustomerStaffInfoCacheResult staffInfo = CustomerStaffRedis.getStaffInfo(formDTO.getCustomerId(), formDTO.getStaffId());
if (null == staffInfo){
throw new EpmetException("未查询到工作人员信息:"+formDTO.getStaffId());
}
AgencyInfoCache agencyInfo = CustomerOrgRedis.getAgencyInfo(formDTO.getAgencyId());
if (null == agencyInfo){
throw new EpmetException("未查询到组织信息:"+formDTO.getAgencyId());
}
LambdaQueryWrapper<IcSyncJobEntity> qw = new LambdaQueryWrapper<>();
qw.eq(IcSyncJobEntity::getOrgId,formDTO.getAgencyId()).in(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING,OPERATION_STATUS_PROCESSING);
List<IcSyncJobEntity> icSyncJobEntities = icSyncJobDao.selectList(qw);
// 当前组织下存在同步任务
if (CollectionUtils.isNotEmpty(icSyncJobEntities)){
throw new EpmetException(EpmetErrorCode.EXIST_SYNC_JOB_ERROR.getCode());
}
// 不存在新增一条记录
IcSyncJobEntity e = new IcSyncJobEntity();
e.setCustomerId(formDTO.getCustomerId());
e.setOrgId(formDTO.getAgencyId());
e.setPid(agencyInfo.getPid());
e.setOrgIdPath(StringUtils.isBlank(agencyInfo.getPids()) ? agencyInfo.getId() : agencyInfo.getPids()+":"+agencyInfo.getId());
e.setJobType(JOB_TYPE_NAT);
e.setOperatorId(formDTO.getStaffId());
e.setOperationStatus(OPERATION_STATUS_WAITING);
insertSync(e);
// 客户下,type=nat 存在两条正在进行中的同步任务
LambdaQueryWrapper<IcSyncJobEntity> qw2 = new LambdaQueryWrapper<>();
qw2.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId())
.eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_PROCESSING)
.eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT);
List<IcSyncJobEntity> processingList = icSyncJobDao.selectList(qw2);
if (CollectionUtils.isNotEmpty(processingList) && processingList.size() >= NumConstant.TWO){
throw new EpmetException(EpmetErrorCode.OTHER_SYNC_JOB_ERROR.getCode());
}
List<IcSyncJobEntity> waitList;
do {
LambdaQueryWrapper<IcSyncJobEntity> qw3 = new LambdaQueryWrapper<>();
qw3.eq(IcSyncJobEntity::getCustomerId,formDTO.getCustomerId())
.eq(IcSyncJobEntity::getOperationStatus,OPERATION_STATUS_WAITING)
.eq(IcSyncJobEntity::getJobType,JOB_TYPE_NAT)
.orderByAsc(BaseEpmetEntity::getCreatedTime);
waitList = icSyncJobDao.selectList(qw3);
if (CollectionUtils.isNotEmpty(waitList)){
for (IcSyncJobEntity entity : waitList) {
RLock lock = null;
try {
lock = distributedLock.getLock(entity.getOrgId() + JOB_TYPE_NAT, 60L, 60L, TimeUnit.SECONDS);
updateSync(entity.getId(),OPERATION_STATUS_PROCESSING);
}catch (Exception ex){
log.error(ex.getMessage());
throw new EpmetException(ex.getMessage());
}finally {
lock.unlock();
}
formDTO.setAgencyId(entity.getOrgId());
try {
natInfoScanTask(formDTO);
}catch (Exception ee){
log.error(ee.getMessage());
throw new EpmetException(ee.getMessage());
}finally {
updateSync(entity.getId(),OPERATION_STATUS_FINISH);
}
}
}
}while (CollectionUtils.isNotEmpty(waitList));
}
@Transactional(rollbackFor = Exception.class)
public void updateSync(String id,String status){
LambdaUpdateWrapper<IcSyncJobEntity> qwUpdate = new LambdaUpdateWrapper<>();
qwUpdate.eq(BaseEpmetEntity::getId,id)
.set(IcSyncJobEntity::getOperationStatus,status)
.set(BaseEpmetEntity::getUpdatedTime, new Date());
icSyncJobDao.update(null,qwUpdate);
}
@Transactional(rollbackFor = Exception.class)
public void insertSync(IcSyncJobEntity e){
icSyncJobService.insert(e);
}
/**
* @Description 配置信息查询
* @param customerId

Loading…
Cancel
Save