Browse Source

测试下 使用多线程

dev_shibei_match
jianjun 4 years ago
parent
commit
d043854c85
  1. 311
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java

311
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java

@ -17,6 +17,7 @@ import com.epmet.service.evaluationindex.extract.toscreen.*;
import com.epmet.service.evaluationindex.indexcal.IndexCalculateService;
import com.epmet.service.evaluationindex.screen.*;
import com.epmet.service.stats.DimCustomerService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -26,8 +27,7 @@ import org.springframework.util.CollectionUtils;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
/**
* @Author zxc
@ -36,6 +36,11 @@ import java.util.concurrent.Executors;
@Service
@Slf4j
public class ScreenExtractServiceImpl implements ScreenExtractService {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("ScreenExtractServiceImpl-pool-%d").build();
ExecutorService threadPool = new ThreadPoolExecutor(3, 6,
10L, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(500), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired
private DimCustomerService dimCustomerService;
@ -135,7 +140,7 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
} else if (StringUtils.isNotBlank(formDTO.getMonthId())) {
extractMonthly(customerId, formDTO.getMonthId());
} else {
String monthId = LocalDate.now().minusMonths(NumConstant.ONE).toString().replace("-", "").substring(NumConstant.ZERO,NumConstant.SIX);
String monthId = LocalDate.now().minusMonths(NumConstant.ONE).toString().replace("-", "").substring(NumConstant.ZERO, NumConstant.SIX);
extractMonthly(customerId, monthId);
}
});
@ -151,129 +156,147 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
* @date 2020/9/24 10:16 上午
*/
public void extractDaily(String customerId, String dateId) {
//党员基本情况screen_cpc_base_data
try {
partyBaseInfoService.statsPartyMemberBaseInfoToScreen(customerId,dateId);
}catch (Exception e){
log.error("党员基本情况抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
//先锋模范screen_pioneer_data
try {
pioneerDataExtractService.extractGridPioneerData(customerId, dateId);
}catch (Exception e){
log.error("先锋模范【网格】抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
try {
pioneerDataExtractService.extractCommunityPioneerData(customerId, dateId);
}catch (Exception e){
log.error("先锋模范【社区】抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
try {
pioneerDataExtractService.extractExceptCommunityPioneerData(customerId, dateId);
}catch (Exception e){
log.error("先锋模范【extractExceptCommunityPioneerData】抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
//公众参与排行(注册人数、参与人数、话题数、议题数、项目数)screen_public_party_total_data
try {
publicPartiTotalDataExtractService.extractPublicPartiTotalData(customerId,dateId);
}catch (Exception e){
log.error("公众参与排行抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
//基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
try {
ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO();
param2.setCustomerId(customerId);
param2.setDateId(null);
log.info("【难点赌点数据上报开始------】 当前客户Id{}",param2.getCustomerId());
//screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
//等待3个线程执行完毕后再 继续执行下一个客户的 避免死锁
final CountDownLatch latch = new CountDownLatch(NumConstant.THREE);
threadPool.submit(() -> {
//党员基本情况screen_cpc_base_data
try {
partyBaseInfoService.statsPartyMemberBaseInfoToScreen(customerId, dateId);
} catch (Exception e) {
log.error("党员基本情况抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
//先锋模范screen_pioneer_data
try {
pioneerDataExtractService.extractGridPioneerData(customerId, dateId);
} catch (Exception e) {
log.error("先锋模范【网格】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
pioneerDataExtractService.extractCommunityPioneerData(customerId, dateId);
} catch (Exception e) {
log.error("先锋模范【社区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
pioneerDataExtractService.extractExceptCommunityPioneerData(customerId, dateId);
} catch (Exception e) {
log.error("先锋模范【extractExceptCommunityPioneerData】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
latch.countDown();
log.info("extractDaily 1 thread run end ========= dateId:{},customerId:{}",dateId,customerId);
})
threadPool.submit(() -> {
//公众参与排行(注册人数、参与人数、话题数、议题数、项目数)screen_public_party_total_data
try {
publicPartiTotalDataExtractService.extractPublicPartiTotalData(customerId, dateId);
} catch (Exception e) {
log.error("公众参与排行抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
log.info("【难点赌点数据上报结束------】 当前客户Id{}",param2.getCustomerId());
}catch (Exception e){
log.error("基层治理-难点赌点抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
//基层治理- 难点赌点 screen_difficulty_data | screen_difficulty_img_data
try {
ScreenCentralZoneDataFormDTO param2 = new ScreenCentralZoneDataFormDTO();
param2.setCustomerId(customerId);
param2.setDateId(null);
log.info("【难点赌点数据上报开始------】 当前客户Id{}", param2.getCustomerId());
//screenGrassrootsGovernDataAbsorptionService.difficultyDataHub(param);
ScreenCentralZoneDataFormDTO param = new ScreenCentralZoneDataFormDTO();
param.setCustomerId(customerId);
param.setDateId(dateId);
//中央区 screen_user_total_data
try {
screenCentralZoneDataAbsorptionService.centralZoneDataHub(param);
}catch (Exception e){
log.error("中央区抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
screenGrassrootsGovernDataAbsorptionService.difficultyDataExtract(param2);
log.info("【难点赌点数据上报结束------】 当前客户Id{}", param2.getCustomerId());
} catch (Exception e) {
log.error("基层治理-难点赌点抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
latch.countDown();
log.info("extractDaily 2 thread run end ========= dateId:{},customerId:{}",dateId,customerId);
});
threadPool.submit(() -> {
ScreenCentralZoneDataFormDTO param = new ScreenCentralZoneDataFormDTO();
param.setCustomerId(customerId);
param.setDateId(dateId);
//中央区 screen_user_total_data
try {
screenCentralZoneDataAbsorptionService.centralZoneDataHub(param);
} catch (Exception e) {
log.error("中央区抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
// 项目(事件)分析按网格_按天统计
screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId);
}catch (Exception e){
log.error("项目(事件)分析按网格_按天统计失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
try {
// 项目(事件)分析按组织_按天统计
screenProjectOrgDailyService.extractionProjectOrgDaily(customerId, dateId);
}catch (Exception e){
log.error("项目(事件)分析按组织_按天统计失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
try {
// 项目(事件)分析按网格_按天统计
screenProjectGridDailyService.extractionProjectGridDaily(customerId, dateId);
} catch (Exception e) {
log.error("项目(事件)分析按网格_按天统计失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
// 项目(事件)分析按组织_按天统计
screenProjectOrgDailyService.extractionProjectOrgDaily(customerId, dateId);
} catch (Exception e) {
log.error("项目(事件)分析按组织_按天统计失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
//按天统计:网格内各个分类下的项目总数
try{
projectCategoryGridDailyService.extractProjectCategoryData(customerId,dateId);
}catch(Exception e){
log.error("按天统计:网格内各个分类下的项目总数,customerId为:"+customerId+"dateId为:"+dateId, e);
}
//按天统计:网格内各个分类下的项目总数
try {
projectCategoryGridDailyService.extractProjectCategoryData(customerId, dateId);
} catch (Exception e) {
log.error("按天统计:网格内各个分类下的项目总数,customerId为:" + customerId + "dateId为:" + dateId, e);
}
// 按天统计:组织内各个分类下的项目总数
try{
projectCategoryOrgDailyService.extractProjectCategoryOrgData(customerId,dateId);
}catch(Exception e){
log.error("按天统计:组织内各个分类下的项目总数,customerId为:"+customerId+"dateId为:"+dateId, e);
}
// 按天统计:组织内各个分类下的项目总数
try {
projectCategoryOrgDailyService.extractProjectCategoryOrgData(customerId, dateId);
} catch (Exception e) {
log.error("按天统计:组织内各个分类下的项目总数,customerId为:" + customerId + "dateId为:" + dateId, e);
}
//治理能力排行screen_govern_rank_data
try {
governRankDataExtractService.extractGridDataDaily(customerId, dateId);
}catch (Exception e){
log.error("治理能力排行【网格】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
governRankDataExtractService.extractCommunityDataDaily(customerId, dateId);
}catch (Exception e){
log.error("治理能力排行【社区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
governRankDataExtractService.extractStreetDataDaily(customerId, dateId);
}catch (Exception e){
log.error("治理能力排行【街道】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
governRankDataExtractService.extractDistrictDataDaily(customerId, dateId);
}catch (Exception e){
log.error("治理能力排行【全区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
//治理能力排行screen_govern_rank_data
try {
governRankDataExtractService.extractGridDataDaily(customerId, dateId);
} catch (Exception e) {
log.error("治理能力排行【网格】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
governRankDataExtractService.extractCommunityDataDaily(customerId, dateId);
} catch (Exception e) {
log.error("治理能力排行【社区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
governRankDataExtractService.extractStreetDataDaily(customerId, dateId);
} catch (Exception e) {
log.error("治理能力排行【街道】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
governRankDataExtractService.extractDistrictDataDaily(customerId, dateId);
} catch (Exception e) {
log.error("治理能力排行【全区】抽取到大屏失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try{
ExtractFactGridGovernDailyFromDTO extractFactGridGovernDailyFromDTO=new ExtractFactGridGovernDailyFromDTO();
extractFactGridGovernDailyFromDTO.setCustomerId(customerId);
extractFactGridGovernDailyFromDTO.setDateId(dateId);
factGridGovernDailyService.extractFactGridGovernDaily(extractFactGridGovernDailyFromDTO);
}catch(Exception e){
log.error("治理指数-网格fact_grid_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
ExtractFactGridGovernDailyFromDTO extractFactGridGovernDailyFromDTO = new ExtractFactGridGovernDailyFromDTO();
extractFactGridGovernDailyFromDTO.setCustomerId(customerId);
extractFactGridGovernDailyFromDTO.setDateId(dateId);
factGridGovernDailyService.extractFactGridGovernDaily(extractFactGridGovernDailyFromDTO);
} catch (Exception e) {
log.error("治理指数-网格fact_grid_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try{
factAgencyGovernDailyService.extractFactAgencyGovernDaily(customerId, dateId);
}catch(Exception e){
log.error("治理指数-组织fact_agency_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try {
factAgencyGovernDailyService.extractFactAgencyGovernDaily(customerId, dateId);
} catch (Exception e) {
log.error("治理指数-组织fact_agency_govern_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
try{
factGridMemberStatisticsDailyService.extractGridMemberStatisticsDaily(customerId, dateId);
}catch(Exception e){
log.error("网格员数据统计fact_grid_member_statistics_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
try {
factGridMemberStatisticsDailyService.extractGridMemberStatisticsDaily(customerId, dateId);
} catch (Exception e) {
log.error("网格员数据统计fact_grid_member_statistics_daily抽取失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
extractPartData(customerId, dateId);
latch.countDown();
log.info("extractDaily 3 thread run end ========= dateId:{},customerId:{}",dateId,customerId);
});
try {
latch.await();
} catch (InterruptedException e) {
log.error("extractDaily run exception", e);
}
extractPartData(customerId, dateId);
log.info("===== extractDaily method end ======");
}
@ -283,11 +306,11 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
param.setCustomerId(customerId);
param.setDateId(dateId);
try{
try {
//大屏项目数据抽取_按天抽取
screenProjectSettleService.extractScreenData(param);
}catch (Exception e){
log.error("大屏项目数据抽取_按天抽取_按天统计失败,customerId为:"+customerId+"dateId为:"+dateId, e);
} catch (Exception e) {
log.error("大屏项目数据抽取_按天抽取_按天统计失败,customerId为:" + customerId + "dateId为:" + dateId, e);
}
}
@ -304,56 +327,56 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
//基层治理-公众参与screen_user_join
try {
publicPartExtractService.extractTotalDataMonthly(formDTO);
}catch (Exception e){
log.error("基层治理-公众参与抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("基层治理-公众参与抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
//治理能力排行screen_govern_rank_data
try {
governRankDataExtractService.extractGridData(customerId, monthId);
}catch (Exception e){
log.error("治理能力排行【网格】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("治理能力排行【网格】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
try {
governRankDataExtractService.extractCommunityData(customerId, monthId);
}catch (Exception e){
log.error("治理能力排行【社区】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("治理能力排行【社区】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
try {
governRankDataExtractService.extractStreetData(customerId, monthId);
}catch (Exception e){
log.error("治理能力排行【街道】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("治理能力排行【街道】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
try {
governRankDataExtractService.extractDistrictData(customerId, monthId);
}catch (Exception e){
log.error("治理能力排行【全区】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("治理能力排行【全区】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
//先进排行 screen_org_rank_data
try {
orgRankExtractService.extractGridData(customerId, monthId);
}catch (Exception e){
log.error("先进排行【网格】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("先进排行【网格】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
try {
orgRankExtractService.extractCommunityData(customerId, monthId);
}catch (Exception e){
log.error("先进排行【社区】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("先进排行【社区】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
try {
orgRankExtractService.extractStreetData(customerId, monthId);
}catch (Exception e){
log.error("先进排行【街道】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("先进排行【街道】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
try {
orgRankExtractService.extractDistrictData(customerId, monthId);
}catch (Exception e){
log.error("先进排行【全区】抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("先进排行【全区】抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
// 党建引领 screen_party_branch_data,screen_party_link_masses_data
try {
partyGuideService.partyGuideExtract(formDTO);
}catch (Exception e){
log.error("党建引领抽取到大屏失败,参数为:"+ JSON.toJSONString(formDTO), e);
} catch (Exception e) {
log.error("党建引领抽取到大屏失败,参数为:" + JSON.toJSONString(formDTO), e);
}
try {
//基层治理 - 热心市民 screen_party_user_rank_data
@ -361,20 +384,20 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
param.setCustomerId(customerId);
param.setDateId(monthId);
screenGrassrootsGovernDataAbsorptionService.userScoreDataHub(param);
}catch(Exception e){
} catch (Exception e) {
log.error("大屏热心市民/党员得分数据写入失败,参数为:{}" + JSON.toJSONString(formDTO), e);
}
try {
// 项目(事件)数量分析按网格_按月统计
screenProjectQuantityGridMonthlyService.extractionProjectGridMonthly(customerId, monthId);
}catch (Exception e){
log.error("项目(事件)数量分析按网格_按月统计失败,参数为{}" + JSON.toJSONString(formDTO),e);
} catch (Exception e) {
log.error("项目(事件)数量分析按网格_按月统计失败,参数为{}" + JSON.toJSONString(formDTO), e);
}
try {
// 项目(事件)数量分析按组织_按月统计
screenProjectQuantityOrgMonthlyService.extractionProjectOrgMonthly(customerId, monthId);
}catch (Exception e){
log.error("项目(事件)数量分析按组织_按月统计失败,参数为{}" + JSON.toJSONString(formDTO),e);
} catch (Exception e) {
log.error("项目(事件)数量分析按组织_按月统计失败,参数为{}" + JSON.toJSONString(formDTO), e);
}
//此方法保持在最后即可 计算指标分数 todo 优化 手动创建线程池 控制任务数量
ExecutorService pool = Executors.newSingleThreadExecutor();

Loading…
Cancel
Save