forked from rongchao/epmet-cloud-rizhao
				
			
				 25 changed files with 251 additions and 415 deletions
			
			
		| @ -1,43 +0,0 @@ | |||
| package com.epmet.commons.rocketmq.register; | |||
| 
 | |||
| import lombok.Data; | |||
| import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; | |||
| 
 | |||
| import java.io.Serializable; | |||
| 
 | |||
| /** | |||
|  * desc:mq 消费配置类 | |||
|  * | |||
|  * @author: LiuJanJun | |||
|  * @date: 2021/4/30 2:39 下午 | |||
|  * @version: 1.0 | |||
|  */ | |||
| @Data | |||
| public class ConsumerConfigProperties implements Serializable { | |||
| 
 | |||
|     private static final long serialVersionUID = 2069676324708473773L; | |||
|     /** | |||
|      * 消费者组 | |||
|      */ | |||
|     private String consumerGroup; | |||
|     /** | |||
|      * 主题 | |||
|      */ | |||
|     private String topic; | |||
|     /** | |||
|      * 标签 | |||
|      */ | |||
|     private String tag = "*"; | |||
|     /** | |||
|      * 最小消费的线程数 | |||
|      */ | |||
|     private int consumeThreadMin = 2; | |||
|     /** | |||
|      * 最大消费的线程数 | |||
|      */ | |||
|     private int consumeThreadMax = 4; | |||
|     /** | |||
|      * 消费监听器 | |||
|      */ | |||
|     private MessageListenerConcurrently consumerListener; | |||
| } | |||
| @ -0,0 +1,77 @@ | |||
| package com.epmet.commons.rocketmq.register; | |||
| 
 | |||
| import com.epmet.commons.tools.exception.ExceptionUtils; | |||
| import lombok.extern.slf4j.Slf4j; | |||
| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; | |||
| import org.apache.rocketmq.client.consumer.listener.MessageListener; | |||
| import org.apache.rocketmq.client.exception.MQClientException; | |||
| import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; | |||
| 
 | |||
| /** | |||
|  * @author wxz | |||
|  * @Description 父类抽象注册器 | |||
|  * @date 2021.07.14 15:38:21 | |||
|  */ | |||
| @Slf4j | |||
| public abstract class MQAbstractRegister { | |||
| 
 | |||
|     /** | |||
|      * @Description 注册所有监听器,由子类实现,做具体监听器的注册 | |||
|      * @return | |||
|      * @author wxz | |||
|      * @date 2021.07.14 15:48 | |||
|     */ | |||
|     public abstract void registerAllListeners(String env, MQConsumerProperties consumerProperties); | |||
| 
 | |||
|     /** | |||
|      * @Description 真正执行注册的方法,供子类直接调用,子类也可以覆盖该方法 | |||
|      * @return | |||
|      * @author wxz | |||
|      * @date 2021.07.14 15:56 | |||
|     */ | |||
|     public void register(MQConsumerProperties consumerProperties, String group, MessageModel messageModel, String topic, String subExpression, MessageListener listener) { | |||
|         try { | |||
|             String nameServer = consumerProperties.getNameServer(); | |||
|             Integer consumeThreadMin = consumerProperties.getConsumeThreadMin(); | |||
|             Integer consumeThreadMax = consumerProperties.getConsumeThreadMax(); | |||
|             String instanceName = buildInstanceName(); | |||
|             // 实例化消费者
 | |||
|             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); | |||
| 
 | |||
|             // 设置NameServer的地址
 | |||
|             consumer.setNamesrvAddr(nameServer); | |||
|             consumer.setMessageModel(messageModel); | |||
|             consumer.setInstanceName(instanceName); | |||
|             // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
 | |||
|             consumer.subscribe(topic, subExpression); | |||
|             // 注册回调实现类来处理从broker拉取回来的消息
 | |||
|             consumer.registerMessageListener(listener); | |||
|             if (consumeThreadMin != null) { | |||
|                 consumer.setConsumeThreadMin(consumeThreadMin); | |||
|             } | |||
|             if (consumeThreadMax != null) { | |||
|                 consumer.setConsumeThreadMax(consumeThreadMax); | |||
|             } | |||
|             // 启动消费者实例
 | |||
|             consumer.start(); | |||
|             log.info(String.format("监听器注册完成,消费者组:%s,Topic:%s,Tag:%s,实例名称:%s", group, topic, subExpression, instanceName)); | |||
|         } catch (Exception e) { | |||
|             log.error(String.format("监听器注册失败,消费者组:%s,Topic:%s,Tag:%s。详细信息:%s", group, topic, subExpression, ExceptionUtils.getErrorStackTrace(e))); | |||
|         } | |||
|     } | |||
| 
 | |||
|     /** | |||
|      * @Description 构造实例名称 | |||
|      * @return | |||
|      * @author wxz | |||
|      * @date 2021.07.14 15:56 | |||
|     */ | |||
|     private String buildInstanceName() { | |||
|         String instanceName = ""; | |||
|         for (int i = 0; i < 4; i++) { | |||
|             int t = (int) (Math.random() * 10); | |||
|             instanceName = instanceName.concat(t + ""); | |||
|         } | |||
|         return instanceName; | |||
|     } | |||
| } | |||
| @ -0,0 +1,29 @@ | |||
| package com.epmet.commons.rocketmq.register; | |||
| 
 | |||
| import lombok.Data; | |||
| import org.springframework.boot.context.properties.ConfigurationProperties; | |||
| 
 | |||
