2 changed files with 83 additions and 18 deletions
			
			
		| @ -0,0 +1,62 @@ | |||
| package com.epmet.mq; | |||
| 
 | |||
| import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; | |||
| import com.epmet.commons.rocketmq.constants.TopicConstants; | |||
| import com.epmet.mq.listener.IssueProjectCategoryTagInitListener; | |||
| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; | |||
| import org.apache.rocketmq.client.consumer.listener.MessageListener; | |||
| import org.apache.rocketmq.client.exception.MQClientException; | |||
| import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; | |||
| import org.springframework.beans.factory.annotation.Value; | |||
| import org.springframework.stereotype.Component; | |||
| 
 | |||
| import javax.annotation.PostConstruct; | |||
| 
 | |||
| @Component | |||
| public class RocketMQConsumerRegister { | |||
| 
 | |||
|     @Value("${rocketmq.name-server}") | |||
|     private String nameServer; | |||
| 
 | |||
|     /** | |||
|      * @return | |||
|      * @Description 注册监听器 | |||
|      * @author wxz | |||
|      * @date 2021.03.03 16:09 | |||
|      */ | |||
|     @PostConstruct | |||
|     public void registerAllListeners() { | |||
|         try { | |||
|             register(nameServer, ConsomerGroupConstants.ISSUE_PROJECT_CATEGORY_TAG, MessageModel.CLUSTERING, TopicConstants.INIT_CUSTOMER, "*", new IssueProjectCategoryTagInitListener()); | |||
|         } catch (MQClientException e) { | |||
|             e.printStackTrace(); | |||
|         } | |||
|     } | |||
| 
 | |||
|     public void register(String nameServer, String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException { | |||
|         // 实例化消费者
 | |||
|         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); | |||
| 
 | |||
|         // 设置NameServer的地址
 | |||
|         consumer.setNamesrvAddr(nameServer); | |||
|         consumer.setMessageModel(messageModel); | |||
|         consumer.setInstanceName(buildInstanceName()); | |||
|         // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
 | |||
|         consumer.subscribe(topic, subException); | |||
|         // 注册回调实现类来处理从broker拉取回来的消息
 | |||
|         consumer.registerMessageListener(listener); | |||
|         // 启动消费者实例
 | |||
|         consumer.start(); | |||
|     } | |||
| 
 | |||
|     private String buildInstanceName() { | |||
|         String instanceName = ""; | |||
|         for (int i = 0; i < 4; i++) { | |||
|             int t = (int) (Math.random() * 10); | |||
|             instanceName = instanceName.concat(t + ""); | |||
|         } | |||
| 
 | |||
|         return instanceName; | |||
|     } | |||
| 
 | |||
| } | |||
					Loading…
					
					
				
		Reference in new issue