Browse Source

实时抽取项目数据部分 添加 objectid有针对性的处理数据

dev_shibei_match
jianjun 4 years ago
parent
commit
d11efb5858
  1. 5
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  2. 5
      epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/extract/form/ExtractOriginFormDTO.java
  3. 20
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

5
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -12,7 +12,6 @@ import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.constant.StrConstant;
import com.epmet.commons.tools.utils.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.ss.formula.functions.T;
/**
* @author Mark sunlightcs@gmail.com
@ -509,10 +508,6 @@ public class RedisKeys {
return rootPrefix.concat("sys:cors");
}
public static String getProjectChangedMsgDistinceKey(String customerId) {
return rootPrefix.concat("project_changed:consume:").concat(customerId);
}
/**
* desc:获取防重复提交redis key
* @param key

5
epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/extract/form/ExtractOriginFormDTO.java

@ -34,4 +34,9 @@ public class ExtractOriginFormDTO implements Serializable {
* 结束时间
*/
private String endDate;
/**
* 操作对象Id
*/
private String objectId;
}

20
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -87,21 +87,12 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId());
if (redisUtils.get(redisKey) == null) {
// 该线程启动消费之后,其他线程再收到该客户id的消费请求,则不消费
redisUtils.set(redisKey, msg, 30);
try {
consumeMessage(msgObj);
} catch (Exception e) {
// 如果消费失败了,清楚该key,收到消息之后可以继续消费
redisUtils.delete(redisKey);
throw e;
}
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId());
} else {
log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId());
try {
consumeMessage(msgObj);
} catch (Exception e) {
throw e;
}
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId());
if (org.apache.commons.lang.StringUtils.isNotBlank(pendingMsgLabel)) {
try {
@ -136,6 +127,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
String dateId = DimIdGenerator.getDateDimId(new Date());
extractOriginFormDTO.setDateId(dateId);
extractOriginFormDTO.setObjectId(msgObj.getProjectId());
Future<?> aBoolean = SpringContextUtils.getBean(FactOriginExtractService.class).submitProjectRelationData(extractOriginFormDTO,null);
try {
Object o = aBoolean.get();

Loading…
Cancel
Save