Browse Source

screenDaily统计加锁

dev_shibei_match
zxc 4 years ago
parent
commit
2b32df2253
  1. 2
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/constant/NumConstant.java
  2. 2
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/distributedlock/LockConstants.java
  3. 269
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java

2
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/constant/NumConstant.java

@ -64,6 +64,8 @@ public interface NumConstant {
long THREE_L = 3L; long THREE_L = 3L;
long FOUR_L = 4L; long FOUR_L = 4L;
long MINUS_ONE_L = -1L; long MINUS_ONE_L = -1L;
long TEN_L = 10L;
long SIX_HUNDRED_L = 600L;
String ZERO_STR = "0"; String ZERO_STR = "0";
String ONE_STR = "1"; String ONE_STR = "1";

2
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/distributedlock/LockConstants.java

@ -9,4 +9,6 @@ public interface LockConstants {
String TEST_LOCK_NAME = "testLock"; String TEST_LOCK_NAME = "testLock";
String STATS_LOCK_NAME = "stats"; String STATS_LOCK_NAME = "stats";
String SCREEN_DAILY = "screenDaily";
} }

269
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java

@ -2,6 +2,8 @@ package com.epmet.service.evaluationindex.extract.toscreen.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.distributedlock.LockConstants;
import com.epmet.commons.tools.enums.EnvEnum; import com.epmet.commons.tools.enums.EnvEnum;
import com.epmet.commons.tools.utils.DateUtils; import com.epmet.commons.tools.utils.DateUtils;
import com.epmet.commons.tools.utils.HttpClientManager; import com.epmet.commons.tools.utils.HttpClientManager;
@ -20,6 +22,7 @@ import com.epmet.service.stats.DimCustomerService;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -86,6 +89,8 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
private FactGridMemberStatisticsDailyService factGridMemberStatisticsDailyService; private FactGridMemberStatisticsDailyService factGridMemberStatisticsDailyService;
@Autowired @Autowired
private ScreenProjectCategoryGridAndOrgDailyService screenProjectCategoryGridAndOrgDailyService; private ScreenProjectCategoryGridAndOrgDailyService screenProjectCategoryGridAndOrgDailyService;
@Autowired
private DistributedLock distributedLock;
/** /**
* @param extractOriginFormDTO * @param extractOriginFormDTO
@ -163,155 +168,163 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
* @date 2020/9/24 10:16 上午 * @date 2020/9/24 10:16 上午
*/ */
public void extractDaily(String customerId, String dateId, boolean isLast) { public void extractDaily(String customerId, String dateId, boolean isLast) {
//等待3个线程执行完毕后再 继续执行下一个客户的 避免死锁 RLock lock = null;
final CountDownLatch latch = new CountDownLatch(NumConstant.FOUR); try {
threadPool.submit(() -> { // 锁持有10分钟,等待10s
//党员基本情况screen_cpc_base_data lock = distributedLock.getLock(LockConstants.SCREEN_DAILY, NumConstant.SIX_HUNDRED_L, NumConstant.TEN_L, TimeUnit.SECONDS);
try { //等待3个线程执行完毕后再 继续执行下一个客户的 避免死锁
partyBaseInfoService.statsPartyMemberBaseInfoToScreen(customerId, dateId); final CountDownLatch latch = new CountDownLatch(NumConstant.FOUR);
} catch (Exception e) { threadPool.submit(() -> {
log.error("党员基本情况抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); //党员基本情况screen_cpc_base_data
}
//先锋模范screen_pioneer_data
if (isLast) {
try { try {
pioneerDataExtractService.extractGridPioneerData(customerId, dateId); partyBaseInfoService.statsPartyMemberBaseInfoToScreen(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("先锋模范【网格】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("党员基本情况抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
//先锋模范screen_pioneer_data
if (isLast) {
try {
pioneerDataExtractService.extractGridPioneerData(customerId, dateId);
} catch (Exception e) {
log.error("先锋模范【网格】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
pioneerDataExtractService.extractCommunityPioneerData(customerId, dateId);
} catch (Exception e) {
log.error("先锋模范【社区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
pioneerDataExtractService.extractExceptCommunityPioneerData(customerId, dateId);
} catch (Exception e) {
log.error("先锋模范【extractExceptCommunityPioneerData】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
} }
latch.countDown();
log.info("extractDaily 1 thread run end ========= dateId:{},customerId:{}", dateId, customerId);
});
threadPool.submit(() -> {
//公众参与排行(注册人数、参与人数、话题数、议题数、项目数)screen_public_party_total_data
try { try {
pioneerDataExtractService.extractCommunityPioneerData(customerId, dateId); publicPartiTotalDataExtractService.extractPublicPartiTotalData(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("先锋模范【社区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("公众参与排行抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
if (isLast) {
//基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
try {
ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO();
param2.setCustomerId(customerId);
param2.setDateId(null);
log.info("【难点赌点数据上报开始------】 当前客户Id{}", param2.getCustomerId());
//screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
log.info("【难点赌点数据上报结束------】 当前客户Id{}", param2.getCustomerId());
} catch (Exception e) {
log.error("基层治理-难点赌点抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
} }
latch.countDown();
log.info("extractDaily 2 thread run end ========= dateId:{},customerId:{}", dateId, customerId);
});
threadPool.submit(() -> {
ScreenCentralZoneDataFormDTO param = new ScreenCentralZoneDataFormDTO();
param.setCustomerId(customerId);
param.setDateId(dateId);
//中央区 screen_user_total_data
try { try {
pioneerDataExtractService.extractExceptCommunityPioneerData(customerId, dateId); screenCentralZoneDataAbsorptionService.centralZoneDataHub(param);
} catch (Exception e) { } catch (Exception e) {
log.error("先锋模范【extractExceptCommunityPioneerData】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("中央区抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
}
latch.countDown();
log.info("extractDaily 1 thread run end ========= dateId:{},customerId:{}", dateId, customerId);
});
threadPool.submit(() -> {
//公众参与排行(注册人数、参与人数、话题数、议题数、项目数)screen_public_party_total_data
try {
publicPartiTotalDataExtractService.extractPublicPartiTotalData(customerId, dateId);
} catch (Exception e) {
log.error("公众参与排行抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
if (isLast) {
//基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
try { try {
ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO(); // 项目(事件)分析按网格_按天统计
param2.setCustomerId(customerId); screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId);
param2.setDateId(null);
log.info("【难点赌点数据上报开始------】 当前客户Id{}", param2.getCustomerId());
//screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
log.info("【难点赌点数据上报结束------】 当前客户Id{}", param2.getCustomerId());
} catch (Exception e) { } catch (Exception e) {
log.error("基层治理-难点赌点抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("项目(事件)分析按网格_按天统计失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
// 项目(事件)分析按组织_按天统计
screenProjectOrgDailyService.extractionProjectOrgDaily(customerId, dateId);
} catch (Exception e) {
log.error("项目(事件)分析按组织_按天统计失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
}
latch.countDown();
log.info("extractDaily 2 thread run end ========= dateId:{},customerId:{}", dateId, customerId);
});
threadPool.submit(() -> {
ScreenCentralZoneDataFormDTO param = new ScreenCentralZoneDataFormDTO();
param.setCustomerId(customerId);
param.setDateId(dateId);
//中央区 screen_user_total_data
try {
screenCentralZoneDataAbsorptionService.centralZoneDataHub(param);
} catch (Exception e) {
log.error("中央区抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
// 项目(事件)分析按网格_按天统计
screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId);
} catch (Exception e) {
log.error("项目(事件)分析按网格_按天统计失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
// 项目(事件)分析按组织_按天统计
screenProjectOrgDailyService.extractionProjectOrgDaily(customerId, dateId);
} catch (Exception e) {
log.error("项目(事件)分析按组织_按天统计失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
//按天统计:网格内各个分类下的项目总数 //按天统计:网格内各个分类下的项目总数
try { try {
screenProjectCategoryGridAndOrgDailyService.extractCategoryProjectGridData(customerId, dateId); screenProjectCategoryGridAndOrgDailyService.extractCategoryProjectGridData(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("按天统计:网格内各个分类下的项目总数,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("按天统计:网格内各个分类下的项目总数,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
// 按天统计:组织内各个分类下的项目总数 // 按天统计:组织内各个分类下的项目总数
try { try {
screenProjectCategoryGridAndOrgDailyService.extractCategoryProjectOrgData(customerId, dateId); screenProjectCategoryGridAndOrgDailyService.extractCategoryProjectOrgData(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("按天统计:组织内各个分类下的项目总数,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("按天统计:组织内各个分类下的项目总数,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
latch.countDown(); latch.countDown();
log.info("extractDaily 3 thread run end ========= dateId:{},customerId:{}", dateId, customerId); log.info("extractDaily 3 thread run end ========= dateId:{},customerId:{}", dateId, customerId);
}); });
threadPool.submit(() -> { threadPool.submit(() -> {
//治理能力排行screen_govern_rank_data //治理能力排行screen_govern_rank_data
try { try {
governRankDataExtractService.extractGridDataDaily(customerId, dateId); governRankDataExtractService.extractGridDataDaily(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("治理能力排行【网格】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("治理能力排行【网格】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
try { try {
governRankDataExtractService.extractCommunityDataDaily(customerId, dateId); governRankDataExtractService.extractCommunityDataDaily(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("治理能力排行【社区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("治理能力排行【社区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
try { try {
governRankDataExtractService.extractStreetDataDaily(customerId, dateId); governRankDataExtractService.extractStreetDataDaily(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("治理能力排行【街道】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("治理能力排行【街道】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
try { try {
governRankDataExtractService.extractDistrictDataDaily(customerId, dateId); governRankDataExtractService.extractDistrictDataDaily(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("治理能力排行【全区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("治理能力排行【全区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
try { try {
ExtractFactGridGovernDailyFromDTO extractFactGridGovernDailyFromDTO = new ExtractFactGridGovernDailyFromDTO(); ExtractFactGridGovernDailyFromDTO extractFactGridGovernDailyFromDTO = new ExtractFactGridGovernDailyFromDTO();
extractFactGridGovernDailyFromDTO.setCustomerId(customerId); extractFactGridGovernDailyFromDTO.setCustomerId(customerId);
extractFactGridGovernDailyFromDTO.setDateId(dateId); extractFactGridGovernDailyFromDTO.setDateId(dateId);
factGridGovernDailyService.extractFactGridGovernDaily(extractFactGridGovernDailyFromDTO); factGridGovernDailyService.extractFactGridGovernDaily(extractFactGridGovernDailyFromDTO);
} catch (Exception e) { } catch (Exception e) {
log.error("治理指数-网格fact_grid_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("治理指数-网格fact_grid_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
try { try {
factAgencyGovernDailyService.extractFactAgencyGovernDaily(customerId, dateId); factAgencyGovernDailyService.extractFactAgencyGovernDaily(customerId, dateId);
} catch (Exception e) { } catch (Exception e) {
log.error("治理指数-组织fact_agency_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("治理指数-组织fact_agency_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
} }
try {
factGridMemberStatisticsDailyService.extractGridMemberStatisticsDaily(customerId, dateId);
} catch (Exception e) {
log.error("网格员数据统计fact_grid_member_statistics_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
extractPartData(customerId, dateId);
latch.countDown();
log.info("extractDaily 4 thread run end ========= dateId:{},customerId:{}", dateId, customerId);
});
try { try {
factGridMemberStatisticsDailyService.extractGridMemberStatisticsDaily(customerId, dateId); latch.await();
} catch (Exception e) { } catch (InterruptedException e) {
log.error("网格员数据统计fact_grid_member_statistics_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e); log.error("extractDaily run exception", e);
} }
extractPartData(customerId, dateId); log.info("===== extractDaily method end customerId:{}======",customerId);
latch.countDown(); }finally {
log.info("extractDaily 4 thread run end ========= dateId:{},customerId:{}", dateId, customerId); lock.unlock();
});
try {
latch.await();
} catch (InterruptedException e) {
log.error("extractDaily run exception", e);
} }
log.info("===== extractDaily method end customerId:{}======",customerId);
} }
@Override @Override

Loading…
Cancel
Save