forked from luyan/epmet-cloud-lingshan
				
			
				 9 changed files with 365 additions and 57 deletions
			
			
		@ -0,0 +1,118 @@ | 
				
			|||||
 | 
					package com.epmet.processor; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; | 
				
			||||
 | 
					import com.epmet.commons.tools.distributedlock.DistributedLock; | 
				
			||||
 | 
					import com.epmet.commons.tools.exception.ExceptionUtils; | 
				
			||||
 | 
					import com.epmet.commons.tools.redis.RedisUtils; | 
				
			||||
 | 
					import com.epmet.constant.EpidemicConstant; | 
				
			||||
 | 
					import com.epmet.dao.IcSyncJobDao; | 
				
			||||
 | 
					import com.epmet.entity.IcSyncJobEntity; | 
				
			||||
 | 
					import com.epmet.service.DataSyncConfigService; | 
				
			||||
 | 
					import com.epmet.service.IcResiComparisonRecordService; | 
				
			||||
 | 
					import lombok.extern.slf4j.Slf4j; | 
				
			||||
 | 
					import org.redisson.api.RLock; | 
				
			||||
 | 
					import org.springframework.beans.factory.annotation.Autowired; | 
				
			||||
 | 
					import org.springframework.scheduling.annotation.Scheduled; | 
				
			||||
 | 
					import org.springframework.stereotype.Component; | 
				
			||||
 | 
					import org.springframework.util.CollectionUtils; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import java.util.List; | 
				
			||||
 | 
					import java.util.concurrent.ExecutorService; | 
				
			||||
 | 
					import java.util.concurrent.TimeUnit; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import static com.epmet.constant.EpidemicConstant.JOB_TYPE_NAT; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					// 烟台数据比对
 | 
				
			||||
 | 
					@Component | 
				
			||||
 | 
					@Slf4j | 
				
			||||
 | 
					public class YanTaiComparisonSyncProcessor { | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    public static final int MAX_EXECUTING_COUNT = 3; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @Autowired | 
				
			||||
 | 
					    private ExecutorService executorService; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @Autowired | 
				
			||||
 | 
					    private IcSyncJobDao icSyncJobDao; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @Autowired | 
				
			||||
 | 
					    private IcResiComparisonRecordService icResiComparisonRecordService; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @Autowired | 
				
			||||
 | 
					    private DistributedLock distributedLock; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @Autowired | 
				
			||||
 | 
					    RedisUtils redisUtils; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * 定时扫描和执行同步任务 | 
				
			||||
 | 
					     * 10s扫一次库 | 
				
			||||
 | 
					     */ | 
				
			||||
 | 
					    @Scheduled(cron = "0/10 * * * * ? ") | 
				
			||||
 | 
					    public void scanJobs() { | 
				
			||||
 | 
					        //log.info("【异步数据更新】开始同步任务");
 | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					        LambdaQueryWrapper<IcSyncJobEntity> executingListQuery = new LambdaQueryWrapper<>(); | 
				
			||||
 | 
					        executingListQuery.eq(IcSyncJobEntity::getOperationStatus, EpidemicConstant.OPERATION_STATUS_PROCESSING); | 
				
			||||
 | 
					        List<IcSyncJobEntity> executingJobList = icSyncJobDao.selectList(executingListQuery); | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					        if (!CollectionUtils.isEmpty(executingJobList) && executingJobList.size() >= MAX_EXECUTING_COUNT) { | 
				
			||||
 | 
					            // 最多只允许同时3条线程运行
 | 
				
			||||
 | 
					            return; | 
				
			||||
 | 
					        } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					        int executingCount = executingJobList.size(); | 
				
			||||
 | 
					        // 还可以运行几条线程
 | 
				
			||||
 | 
					        int leftCount = MAX_EXECUTING_COUNT - executingCount; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					        RLock lock = null; | 
				
			||||
 | 
					        try { | 
				
			||||
 | 
					            lock = distributedLock.getLock("data:sync:" + "comparison", 60L, 60L, TimeUnit.SECONDS); | 
				
			||||
 | 
					            // 查询可执行的任务列表,并且异步执行
 | 
				
			||||
 | 
					            List<IcSyncJobEntity> icSyncJobToExec = icSyncJobDao.selectExecutableJobList( | 
				
			||||
 | 
					                    EpidemicConstant.JOB_TYPE_COMPARISON, | 
				
			||||
 | 
					                    leftCount); | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					            if (!CollectionUtils.isEmpty(icSyncJobToExec)) { | 
				
			||||
 | 
					                // 异步提交任务
 | 
				
			||||
 | 
					                for (IcSyncJobEntity jobEntity : icSyncJobToExec) { | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					                    updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_PROCESSING); | 
				
			||||
 | 
					                    executorService.submit(() -> { | 
				
			||||
 | 
					                        // 将此任务状态修改为执行中
 | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					                        try { | 
				
			||||
 | 
					                            icResiComparisonRecordService.comparisonUserData(jobEntity); | 
				
			||||
 | 
					                        } finally { | 
				
			||||
 | 
					                            // 更新任务状态为结束
 | 
				
			||||
 | 
					                            updateJobStatus(jobEntity.getId(), EpidemicConstant.OPERATION_STATUS_FINISH); | 
				
			||||
 | 
					                        } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					                    }); | 
				
			||||
 | 
					                } | 
				
			||||
 | 
					            } | 
				
			||||
 | 
					        } catch (Exception e) { | 
				
			||||
 | 
					            log.error("【异步数据更新】出错:{}", ExceptionUtils.getErrorStackTrace(e)); | 
				
			||||
 | 
					        } finally { | 
				
			||||
 | 
					            if (lock != null) { | 
				
			||||
 | 
					                lock.unlock(); | 
				
			||||
 | 
					            } | 
				
			||||
 | 
					        } | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /**  | 
				
			||||
 | 
					     * 更新任务状态 | 
				
			||||
 | 
					     * @param id | 
				
			||||
 | 
					 * @param status  | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					     */ | 
				
			||||
 | 
					    private void updateJobStatus(String id, String status) { | 
				
			||||
 | 
					        LambdaQueryWrapper<IcSyncJobEntity> query = new LambdaQueryWrapper<>(); | 
				
			||||
 | 
					        query.eq(IcSyncJobEntity::getId, id); | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					        IcSyncJobEntity updateEntity = new IcSyncJobEntity(); | 
				
			||||
 | 
					        updateEntity.setOperationStatus(status); | 
				
			||||
 | 
					        icSyncJobDao.update(updateEntity, query); | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					} | 
				
			||||
@ -0,0 +1,21 @@ | 
				
			|||||
 | 
					CREATE TABLE `ic_resi_comparison_record` ( | 
				
			||||
 | 
					  `ID` varchar(64) NOT NULL COMMENT 'ID', | 
				
			||||
 | 
					  `CUSTOMER_ID` varchar(64) NOT NULL COMMENT '客户Id', | 
				
			||||
 | 
					  `RESI_ID` varchar(64) NOT NULL COMMENT 'ic_resi_user.id【比对时的数据值】', | 
				
			||||
 | 
					  `RESI_NAME` varchar(64) NOT NULL COMMENT '数字社区人员姓名【比对时的数据值】', | 
				
			||||
 | 
					  `RESI_ID_CARD` varchar(18) NOT NULL COMMENT '数据社区身份证号【比对时的数据值】', | 
				
			||||
 | 
					  `POLICE_NAME` varchar(64) DEFAULT NULL COMMENT '公安部人员姓名', | 
				
			||||
 | 
					  `POLICE_ID_CARD` varchar(18) DEFAULT NULL COMMENT '公安部人员证件号', | 
				
			||||
 | 
					  `TYPE` varchar(1) NOT NULL DEFAULT '0' COMMENT '数据状态(0:未处理  1:已处理)', | 
				
			||||
 | 
					  `IDENTICAL` varchar(1) NOT NULL DEFAULT '0' COMMENT '信息一致性(0:否 1:是 2:-)2代表程序比对了没有匹配上', | 
				
			||||
 | 
					  `IS_COMPARISON` varchar(1) DEFAULT '0' COMMENT '是否比对过(0:否 1:是)', | 
				
			||||
 | 
					  `COMPARISON_RESULT` varchar(255) DEFAULT NULL COMMENT '比对结果说明', | 
				
			||||
 | 
					  `DEL_FLAG` int(11) NOT NULL COMMENT '删除标识', | 
				
			||||
 | 
					  `REVISION` int(11) NOT NULL COMMENT '乐观锁', | 
				
			||||
 | 
					  `CREATED_BY` varchar(32) NOT NULL COMMENT '创建人', | 
				
			||||
 | 
					  `CREATED_TIME` datetime NOT NULL COMMENT '创建时间', | 
				
			||||
 | 
					  `UPDATED_BY` varchar(32) NOT NULL COMMENT '更新人', | 
				
			||||
 | 
					  `UPDATED_TIME` datetime NOT NULL COMMENT '更新时间', | 
				
			||||
 | 
					  PRIMARY KEY (`ID`), | 
				
			||||
 | 
					  UNIQUE KEY `resi_id` (`RESI_ID`) USING BTREE | 
				
			||||
 | 
					) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT COMMENT='居民信息与公安部信息比对记录表'; | 
				
			||||
					Loading…
					
					
				
		Reference in new issue