9 changed files with 171 additions and 176 deletions
			
			
		@ -1,43 +0,0 @@ | 
				
			|||||
package com.epmet.commons.rocketmq.register; | 
					 | 
				
			||||
 | 
					 | 
				
			||||
import lombok.Data; | 
					 | 
				
			||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; | 
					 | 
				
			||||
 | 
					 | 
				
			||||
import java.io.Serializable; | 
					 | 
				
			||||
 | 
					 | 
				
			||||
/** | 
					 | 
				
			||||
 * desc:mq 消费配置类 | 
					 | 
				
			||||
 * | 
					 | 
				
			||||
 * @author: LiuJanJun | 
					 | 
				
			||||
 * @date: 2021/4/30 2:39 下午 | 
					 | 
				
			||||
 * @version: 1.0 | 
					 | 
				
			||||
 */ | 
					 | 
				
			||||
@Data | 
					 | 
				
			||||
public class ConsumerConfigProperties implements Serializable { | 
					 | 
				
			||||
 | 
					 | 
				
			||||
    private static final long serialVersionUID = 2069676324708473773L; | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * 消费者组 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    private String consumerGroup; | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * 主题 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    private String topic; | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * 标签 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    private String tag = "*"; | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * 最小消费的线程数 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    private int consumeThreadMin = 2; | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * 最大消费的线程数 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    private int consumeThreadMax = 4; | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * 消费监听器 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    private MessageListenerConcurrently consumerListener; | 
					 | 
				
			||||
} | 
					 | 
				
			||||
