Browse Source

Merge remote-tracking branch 'remotes/origin/dev'

master
jianjun 4 years ago
parent
commit
484e3bcb68
  1. 4
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  2. 18
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java
  3. 2
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java
  4. 39
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java
  5. 10
      epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java
  6. 13
      epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml
  7. 4
      epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectImgDataDao.xml

4
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -469,4 +469,8 @@ public class RedisKeys {
public static String getCorsConfigKey() {
return rootPrefix.concat("sys:cors");
}
public static String getProjectChangedMsgDistinceKey(String customerId) {
return rootPrefix.concat("project_changed:consume:").concat(customerId);
}
}

18
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectDataDao.java

@ -46,4 +46,22 @@ public interface ScreenProjectDataDao extends BaseDao<ScreenProjectDataEntity> {
void insertBatch(@Param("list") List<ScreenProjectDataDTO> list);
void updateBatch(@Param("list") List<ScreenProjectDataDTO> list,@Param("dateId") String dateId);
/**
* 根据项目ID删除数据
* @author zhaoqifeng
* @date 2021/7/9 17:33
* @param projectId
* @return int
*/
int deleteByProjectId(@Param("projectId") String projectId);
/**
* 根据项目ID删除数据
* @author zhaoqifeng
* @date 2021/7/9 17:33
* @param list
* @return int
*/
void deleteByProjectIds(@Param("list") List<String> list);
}

2
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/dao/evaluationindex/screen/ScreenProjectImgDataDao.java

@ -37,4 +37,6 @@ public interface ScreenProjectImgDataDao extends BaseDao<ScreenProjectImgDataEnt
void deleteByProjectIds(@Param("list") List<String> list);
void insertBatch(@Param("list") List<ScreenProjectImgDataDTO> list);
int deleteByProjectId(@Param("projectId") String projectId);
}

39
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/mq/ProjectChangedCustomListener.java

@ -5,6 +5,8 @@ import com.epmet.commons.rocketmq.messages.ProjectChangedMQMsg;
import com.epmet.commons.tools.constant.NumConstant;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.dto.extract.form.ExtractOriginFormDTO;
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService;
@ -21,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.PreDestroy;
import java.util.Date;
@ -41,24 +44,16 @@ import java.util.stream.Collectors;
public class ProjectChangedCustomListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* 控制通知类型消息的消费频率
*/
private static final Cache<String, String> customerIdCache = CacheBuilder.newBuilder().maximumSize(NumConstant.ONE_HUNDRED)
.expireAfterWrite(NumConstant.THIRTY,TimeUnit.SECONDS).build();
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long start = System.currentTimeMillis();
try {
List<String> customerIds = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
for (String customerId : customerIds) {
//获取缓存 如果不存在缓存中 则执行消费 并放入缓存中
String ifPresent = customerIdCache.getIfPresent(customerId);
if (StringUtils.isBlank(ifPresent)){
consumeMessage(customerId);
customerIdCache.put(customerId,customerId);
}
List<String> msgStrs = msgs.stream().map(messageExt -> new String(messageExt.getBody())).distinct().collect(Collectors.toList());
for (String msgStr : msgStrs) {
consumeMessage(msgStr);
}
} catch (Exception e) {
//失败不重发
@ -76,6 +71,23 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
log.warn("consumeMessage msg body is blank");
return;
}
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
String redisKey = RedisKeys.getProjectChangedMsgDistinceKey(msgObj.getCustomerId());
if (redisUtils.get(redisKey) == null) {
consumeMessage(msgObj);
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId());
// 有效期30秒
redisUtils.set(redisKey, msg, 30);
} else {
log.info("该客户的项目变动消息刚刚消费,请等待30秒,customer id:{}", msgObj.getCustomerId());
}
}
public void consumeMessage(ProjectChangedMQMsg msgObj) {
DistributedLock distributedLock = null;
RLock lock = null;
try {
@ -128,6 +140,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently
}
}
}
@PreDestroy
public void saveCalStatus() {
//todo

10
epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/evaluationindex/screen/impl/ScreenProjectDataServiceImpl.java

@ -173,14 +173,8 @@ public class ScreenProjectDataServiceImpl extends BaseServiceImpl<ScreenProjectD
}
}
//先删除该projectId对应的旧数据
QueryWrapper<ScreenProjectDataEntity> deleteWrapper = new QueryWrapper<>();
deleteWrapper.eq(StringUtils.isNotBlank(item.getProjectId()), "project_id", item.getProjectId())
.eq(StringUtils.isNotBlank(param.getCustomerId()), "customer_id", param.getCustomerId());
baseDao.delete(deleteWrapper);
QueryWrapper<ScreenProjectImgDataEntity> screenProjectImgDataEntityQueryWrapper = new QueryWrapper<>();
screenProjectImgDataEntityQueryWrapper.eq(StringUtils.isNotBlank(item.getProjectId()), "project_id", item.getProjectId())
.eq(StringUtils.isNotBlank(param.getCustomerId()), "customer_id", param.getCustomerId());
screenProjectImgDataDao.delete(screenProjectImgDataEntityQueryWrapper);
baseDao.deleteByProjectId(item.getProjectId());
screenProjectImgDataDao.deleteByProjectId(item.getProjectId());
//如果orgType未知,获取一下
// if ("unknown".equals(item.getOrgType())){

13
epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectDataDao.xml

@ -46,6 +46,19 @@
and DATE_FORMAT(PROJECT_CREATE_TIME,'%Y%m%d') = #{dateId}
limit 1000
</delete>
<delete id="deleteByProjectId">
delete from screen_project_data
where PROJECT_ID = #{projectId}
</delete>
<delete id="deleteByProjectIds">
delete from screen_project_data
<where>
<foreach collection="list" item="projectId" separator=" or ">
PROJECT_ID = #{projectId}
</foreach>
</where>
</delete>
<select id="checkIfExisted" resultType="int">
select count(id) from screen_project_data

4
epmet-module/data-statistical/data-statistical-server/src/main/resources/mapper/evaluationindex/screen/ScreenProjectImgDataDao.xml

@ -26,6 +26,10 @@
</foreach>
</where>
</delete>
<delete id="deleteByProjectId">
delete from screen_project_img_data
where PROJECT_ID = #{projectId}
</delete>
<insert id="insertBatch" parameterType="java.util.List">

Loading…
Cancel
Save