Browse Source

新增:数据统计-重新执行所有pendding中的计算

dev_shibei_match
wxz 5 years ago
parent
commit
2595269922
  1. 10
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  2. 13
      epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/constant/CalculateStatus.java
  3. 104
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/IndexCalculateController.java
  4. 20
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/model/CalculateFlagModel.java

10
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -314,6 +314,14 @@ public class RedisKeys {
* @return
*/
public static String getCustomerStatsCalFlag(String customerId) {
return String.format(rootPrefix+"stats:calflag:%s", customerId);
return getCustomerStatsCalKeyPrefix().concat(":").concat(customerId);
}
/**
* 获取计算标记的key前缀
* @return
*/
public static String getCustomerStatsCalKeyPrefix() {
return rootPrefix.concat("stats:calflag");
}
}

13
epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/constant/CalculateStatus.java

@ -0,0 +1,13 @@
package com.epmet.constant;
/**
* 计算状态
* calculating:正在计算中有实例正在计算
* pendding:计算暂停说明计算过程中实例发生重启等待重新计算
*/
public interface CalculateStatus {
String PENDDING = "pendding";
String CALCULATING = "calculating";
}

104
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/IndexCalculateController.java

@ -2,13 +2,16 @@ package com.epmet.controller;
import com.epmet.commons.extappauth.annotation.ExternalAppRequestAuth;
import com.epmet.commons.extappauth.bean.ExternalAppRequestParam;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.DateUtils;
import com.epmet.commons.tools.utils.HttpClientManager;
import com.epmet.commons.tools.utils.Result;
import com.epmet.commons.tools.validator.ValidatorUtils;
import com.epmet.constant.CalculateStatus;
import com.epmet.dto.indexcal.CalculateCommonFormDTO;
import com.epmet.model.CalculateFlagModel;
import com.epmet.service.evaluationindex.indexcal.CpcIndexCalculateService;
import com.epmet.service.evaluationindex.indexcal.IndexCalculateService;
import com.epmet.util.DimIdGenerator;
@ -21,10 +24,14 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
/**
@ -59,20 +66,41 @@ public class IndexCalculateController {
private Map<String, Future> futureMap = new HashMap<>();
@PreDestroy
public void clearDataCalFlag() {
// 实例销毁之前,将正在本实例中执行计算的客户列表的计算状态清空
public void saveCalStatus() {
// 实例销毁之前,将正在本实例中执行计算的客户列表的计算状态修改为pendding,等待其他实例重新计算
futureMap.forEach((customerId, future) -> {
redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(customerId));
CalculateFlagModel flag = (CalculateFlagModel) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(customerId));
flag.setStatus(CalculateStatus.PENDDING);
redisUtils.set(RedisKeys.getCustomerStatsCalFlag(customerId), flag);
log.info("客户【%s】正在执行计算,实例发生重启,修改计算状态为:calculation->pendding", customerId);
});
}
/**
* 处理暂停中的计算
*/
@PostMapping("process-pendding-cals")
public void processPenddingCalculate() {
String keyPrefix = RedisKeys.getCustomerStatsCalKeyPrefix();
Set<String> calFlagKeys = redisUtils.keys(keyPrefix.concat("*"));
calFlagKeys.forEach(key -> {
CalculateFlagModel flag = (CalculateFlagModel) redisUtils.get(key);
if (flag != null && CalculateStatus.PENDDING.equals(flag.getStatus())) {
// 找到状态是pendding的key,执行计算
CalculateCommonFormDTO form = new CalculateCommonFormDTO();
form.setCustomerId(flag.getForm().getCustomerId());
form.setMonthId(flag.getForm().getMonthId());
indexCalculate(form);
}
});
}
/**
* 按照客户计算所有指标(按照月份)
*
* @param formDTO
* @return com.epmet.commons.tools.utils.Result
* @Author zhangyong
* @Author zhangyongç
* @Date 10:52 2020-08-20
**/
@ExternalAppRequestAuth
@ -80,34 +108,66 @@ public class IndexCalculateController {
public Result<Boolean> indexCalculate(ExternalAppRequestParam externalAppRequestParam, @RequestBody CalculateCommonFormDTO formDTO) {
String customerId = externalAppRequestParam.getCustomerId();
//String customerId = "epmettest";
Boolean executing = (Boolean) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(customerId));
if (executing == null || !executing) {
formDTO.setCustomerId(customerId);
indexCalculate(formDTO);
return new Result<Boolean>().ok(true);
}
/**
* 指标计算
* @param formDTO
*/
private void indexCalculate(CalculateCommonFormDTO formDTO) {
CalculateFlagModel executeFlag = (CalculateFlagModel) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId()));
if (executeFlag == null || !CalculateStatus.CALCULATING.equals(executeFlag.getStatus())) {
// 可以计算
synchronized (statsCalLock) {
Boolean executing2 = (Boolean) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(customerId));
if (executing2 != null && executing2) {
log.error(String.format("该客户正在执行计算,请勿重复提交计算请求。", customerId));
return new Result<Boolean>().ok(false);
CalculateFlagModel executingFlag2 = (CalculateFlagModel) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId()));
if (executingFlag2 != null && CalculateStatus.CALCULATING.equals(executingFlag2.getStatus())) {
//log.error(String.format("客户【%s】正在执行计算,请勿重复提交计算请求。", customerId));
throw new RenException(String.format("客户【%s】正在执行计算,请勿重复提交计算请求。", formDTO.getCustomerId()));
}
// 提交异步计算
submitCalculate(formDTO);
}
} else {
throw new RenException(String.format("客户【%s】正在执行计算,请勿重复提交计算请求。", formDTO.getCustomerId()));
}
}
/**
* 提交异步计算
* @param formDTO
* @return
*/
private void submitCalculate(CalculateCommonFormDTO formDTO) {
Future<?> future = singleThreadPool.submit(() -> {
formDTO.setCustomerId(customerId);
long start = System.currentTimeMillis();
Boolean aBoolean = indexCalculateService.indexCalculate(formDTO);
if (aBoolean) {
log.error("客户Id:{},全部指标计算完成,总耗时:{}秒", customerId, (System.currentTimeMillis() - start) / 1000);
log.error("客户Id:{},全部指标计算完成,总耗时:{}秒", formDTO.getCustomerId(), (System.currentTimeMillis() - start) / 1000);
}
redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(customerId));
futureMap.remove(customerId);
redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId()));
futureMap.remove(formDTO.getCustomerId());
//测试代码
//try {
// Thread.sleep(20000l);
// System.out.println(System.currentTimeMillis());
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
//redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId()));
//futureMap.remove(formDTO.getCustomerId());
});
futureMap.put(customerId, future);
redisUtils.set(RedisKeys.getCustomerStatsCalFlag(customerId), true);
}
} else {
log.error(String.format("该客户正在执行计算,请勿重复提交计算请求。", customerId));
return new Result<Boolean>().ok(false);
}
futureMap.put(formDTO.getCustomerId(), future);
return new Result<Boolean>().ok(true);
CalculateFlagModel flag = new CalculateFlagModel();
flag.setStatus(CalculateStatus.CALCULATING);
flag.setForm(formDTO);
redisUtils.set(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId()), flag);
}
/**

20
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/model/CalculateFlagModel.java

@ -0,0 +1,20 @@
package com.epmet.model;
import com.epmet.dto.indexcal.CalculateCommonFormDTO;
import lombok.Data;
@Data
public class CalculateFlagModel {
/**
* 提交计算时候的参数
*/
private CalculateCommonFormDTO form;
/**
* 计算状态
* CalculateStatus.java
*/
private String status;
}
Loading…
Cancel
Save