@ -0,0 +1,77 @@ | 
				
			|||||
 | 
					package com.epmet.commons.rocketmq.register; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import com.epmet.commons.tools.exception.ExceptionUtils; | 
				
			||||
 | 
					import lombok.extern.slf4j.Slf4j; | 
				
			||||
 | 
					import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; | 
				
			||||
 | 
					import org.apache.rocketmq.client.consumer.listener.MessageListener; | 
				
			||||
 | 
					import org.apache.rocketmq.client.exception.MQClientException; | 
				
			||||
 | 
					import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					/** | 
				
			||||
 | 
					 * @author wxz | 
				
			||||
 | 
					 * @Description 父类抽象注册器 | 
				
			||||
 | 
					 * @date 2021.07.14 15:38:21 | 
				
			||||
 | 
					 */ | 
				
			||||
 | 
					@Slf4j | 
				
			||||
 | 
					public abstract class MQAbstractRegister { | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * @Description 注册所有监听器,由子类实现,做具体监听器的注册 | 
				
			||||
 | 
					     * @return | 
				
			||||
 | 
					     * @author wxz | 
				
			||||
 | 
					     * @date 2021.07.14 15:48 | 
				
			||||
 | 
					    */ | 
				
			||||
 | 
					    public abstract void registerAllListeners(String env, MQConsumerProperties consumerProperties); | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * @Description 真正执行注册的方法,供子类直接调用,子类也可以覆盖该方法 | 
				
			||||
 | 
					     * @return | 
				
			||||
 | 
					     * @author wxz | 
				
			||||
 | 
					     * @date 2021.07.14 15:56 | 
				
			||||
 | 
					    */ | 
				
			||||
 | 
					    public void register(MQConsumerProperties consumerProperties, String group, MessageModel messageModel, String topic, String subExpression, MessageListener listener) { | 
				
			||||
 | 
					        try { | 
				
			||||
 | 
					            String nameServer = consumerProperties.getNameServer(); | 
				
			||||
 | 
					            Integer consumeThreadMin = consumerProperties.getConsumeThreadMin(); | 
				
			||||
 | 
					            Integer consumeThreadMax = consumerProperties.getConsumeThreadMax(); | 
				
			||||
 | 
					            String instanceName = buildInstanceName(); | 
				
			||||
 | 
					            // 实例化消费者
 | 
				
			||||
 | 
					            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					            // 设置NameServer的地址
 | 
				
			||||
 | 
					            consumer.setNamesrvAddr(nameServer); | 
				
			||||
 | 
					            consumer.setMessageModel(messageModel); | 
				
			||||
 | 
					            consumer.setInstanceName(instanceName); | 
				
			||||
 | 
					            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
 | 
				
			||||
 | 
					            consumer.subscribe(topic, subExpression); | 
				
			||||
 | 
					            // 注册回调实现类来处理从broker拉取回来的消息
 | 
				
			||||
 | 
					            consumer.registerMessageListener(listener); | 
				
			||||
 | 
					            if (consumeThreadMin != null) { | 
				
			||||
 | 
					                consumer.setConsumeThreadMin(consumeThreadMin); | 
				
			||||
 | 
					            } | 
				
			||||
 | 
					            if (consumeThreadMax != null) { | 
				
			||||
 | 
					                consumer.setConsumeThreadMax(consumeThreadMax); | 
				
			||||
 | 
					            } | 
				
			||||
 | 
					            // 启动消费者实例
 | 
				
			||||
 | 
					            consumer.start(); | 
				
			||||
 | 
					            log.info(String.format("监听器注册完成,消费者组:%s,Topic:%s,Tag:%s,实例名称:%s", group, topic, subExpression, instanceName)); | 
				
			||||
 | 
					        } catch (Exception e) { | 
				
			||||
 | 
					            log.error(String.format("监听器注册失败,消费者组:%s,Topic:%s,Tag:%s。详细信息:%s", group, topic, subExpression, ExceptionUtils.getErrorStackTrace(e))); | 
				
			||||
 | 
					        } | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * @Description 构造实例名称 | 
				
			||||
 | 
					     * @return | 
				
			||||
 | 
					     * @author wxz | 
				
			||||
 | 
					     * @date 2021.07.14 15:56 | 
				
			||||
 | 
					    */ | 
				
			||||
 | 
					    private String buildInstanceName() { | 
				
			||||
 | 
					        String instanceName = ""; | 
				
			||||
 | 
					        for (int i = 0; i < 4; i++) { | 
				
			||||
 | 
					            int t = (int) (Math.random() * 10); | 
				
			||||
 | 
					            instanceName = instanceName.concat(t + ""); | 
				
			||||
 | 
					        } | 
				
			||||
 | 
					        return instanceName; | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					} | 
				
			||||
@ -0,0 +1,29 @@ | 
				
			|||||
 | 
					package com.epmet.commons.rocketmq.register; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import lombok.Data; | 
				
			||||
 | 
					import org.springframework.boot.context.properties.ConfigurationProperties; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					/** | 
				
			||||
 | 
					 * @Description 注册属性 | 
				
			||||
 | 
					 * @author wxz | 
				
			||||
 | 
					 * @date 2021.07.14 15:33:16 | 
				
			||||
 | 
					*/ | 
				
			||||
 | 
					@Data | 
				
			||||
 | 
					@ConfigurationProperties(prefix = "rocketmq") | 
				
			||||
 | 
					public class MQConsumerProperties { | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * nameServer | 
				
			||||
 | 
					     */ | 
				
			||||
 | 
					    private String nameServer; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * 最小消费线程数 | 
				
			||||
 | 
					     */ | 
				
			||||
 | 
					    private Integer consumeThreadMin; | 
				
			||||
 | 
					    /** | 
				
			||||
 | 
					     * 最大消费线程数 | 
				
			||||
 | 
					     */ | 
				
			||||
 | 
					    private Integer consumeThreadMax; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					} | 
				
			||||
@ -1,79 +0,0 @@ | 
				
			|||||
package com.epmet.commons.rocketmq.register; | 
					 | 
				
			||||
 | 
					 | 
				
			||||
import lombok.extern.slf4j.Slf4j; | 
					 | 
				
			||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; | 
					 | 
				
			||||
import org.apache.rocketmq.client.exception.MQClientException; | 
					 | 
				
			||||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; | 
					 | 
				
			||||
import org.springframework.beans.factory.annotation.Value; | 
					 | 
				
			||||
 | 
					 | 
				
			||||
import javax.annotation.PostConstruct; | 
					 | 
				
			||||
 | 
					 | 
				
			||||
/** | 
					 | 
				
			||||
 * desc:注册mq监听器 | 
					 | 
				
			||||
 * | 
					 | 
				
			||||
 * @author liujianjun | 
					 | 
				
			||||
 */ | 
					 | 
				
			||||
@Slf4j | 
					 | 
				
			||||
public abstract class MQConsumerRegister { | 
					 | 
				
			||||
    @Value("${spring.profiles.active}") | 
					 | 
				
			||||
    private String env; | 
					 | 
				
			||||
    @Value("${rocketmq.name-server}") | 
					 | 
				
			||||
    private String namesrvAddr; | 
					 | 
				
			||||
 | 
					 | 
				
			||||
    public abstract ConsumerConfigProperties getConsumerProperty(); | 
					 | 
				
			||||
 | 
					 | 
				
			||||
 | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * @return | 
					 | 
				
			||||
     * @Description 注册监听器 | 
					 | 
				
			||||
     * @author wxz | 
					 | 
				
			||||
     * @date 2021.03.03 16:09 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    @PostConstruct | 
					 | 
				
			||||
    public void registerMQListener() { | 
					 | 
				
			||||
        ConsumerConfigProperties consumerProperty = getConsumerProperty(); | 
					 | 
				
			||||
        log.info("registerAllListeners consumers:{} success", consumerProperty); | 
					 | 
				
			||||
        //本地环境不注册
 | 
					 | 
				
			||||
        //if (!"local".equals(env)) {
 | 
					 | 
				
			||||
            try { | 
					 | 
				
			||||
                // 实例化消费者
 | 
					 | 
				
			||||
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerProperty.getConsumerGroup()); | 
					 | 
				
			||||
 | 
					 | 
				
			||||
                // 设置NameServer的地址
 | 
					 | 
				
			||||
                consumer.setNamesrvAddr(namesrvAddr); | 
					 | 
				
			||||
                consumer.setMessageModel(MessageModel.CLUSTERING); | 
					 | 
				
			||||
                consumer.setInstanceName(buildInstanceName()); | 
					 | 
				
			||||
                // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
 | 
					 | 
				
			||||
                consumer.subscribe(consumer.getConsumerGroup(), consumerProperty.getTag()); | 
					 | 
				
			||||
                // 注册回调实现类来处理从broker拉取回来的消息
 | 
					 | 
				
			||||
                consumer.registerMessageListener(consumerProperty.getConsumerListener()); | 
					 | 
				
			||||
                consumer.setConsumeThreadMin(consumerProperty.getConsumeThreadMin()); | 
					 | 
				
			||||
                consumer.setConsumeThreadMax(consumerProperty.getConsumeThreadMax()); | 
					 | 
				
			||||
                // 启动消费者实例
 | 
					 | 
				
			||||
                consumer.start(); | 
					 | 
				
			||||
            } catch (MQClientException e) { | 
					 | 
				
			||||
                log.info("registerMQListener exception", e); | 
					 | 
				
			||||
            } | 
					 | 
				
			||||
 | 
					 | 
				
			||||
        //}
 | 
					 | 
				
			||||
 | 
					 | 
				
			||||
    } | 
					 | 
				
			||||
 | 
					 | 
				
			||||
    /** | 
					 | 
				
			||||
     * desc: 因为docker-compose部署有问题 所有自己命名 | 
					 | 
				
			||||
     * | 
					 | 
				
			||||
     * @param | 
					 | 
				
			||||
     * @return java.lang.String | 
					 | 
				
			||||
     * @author LiuJanJun | 
					 | 
				
			||||
     * @date 2021/4/30 5:00 下午 | 
					 | 
				
			||||
     */ | 
					 | 
				
			||||
    private String buildInstanceName() { | 
					 | 
				
			||||
        String instanceName = ""; | 
					 | 
				
			||||
        for (int i = 0; i < 4; i++) { | 
					 | 
				
			||||
            int t = (int) (Math.random() * 10); | 
					 | 
				
			||||
            instanceName = instanceName.concat(t + ""); | 
					 | 
				
			||||
        } | 
					 | 
				
			||||
 | 
					 | 
				
			||||
        return instanceName; | 
					 | 
				
			||||
    } | 
					 | 
				
			||||
} | 
					 | 
				
			||||
@ -0,0 +1,35 @@ | 
				
			|||||
 | 
					package com.epmet.commons.rocketmq.register; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import lombok.extern.slf4j.Slf4j; | 
				
			||||
 | 
					import org.springframework.beans.factory.annotation.Autowired; | 
				
			||||
 | 
					import org.springframework.beans.factory.annotation.Value; | 
				
			||||
 | 
					import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | 
				
			||||
 | 
					import org.springframework.boot.context.properties.EnableConfigurationProperties; | 
				
			||||
 | 
					import org.springframework.context.annotation.Configuration; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					import javax.annotation.PostConstruct; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					/** | 
				
			||||
 | 
					 * @author wxz | 
				
			||||
 | 
					 * @Description MQ注册配置类 | 
				
			||||
 | 
					 * @date 2021.07.14 15:36:24 | 
				
			||||
 | 
					 */ | 
				
			||||
 | 
					@Configuration | 
				
			||||
 | 
					@ConditionalOnProperty(prefix = "rocketmq", name = "enable", havingValue = "true", matchIfMissing = false) | 
				
			||||
 | 
					@EnableConfigurationProperties(MQConsumerProperties.class) | 
				
			||||
 | 
					@Slf4j | 
				
			||||
 | 
					public class MQRegisterConfiguration { | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @Value("${spring.profiles.active}") | 
				
			||||
 | 
					    private String env; | 
				
			||||
 | 
					    @Autowired | 
				
			||||
 | 
					    private MQConsumerProperties consumerProperties; | 
				
			||||
 | 
					    @Autowired | 
				
			||||
 | 
					    private MQAbstractRegister mqRegister; | 
				
			||||
 | 
					
 | 
				
			||||
 | 
					    @PostConstruct | 
				
			||||
 | 
					    public void register() { | 
				
			||||
 | 
					        mqRegister.registerAllListeners(env, consumerProperties); | 
				
			||||
 | 
					        log.info("监听器注册动作执行完毕"); | 
				
			||||
 | 
					    } | 
				
			||||
 | 
					} | 
				
			||||
					Loading…
					
					
				
		Reference in new issue