forked from rongchao/epmet-cloud-rizhao
				
			
				 4 changed files with 87 additions and 24 deletions
			
			
		@ -0,0 +1,62 @@ | 
				
			|||||
 | 
					package com.epmet.mq; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; | 
				
			||||
 | 
					import com.epmet.commons.rocketmq.constants.TopicConstants; | 
				
			||||
 | 
					import com.epmet.mq.listener.IssueProjectCategoryTagInitListener; | 
				
			||||
 | 
					import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; | 
				
			||||
 | 
					import org.apache.rocketmq.client.consumer.listener.MessageListener; | 
				
			||||
 | 
					import org.apache.rocketmq.client.exception.MQClientException; | 
				
			||||
 | 
					import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; | 
				
			||||
 | 
					import org.springframework.beans.factory.annotation.Value; | 
				
			||||
 | 
					import org.springframework.stereotype.Component; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import javax.annotation.PostConstruct; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					@Component | 
				
			||||
 | 
					public class RocketMQConsumerRegister { | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @Value("${rocketmq.name-server}") | 
				
			||||
 | 
					    private String nameServer; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * @return | 
				
			||||
 | 
					     * @Description 注册监听器 | 
				
			||||
 | 
					     * @author wxz | 
				
			||||
 | 
					     * @date 2021.03.03 16:09 | 
				
			||||
 | 
					     */ | 
				
			||||
 | 
					    @PostConstruct | 
				
			||||
 | 
					    public void registerAllListeners() { | 
				
			||||
 | 
					        try { | 
				
			||||
 | 
					            register(nameServer, ConsomerGroupConstants.ISSUE_PROJECT_CATEGORY_TAG, MessageModel.CLUSTERING, TopicConstants.INIT_CUSTOMER, "*", new IssueProjectCategoryTagInitListener()); | 
				
			||||
 | 
					        } catch (MQClientException e) { | 
				
			||||
 | 
					            e.printStackTrace(); | 
				
			||||
 | 
					        } | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    public void register(String nameServer, String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException { | 
				
			||||
 | 
					        // 实例化消费者
 | 
				
			||||
 | 
					        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					        // 设置NameServer的地址
 | 
				
			||||
 | 
					        consumer.setNamesrvAddr(nameServer); | 
				
			||||
 | 
					        consumer.setMessageModel(messageModel); | 
				
			||||
 | 
					        consumer.setInstanceName(buildInstanceName()); | 
				
			||||
 | 
					        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
 | 
				
			||||
 | 
					        consumer.subscribe(topic, subException); | 
				
			||||
 | 
					        // 注册回调实现类来处理从broker拉取回来的消息
 | 
				
			||||
 | 
					        consumer.registerMessageListener(listener); | 
				
			||||
 | 
					        // 启动消费者实例
 | 
				
			||||
 | 
					        consumer.start(); | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    private String buildInstanceName() { | 
				
			||||
 | 
					        String instanceName = ""; | 
				
			||||
 | 
					        for (int i = 0; i < 4; i++) { | 
				
			||||
 | 
					            int t = (int) (Math.random() * 10); | 
				
			||||
 | 
					            instanceName = instanceName.concat(t + ""); | 
				
			||||
 | 
					        } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					        return instanceName; | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					} | 
				
			||||
					Loading…
					
					
				
		Reference in new issue