| /** | |||
|  * @Description 注册属性 | |||
|  * @author wxz | |||
|  * @date 2021.07.14 15:33:16 | |||
| */ | |||
| @Data | |||
| @ConfigurationProperties(prefix = "rocketmq") | |||
| public class MQConsumerProperties { | |||
| 
 | |||
|     /** | |||
|      * nameServer | |||
|      */ | |||
|     private String nameServer; | |||
| 
 | |||
|     /** | |||
|      * 最小消费线程数 | |||
|      */ | |||
|     private Integer consumeThreadMin; | |||
|     /** | |||
|      * 最大消费线程数 | |||
|      */ | |||
|     private Integer consumeThreadMax; | |||
| 
 | |||
| } | |||
| @ -1,79 +0,0 @@ | |||
| package com.epmet.commons.rocketmq.register; | |||
| 
 | |||
| import lombok.extern.slf4j.Slf4j; | |||
| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; | |||
| import org.apache.rocketmq.client.exception.MQClientException; | |||
| import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; | |||
| import org.springframework.beans.factory.annotation.Value; | |||
| 
 | |||
| import javax.annotation.PostConstruct; | |||
| 
 | |||
| /** | |||
|  * desc:注册mq监听器 | |||
|  * | |||
|  * @author liujianjun | |||
|  */ | |||
| @Slf4j | |||
| public abstract class MQConsumerRegister { | |||
|     @Value("${spring.profiles.active}") | |||
|     private String env; | |||
|     @Value("${rocketmq.name-server}") | |||
|     private String namesrvAddr; | |||
| 
 | |||
|     public abstract ConsumerConfigProperties getConsumerProperty(); | |||
| 
 | |||
| 
 | |||
|     /** | |||
|      * @return | |||
|      * @Description 注册监听器 | |||
|      * @author wxz | |||
|      * @date 2021.03.03 16:09 | |||
|      */ | |||
|     @PostConstruct | |||
|     public void registerMQListener() { | |||
|         ConsumerConfigProperties consumerProperty = getConsumerProperty(); | |||
|         log.info("registerAllListeners consumers:{} success", consumerProperty); | |||
|         //本地环境不注册
 | |||
|         //if (!"local".equals(env)) {
 | |||
|             try { | |||
|                 // 实例化消费者
 | |||
|                 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerProperty.getConsumerGroup()); | |||
| 
 | |||
|                 // 设置NameServer的地址
 | |||
|                 consumer.setNamesrvAddr(namesrvAddr); | |||
|                 consumer.setMessageModel(MessageModel.CLUSTERING); | |||
|                 consumer.setInstanceName(buildInstanceName()); | |||
|                 // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
 | |||
|                 consumer.subscribe(consumer.getConsumerGroup(), consumerProperty.getTag()); | |||
|                 // 注册回调实现类来处理从broker拉取回来的消息
 | |||
|                 consumer.registerMessageListener(consumerProperty.getConsumerListener()); | |||
|                 consumer.setConsumeThreadMin(consumerProperty.getConsumeThreadMin()); | |||
|                 consumer.setConsumeThreadMax(consumerProperty.getConsumeThreadMax()); | |||
|                 // 启动消费者实例
 | |||
|                 consumer.start(); | |||
|             } catch (MQClientException e) { | |||
|                 log.info("registerMQListener exception", e); | |||
|             } | |||
| 
 | |||
|         //}
 | |||
| 
 | |||
|     } | |||
| 
 | |||
|     /** | |||
|      * desc: 因为docker-compose部署有问题 所有自己命名 | |||
|      * | |||
|      * @param | |||
|      * @return java.lang.String | |||
|      * @author LiuJanJun | |||
|      * @date 2021/4/30 5:00 下午 | |||
|      */ | |||
|     private String buildInstanceName() { | |||
|         String instanceName = ""; | |||
|         for (int i = 0; i < 4; i++) { | |||
|             int t = (int) (Math.random() * 10); | |||
|             instanceName = instanceName.concat(t + ""); | |||
|         } | |||
| 
 | |||
|         return instanceName; | |||
|     } | |||
| } | |||
| @ -0,0 +1,35 @@ | |||
| package com.epmet.commons.rocketmq.register; | |||
| 
 | |||
| import lombok.extern.slf4j.Slf4j; | |||
| import org.springframework.beans.factory.annotation.Autowired; | |||
| import org.springframework.beans.factory.annotation.Value; | |||
| import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | |||
| import org.springframework.boot.context.properties.EnableConfigurationProperties; | |||
| import org.springframework.context.annotation.Configuration; | |||
| 
 | |||
| import javax.annotation.PostConstruct; | |||
| 
 | |||
| /** | |||
|  * @author wxz | |||
|  * @Description MQ注册配置类 | |||
|  * @date 2021.07.14 15:36:24 | |||
|  */ | |||
| @Configuration | |||
| @ConditionalOnProperty(prefix = "rocketmq", name = "enable", havingValue = "true", matchIfMissing = false) | |||
| @EnableConfigurationProperties(MQConsumerProperties.class) | |||
| @Slf4j | |||
| public class MQRegisterConfiguration { | |||
| 
 | |||
|     @Value("${spring.profiles.active}") | |||
|     private String env; | |||
|     @Autowired | |||
|     private MQConsumerProperties consumerProperties; | |||
|     @Autowired | |||
|     private MQAbstractRegister mqRegister; | |||
| 
 | |||
|     @PostConstruct | |||
|     public void register() { | |||
|         mqRegister.registerAllListeners(env, consumerProperties); | |||
|         log.info("监听器注册动作执行完毕"); | |||
|     } | |||
| } | |||
					Loading…
					
					
				
		Reference in new issue