Browse Source

stats 项目变动订阅消息

dev_shibei_match
jianjun 4 years ago
parent
commit
ffcd3d47eb
  1. 4
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
  2. 1
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
  3. 18
      epmet-module/data-statistical/data-statistical-server/pom.xml
  4. 102
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java
  5. 66
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
  6. 10
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java
  7. 15
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java
  8. 11
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java
  9. 7
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java
  10. 3
      epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml

4
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java

@ -21,5 +21,9 @@ public interface ConsomerGroupConstants {
* 客户初始化议题项目分类标签数据
*/
String ISSUE_PROJECT_CATEGORY_TAG = "issue_project_category_tag";
/**
* 项目变动通知消费者组
*/
String PROJECT_CHANGED_COMPONENTS_GROUP = "project_changed_components_group";
}

1
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java

@ -2,4 +2,5 @@ package com.epmet.commons.rocketmq.constants;
public interface TopicConstants {
String INIT_CUSTOMER = "init_customer";
String PROJECT_CHANGED = "project_changed";
}

18
epmet-module/data-statistical/data-statistical-server/pom.xml

@ -110,6 +110,12 @@
<version>4.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.epmet</groupId>
<artifactId>epmet-commons-rocketmq</artifactId>
<version>2.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
@ -248,6 +254,9 @@
<!--钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c</dingTalk.robot.webHook>
<dingTalk.robot.secret>SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.nameserver>192.168.1.130:9876;192.168.1.132:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
@ -364,6 +373,9 @@
<!--钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c</dingTalk.robot.webHook>
<dingTalk.robot.secret>SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.nameserver>192.168.1.130:9876;192.168.1.132:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
@ -479,6 +491,9 @@
<!--测试钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=90782b119f82a5b6bb8e0f819b6a77bbc2102b53aa2d7d2e24fa10b66d580b1c</dingTalk.robot.webHook>
<dingTalk.robot.secret>SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.nameserver>192.168.10.161:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
@ -592,6 +607,9 @@
<!--生产钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c</dingTalk.robot.webHook>
<dingTalk.robot.secret>SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.nameserver>192.168.11.187:9876;192.168.11.184:9876</rocketmq.nameserver>
</properties>
</profile>
</profiles>

102
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/GroupAchievementCustomListener.java

@ -0,0 +1,102 @@
package com.epmet.mq;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.extract.form.ExtractOriginFormDTO;
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService;
import com.epmet.service.evaluationindex.extract.toscreen.ScreenExtractService;
import com.epmet.util.DimIdGenerator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PreDestroy;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @Description 项目变动-监听器
* @return
* @author wxz
* @date 2021.03.03 16:10
*/
@Slf4j
public class GroupAchievementCustomListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long start = System.currentTimeMillis();
try {
List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
customerIds.forEach(this::consumeMessage);
} catch (Exception e) {
//失败不重发
logger.error("consumeMessage fail,msg:{}",e.getMessage());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(String customerId) {
logger.info("receive customerId:{}", JSON.toJSONString(customerId));
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId)
,30L, 30L, TimeUnit.SECONDS);
if (StringUtils.isBlank(customerId)){
logger.error("consumer project_changed fail,msg:{}",customerId);
return;
}
//消息被消费太快 业务数据还没有完成 歇一会先
try {
Thread.sleep(60L);
} catch (InterruptedException e) {
logger.error("consumeMessage sleep exception",e);
}
ExtractOriginFormDTO extractOriginFormDTO = new ExtractOriginFormDTO();
extractOriginFormDTO.setCustomerId(customerId);
String dateId = DimIdGenerator.getDateDimId(new Date());
extractOriginFormDTO.setDateId(dateId);
Future<?> aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null);
if (aBoolean.isDone()){
SpringContextUtils.getBean(ScreenExtractService.class).extractPartData(customerId,dateId);
}
logger.info("consumer projectChanged msg success,{}",aBoolean);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【RocketMQ】消费项目变动消息失败:",e);
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【RocketMQ】消费项目变动消息失败:",e);
throw e;
} finally {
if (distributedLock != null){
distributedLock.unLock(lock);
}
}
}
@PreDestroy
public void saveCalStatus() {
//todo
log.info("data-statical-server服务被关闭,执行未执行完的动作");
}
}

66
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java

