diff --git a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java index e1fa2fa2c7..ce57594ad2 100644 --- a/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java +++ b/epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java @@ -314,6 +314,14 @@ public class RedisKeys { * @return */ public static String getCustomerStatsCalFlag(String customerId) { - return String.format(rootPrefix+"stats:calflag:%s", customerId); + return getCustomerStatsCalKeyPrefix().concat(":").concat(customerId); + } + + /** + * 获取计算标记的key前缀 + * @return + */ + public static String getCustomerStatsCalKeyPrefix() { + return rootPrefix.concat("stats:calflag"); } } diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/constant/CalculateStatus.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/constant/CalculateStatus.java new file mode 100644 index 0000000000..f4425ee714 --- /dev/null +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/constant/CalculateStatus.java @@ -0,0 +1,13 @@ +package com.epmet.constant; + +/** + * 计算状态 + * calculating:正在计算中。有实例正在计算 + * pendding:计算暂停,说明计算过程中实例发生重启,等待重新计算 + */ +public interface CalculateStatus { + + String PENDDING = "pendding"; + String CALCULATING = "calculating"; + +} diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/IndexCalculateController.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/IndexCalculateController.java index 8459b99f50..4cecf7f168 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/IndexCalculateController.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/controller/IndexCalculateController.java @@ -2,13 +2,16 @@ package com.epmet.controller; import com.epmet.commons.extappauth.annotation.ExternalAppRequestAuth; import com.epmet.commons.extappauth.bean.ExternalAppRequestParam; +import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.redis.RedisKeys; import com.epmet.commons.tools.redis.RedisUtils; import com.epmet.commons.tools.utils.DateUtils; import com.epmet.commons.tools.utils.HttpClientManager; import com.epmet.commons.tools.utils.Result; import com.epmet.commons.tools.validator.ValidatorUtils; +import com.epmet.constant.CalculateStatus; import com.epmet.dto.indexcal.CalculateCommonFormDTO; +import com.epmet.model.CalculateFlagModel; import com.epmet.service.evaluationindex.indexcal.CpcIndexCalculateService; import com.epmet.service.evaluationindex.indexcal.IndexCalculateService; import com.epmet.util.DimIdGenerator; @@ -21,10 +24,14 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.*; /** @@ -59,20 +66,41 @@ public class IndexCalculateController { private Map futureMap = new HashMap<>(); @PreDestroy - public void clearDataCalFlag() { - // 实例销毁之前,将正在本实例中执行计算的客户列表的计算状态清空 + public void saveCalStatus() { + // 实例销毁之前,将正在本实例中执行计算的客户列表的计算状态修改为pendding,等待其他实例重新计算 futureMap.forEach((customerId, future) -> { - redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(customerId)); + CalculateFlagModel flag = (CalculateFlagModel) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(customerId)); + flag.setStatus(CalculateStatus.PENDDING); + redisUtils.set(RedisKeys.getCustomerStatsCalFlag(customerId), flag); + log.info("客户【%s】正在执行计算,实例发生重启,修改计算状态为:calculation->pendding", customerId); }); } + /** + * 处理暂停中的计算 + */ + @PostMapping("process-pendding-cals") + public void processPenddingCalculate() { + String keyPrefix = RedisKeys.getCustomerStatsCalKeyPrefix(); + Set calFlagKeys = redisUtils.keys(keyPrefix.concat("*")); + calFlagKeys.forEach(key -> { + CalculateFlagModel flag = (CalculateFlagModel) redisUtils.get(key); + if (flag != null && CalculateStatus.PENDDING.equals(flag.getStatus())) { + // 找到状态是pendding的key,执行计算 + CalculateCommonFormDTO form = new CalculateCommonFormDTO(); + form.setCustomerId(flag.getForm().getCustomerId()); + form.setMonthId(flag.getForm().getMonthId()); + indexCalculate(form); + } + }); + } /** * 按照客户计算所有指标(按照月份) * * @param formDTO * @return com.epmet.commons.tools.utils.Result - * @Author zhangyong + * @Author zhangyongç * @Date 10:52 2020-08-20 **/ @ExternalAppRequestAuth @@ -80,34 +108,66 @@ public class IndexCalculateController { public Result indexCalculate(ExternalAppRequestParam externalAppRequestParam, @RequestBody CalculateCommonFormDTO formDTO) { String customerId = externalAppRequestParam.getCustomerId(); //String customerId = "epmettest"; - Boolean executing = (Boolean) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(customerId)); - if (executing == null || !executing) { + formDTO.setCustomerId(customerId); + indexCalculate(formDTO); + return new Result().ok(true); + } + + /** + * 指标计算 + * @param formDTO + */ + private void indexCalculate(CalculateCommonFormDTO formDTO) { + CalculateFlagModel executeFlag = (CalculateFlagModel) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId())); + if (executeFlag == null || !CalculateStatus.CALCULATING.equals(executeFlag.getStatus())) { + // 可以计算 synchronized (statsCalLock) { - Boolean executing2 = (Boolean) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(customerId)); - if (executing2 != null && executing2) { - log.error(String.format("该客户正在执行计算,请勿重复提交计算请求。", customerId)); - return new Result().ok(false); + CalculateFlagModel executingFlag2 = (CalculateFlagModel) redisUtils.get(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId())); + if (executingFlag2 != null && CalculateStatus.CALCULATING.equals(executingFlag2.getStatus())) { + //log.error(String.format("客户【%s】正在执行计算,请勿重复提交计算请求。", customerId)); + throw new RenException(String.format("客户【%s】正在执行计算,请勿重复提交计算请求。", formDTO.getCustomerId())); } - Future future = singleThreadPool.submit(() -> { - formDTO.setCustomerId(customerId); - long start = System.currentTimeMillis(); - Boolean aBoolean = indexCalculateService.indexCalculate(formDTO); - if (aBoolean) { - log.error("客户Id:{},全部指标计算完成,总耗时:{}秒", customerId, (System.currentTimeMillis() - start) / 1000); - } - redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(customerId)); - futureMap.remove(customerId); - }); - futureMap.put(customerId, future); - redisUtils.set(RedisKeys.getCustomerStatsCalFlag(customerId), true); + // 提交异步计算 + submitCalculate(formDTO); } } else { - log.error(String.format("该客户正在执行计算,请勿重复提交计算请求。", customerId)); - return new Result().ok(false); + throw new RenException(String.format("客户【%s】正在执行计算,请勿重复提交计算请求。", formDTO.getCustomerId())); } + } + + /** + * 提交异步计算 + * @param formDTO + * @return + */ + private void submitCalculate(CalculateCommonFormDTO formDTO) { + Future future = singleThreadPool.submit(() -> { + long start = System.currentTimeMillis(); + Boolean aBoolean = indexCalculateService.indexCalculate(formDTO); + if (aBoolean) { + log.error("客户Id:{},全部指标计算完成,总耗时:{}秒", formDTO.getCustomerId(), (System.currentTimeMillis() - start) / 1000); + } + redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId())); + futureMap.remove(formDTO.getCustomerId()); - return new Result().ok(true); + //测试代码 + //try { + // Thread.sleep(20000l); + // System.out.println(System.currentTimeMillis()); + //} catch (InterruptedException e) { + // e.printStackTrace(); + //} + //redisUtils.delete(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId())); + //futureMap.remove(formDTO.getCustomerId()); + }); + + futureMap.put(formDTO.getCustomerId(), future); + + CalculateFlagModel flag = new CalculateFlagModel(); + flag.setStatus(CalculateStatus.CALCULATING); + flag.setForm(formDTO); + redisUtils.set(RedisKeys.getCustomerStatsCalFlag(formDTO.getCustomerId()), flag); } /** diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/model/CalculateFlagModel.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/model/CalculateFlagModel.java new file mode 100644 index 0000000000..95d114ae16 --- /dev/null +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/model/CalculateFlagModel.java @@ -0,0 +1,20 @@ +package com.epmet.model; + +import com.epmet.dto.indexcal.CalculateCommonFormDTO; +import lombok.Data; + +@Data +public class CalculateFlagModel { + + /** + * 提交计算时候的参数 + */ + private CalculateCommonFormDTO form; + + /** + * 计算状态 + * CalculateStatus.java + */ + private String status; + +}