5 changed files with 77 additions and 26 deletions
			
			
		@ -0,0 +1,67 @@ | 
				
			|||
package com.epmet.mq.listener; | 
				
			|||
 | 
				
			|||
import com.alibaba.fastjson.JSON; | 
				
			|||
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; | 
				
			|||
import com.epmet.commons.rocketmq.constants.TopicConstants; | 
				
			|||
import com.epmet.commons.rocketmq.messages.InitCustomerMQMsg; | 
				
			|||
import com.epmet.commons.tools.distributedlock.DistributedLock; | 
				
			|||
import com.epmet.commons.tools.exception.ExceptionUtils; | 
				
			|||
import com.epmet.commons.tools.exception.RenException; | 
				
			|||
import com.epmet.dto.form.CategoryTagInitFormDTO; | 
				
			|||
import com.epmet.service.IssueProjectCategoryDictService; | 
				
			|||
import org.apache.rocketmq.common.message.MessageExt; | 
				
			|||
import org.apache.rocketmq.spring.annotation.MessageModel; | 
				
			|||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; | 
				
			|||
import org.apache.rocketmq.spring.core.RocketMQListener; | 
				
			|||
import org.redisson.api.RLock; | 
				
			|||
import org.slf4j.Logger; | 
				
			|||
import org.slf4j.LoggerFactory; | 
				
			|||
import org.springframework.beans.factory.annotation.Autowired; | 
				
			|||
import org.springframework.stereotype.Component; | 
				
			|||
 | 
				
			|||
import java.util.concurrent.TimeUnit; | 
				
			|||
 | 
				
			|||
/** | 
				
			|||
 * 监听初始化客户动作,为客户初始化议题、项目的分类、标签数据 | 
				
			|||
 */ | 
				
			|||
@RocketMQMessageListener(topic = TopicConstants.INIT_CUSTOMER, | 
				
			|||
        consumerGroup = ConsomerGroupConstants.ISSUE_PROJECT_CATEGORY_TAG, | 
				
			|||
        messageModel = MessageModel.CLUSTERING, | 
				
			|||
        selectorExpression = "*") | 
				
			|||
@Component | 
				
			|||
public class IssueProjectCategoryTagInitListener implements RocketMQListener<MessageExt> { | 
				
			|||
 | 
				
			|||
    private Logger logger = LoggerFactory.getLogger(getClass()); | 
				
			|||
 | 
				
			|||
    @Autowired | 
				
			|||
    private IssueProjectCategoryDictService issueProjectCategoryDictService; | 
				
			|||
 | 
				
			|||
    @Autowired | 
				
			|||
    private DistributedLock distributedLock; | 
				
			|||
 | 
				
			|||
    @Override | 
				
			|||
    public void onMessage(MessageExt messageExt) { | 
				
			|||
        String msg = new String(messageExt.getBody()); | 
				
			|||
        logger.info("初始化客户-初始化客户自定义信息-收到消息内容:{}", msg); | 
				
			|||
        InitCustomerMQMsg msgObj = JSON.parseObject(msg, InitCustomerMQMsg.class); | 
				
			|||
 | 
				
			|||
        CategoryTagInitFormDTO dto = new CategoryTagInitFormDTO(); | 
				
			|||
        dto.setCustomerId(msgObj.getCustomerId()); | 
				
			|||
 | 
				
			|||
        RLock lock = null; | 
				
			|||
        try { | 
				
			|||
            lock = distributedLock.getLock(String.format("lock:init_customer_home:%s", msgObj.getCustomerId()), | 
				
			|||
                    30l, 30l, TimeUnit.SECONDS); | 
				
			|||
            issueProjectCategoryDictService.init(dto); | 
				
			|||
        } catch (RenException e) { | 
				
			|||
            // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
 | 
				
			|||
            logger.error("【RocketMQ】初始化客户组件失败:".concat(ExceptionUtils.getErrorStackTrace(e))); | 
				
			|||
        } catch (Exception e) { | 
				
			|||
            // 不是我们自己抛出的异常,可以让MQ重试
 | 
				
			|||
            logger.error("【RocketMQ】初始化客户组件失败:".concat(ExceptionUtils.getErrorStackTrace(e))); | 
				
			|||
             throw e; | 
				
			|||
        } finally { | 
				
			|||
            distributedLock.unLock(lock); | 
				
			|||
        } | 
				
			|||
    } | 
				
			|||
} | 
				
			|||
					Loading…
					
					
				
		Reference in new issue