@ -0,0 +1,66 @@
package com.epmet.mq;
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.tools.constant.NumConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class RocketMQConsumerRegister {
@Value("${rocketmq.name-server}")
private String nameServer;
/**
* @return
* @Description 注册监听器
* @author wxz
* @date 2021.03.03 16:09
*/
@PostConstruct
public void registerAllListeners() {
try {
register(ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new GroupAchievementCustomListener());
} catch (MQClientException e) {
log.error("registerAllListeners exception", e);
}
}
public void register(String group, MessageModel messageModel, String topic, String subException, MessageListenerConcurrently listener) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
// 设置NameServer的地址
consumer.setNamesrvAddr(nameServer);
consumer.setMessageModel(messageModel);
consumer.setInstanceName(buildInstanceName());
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(topic, subException);
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(listener);
//一次批量拉去10条消息
consumer.setConsumeMessageBatchMaxSize(NumConstant.TEN);
// 启动消费者实例
consumer.start();
}
private String buildInstanceName() {
String instanceName = "";
for (int i = 0; i < 4; i++) {
int t = (int) (Math.random() * 10);
instanceName = instanceName.concat(t + "");
}
return instanceName;
}
}

10
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/FactOriginExtractService.java

@ -2,6 +2,9 @@ package com.epmet.service.evaluationindex.extract.todata;
import com.epmet.dto.extract.form.ExtractOriginFormDTO;
import java.util.List;
import java.util.concurrent.Future;
/**
* @author zhaoqifeng
* @dscription
@ -15,4 +18,11 @@ public interface FactOriginExtractService {
* @param extractOriginFormDTO
*/
void extractAll(ExtractOriginFormDTO extractOriginFormDTO);
/**
* desc:抽取项目相关业务数据到统计库
*
* @param param
*/
Future<?> submitProjectRelationData(ExtractOriginFormDTO param, List<String> finalDaysBetween);
}

15
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/todata/impl/FactOriginExtractServiceImpl.java

@ -83,7 +83,7 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService {
}
private void submitJob(ExtractOriginFormDTO param) {
boolean isRange = StringUtils.isBlank(param.getDateId()) ? true : false;
boolean isRange = StringUtils.isBlank(param.getDateId());
List<String> daysBetween = null;
if (isRange) {
daysBetween = DateUtils.getDaysBetween(param.getStartDate(), param.getEndDate());
@ -188,9 +188,15 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService {
}
}
});
threadPool.submit(() -> {
ExtractOriginFormDTO paramNew = ConvertUtils.sourceToTarget(param, ExtractOriginFormDTO.class);
if (!isRange) {
submitProjectRelationData(param, finalDaysBetween);
}
@Override
public Future<?> submitProjectRelationData(ExtractOriginFormDTO param, List<String> finalDaysBetween) {
Future<?> submit = threadPool.submit(() -> {
ExtractOriginFormDTO paramNew = null;
if (CollectionUtils.isEmpty(finalDaysBetween)) {
paramNew = ConvertUtils.sourceToTarget(param, ExtractOriginFormDTO.class);
try {
projectExtractService.saveOriginProjectDaily(paramNew);
} catch (Exception e) {
@ -237,5 +243,6 @@ public class FactOriginExtractServiceImpl implements FactOriginExtractService {
}
});
return submit;
}
}

11
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/ScreenExtractService.java

@ -25,4 +25,15 @@ public interface ScreenExtractService {
*/
void extractMonthlyAll(ExtractScreenFormDTO formDTO);
/**
* desc: 实时抽取相关的数据
*
* @param customerId
* @param dateId
* @return void
* @author LiuJanJun
* @date 2021/4/27 6:12 下午
*/
void extractPartData(String customerId, String dateId);
}

7
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/extract/toscreen/impl/ScreenExtractServiceImpl.java

@ -170,6 +170,12 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
log.error("公众参与排行抽取到大屏失败,customerId为:"+customerId+"dateId为:"+dateId, e);
}
extractPartData(customerId, dateId);
log.info("===== extractDaily method end ======");
}
@Override
public void extractPartData(String customerId, String dateId) {
ScreenCentralZoneDataFormDTO param = new ScreenCentralZoneDataFormDTO();
param.setCustomerId(customerId);
param.setDateId(dateId);
@ -223,7 +229,6 @@ public class ScreenExtractServiceImpl implements ScreenExtractService {
}catch(Exception e){
log.error("按天统计:组织内各个分类下的项目总数,customerId为:"+customerId+"dateId为:"+dateId, e);
}
log.info("===== extractDaily method end ======");
}
/**

3
epmet-module/data-statistical/data-statistical-server/src/main/resources/bootstrap.yml

@ -205,3 +205,6 @@ shutdown:
graceful:
enable: true #是否开启优雅停机
waitTimeSecs: 30 # 优雅停机等待时间,超过30秒,发出告警
rocketmq:
name-server: @rocketmq.nameserver@

Loading…
Cancel
Save