|
@ -10,6 +10,7 @@ import com.epmet.commons.tools.exception.RenException; |
|
|
import com.epmet.commons.tools.redis.RedisKeys; |
|
|
import com.epmet.commons.tools.redis.RedisKeys; |
|
|
import com.epmet.commons.tools.redis.RedisUtils; |
|
|
import com.epmet.commons.tools.redis.RedisUtils; |
|
|
import com.epmet.commons.tools.utils.SpringContextUtils; |
|
|
import com.epmet.commons.tools.utils.SpringContextUtils; |
|
|
|
|
|
import com.epmet.constant.PingYinConstant; |
|
|
import com.epmet.constant.SystemMessageType; |
|
|
import com.epmet.constant.SystemMessageType; |
|
|
import com.epmet.dto.extract.form.ExtractOriginFormDTO; |
|
|
import com.epmet.dto.extract.form.ExtractOriginFormDTO; |
|
|
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; |
|
|
import com.epmet.service.evaluationindex.extract.todata.FactOriginExtractService; |
|
@ -66,7 +67,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
|
//失败不重发
|
|
|
//失败不重发
|
|
|
logger.error("consumeMessage fail,msg:{}",e.getMessage()); |
|
|
logger.error("consumeMessage fail,msg:{}",e.getMessage()); |
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
|
|
} |
|
|
} |
|
|
log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start); |
|
|
log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start); |
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|
@ -94,7 +95,7 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently |
|
|
} |
|
|
} |
|
|
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); |
|
|
log.info("消费了项目变动消息,customer id:{}", msgObj.getCustomerId()); |
|
|
|
|
|
|
|
|
if (org.apache.commons.lang.StringUtils.isNotBlank(pendingMsgLabel)) { |
|
|
if (StringUtils.isNotBlank(pendingMsgLabel)) { |
|
|
try { |
|
|
try { |
|
|
removePendingMqMsgCache(pendingMsgLabel); |
|
|
removePendingMqMsgCache(pendingMsgLabel); |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
@ -109,19 +110,14 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently |
|
|
try { |
|
|
try { |
|
|
String customerId = msgObj.getCustomerId(); |
|
|
String customerId = msgObj.getCustomerId(); |
|
|
distributedLock = SpringContextUtils.getBean(DistributedLock.class); |
|
|
distributedLock = SpringContextUtils.getBean(DistributedLock.class); |
|
|
lock = distributedLock.getLock(String.format("lock:project_changed:%s", customerId) |
|
|
lock = distributedLock.getLock(String.format("lock:project_changed:%s:%s", customerId, msgObj.getProjectId()) |
|
|
,30L, 30L, TimeUnit.SECONDS); |
|
|
,30L, 30L, TimeUnit.SECONDS); |
|
|
|
|
|
|
|
|
if (StringUtils.isBlank(customerId)){ |
|
|
if (StringUtils.isBlank(customerId)){ |
|
|
logger.error("consumer project_changed fail,msg:{}",customerId); |
|
|
logger.error("consumer project_changed fail,msg:{}",customerId); |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
//消息被消费太快 业务数据还没有完成 歇一会先
|
|
|
|
|
|
try { |
|
|
|
|
|
Thread.sleep(60L); |
|
|
|
|
|
} catch (InterruptedException e) { |
|
|
|
|
|
logger.error("consumeMessage sleep exception",e); |
|
|
|
|
|
} |
|
|
|
|
|
ExtractOriginFormDTO extractOriginFormDTO = new ExtractOriginFormDTO(); |
|
|
ExtractOriginFormDTO extractOriginFormDTO = new ExtractOriginFormDTO(); |
|
|
extractOriginFormDTO.setCustomerId(customerId); |
|
|
extractOriginFormDTO.setCustomerId(customerId); |
|
|
|
|
|
|
|
@ -146,8 +142,30 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently |
|
|
} |
|
|
} |
|
|
logger.info("consumer projectChanged msg success,{}",aBoolean); |
|
|
logger.info("consumer projectChanged msg success,{}",aBoolean); |
|
|
|
|
|
|
|
|
|
|
|
sendProject2OpenData(msgObj, customerId); |
|
|
|
|
|
|
|
|
|
|
|
} catch (RenException e) { |
|
|
|
|
|
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
|
|
|
|
|
|
logger.error("【RocketMQ】消费项目变动消息失败:",e); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
// 不是我们自己抛出的异常,可以让MQ重试
|
|
|
|
|
|
logger.error("【RocketMQ】消费项目变动消息异常:",e); |
|
|
|
|
|
throw e; |
|
|
|
|
|
} finally { |
|
|
|
|
|
if (distributedLock != null){ |
|
|
|
|
|
distributedLock.unLock(lock); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* desc:发型项目数据到 opendata |
|
|
|
|
|
* @param msgObj |
|
|
|
|
|
* @param customerId |
|
|
|
|
|
*/ |
|
|
|
|
|
private void sendProject2OpenData(ProjectChangedMQMsg msgObj, String customerId) { |
|
|
//发送项目数据上报的mq消息
|
|
|
//发送项目数据上报的mq消息
|
|
|
if ("6f203e30de1a65aab7e69c058826cd80".equals(customerId)) { |
|
|
if (PingYinConstant.PROD_PING_YIN_CUSTOMER_ID.equals(customerId)) { |
|
|
if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation()) || "close".equals(msgObj.getOperation())) { |
|
|
if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation()) || "close".equals(msgObj.getOperation())) { |
|
|
String type; |
|
|
String type; |
|
|
if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation())) { |
|
|
if ("issue_shift_project".equals(msgObj.getOperation()) || "created".equals(msgObj.getOperation())) { |
|
@ -161,18 +179,6 @@ public class ProjectChangedCustomListener implements MessageListenerConcurrently |
|
|
SpringContextUtils.getBean(ScreenProjectDataService.class).sendProjectChangeMq(msg); |
|
|
SpringContextUtils.getBean(ScreenProjectDataService.class).sendProjectChangeMq(msg); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} catch (RenException e) { |
|
|
|
|
|
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
|
|
|
|
|
|
logger.error("【RocketMQ】消费项目变动消息失败:",e); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
// 不是我们自己抛出的异常,可以让MQ重试
|
|
|
|
|
|
logger.error("【RocketMQ】消费项目变动消息异常:",e); |
|
|
|
|
|
throw e; |
|
|
|
|
|
} finally { |
|
|
|
|
|
if (distributedLock != null){ |
|
|
|
|
|
distributedLock.unLock(lock); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@PreDestroy |
|
|
@PreDestroy |
|
|