|
|
@ -2,6 +2,8 @@ package com.epmet.mq.listener; |
|
|
|
|
|
|
|
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; |
|
|
|
import com.epmet.commons.rocketmq.constants.TopicConstants; |
|
|
|
import com.epmet.commons.rocketmq.register.MQAbstractRegister; |
|
|
|
import com.epmet.commons.rocketmq.register.MQConsumerProperties; |
|
|
|
import com.epmet.commons.tools.enums.EnvEnum; |
|
|
|
import com.epmet.mq.listener.listener.AuthOperationLogListener; |
|
|
|
import com.epmet.mq.listener.listener.PointOperationLogListener; |
|
|
@ -16,55 +18,15 @@ import org.springframework.stereotype.Component; |
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
|
|
|
|
@Component |
|
|
|
public class RocketMQConsumerRegister { |
|
|
|
@Value("${spring.profiles.active}") |
|
|
|
private String env; |
|
|
|
@Value("${rocketmq.name-server}") |
|
|
|
private String nameServer; |
|
|
|
public class RocketMQConsumerRegister extends MQAbstractRegister { |
|
|
|
|
|
|
|
/** |
|
|
|
* @return |
|
|
|
* @Description 注册监听器 |
|
|
|
* @author wxz |
|
|
|
* @date 2021.03.03 16:09 |
|
|
|
*/ |
|
|
|
@PostConstruct |
|
|
|
public void registerAllListeners() { |
|
|
|
try { |
|
|
|
if (!EnvEnum.LOCAL.getCode().equals(env)) { |
|
|
|
register(nameServer, ConsomerGroupConstants.AUTH_OPERATION_LOG_GROUP, MessageModel.CLUSTERING, TopicConstants.AUTH, "*", new AuthOperationLogListener()); |
|
|
|
register(nameServer, ConsomerGroupConstants.PROJECT_OPERATION_LOG_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new ProjectOperationLogListener()); |
|
|
|
register(nameServer, ConsomerGroupConstants.POINT_OPERATION_LOG_GROUP, MessageModel.CLUSTERING, TopicConstants.POINT, "*", new PointOperationLogListener()); |
|
|
|
} |
|
|
|
} catch (MQClientException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void register(String nameServer, String group, MessageModel messageModel, String topic, String subException, MessageListenerConcurrently listener) throws MQClientException { |
|
|
|
// 实例化消费者
|
|
|
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); |
|
|
|
|
|
|
|
// 设置NameServer的地址
|
|
|
|
consumer.setNamesrvAddr(nameServer); |
|
|
|
consumer.setMessageModel(messageModel); |
|
|
|
consumer.setInstanceName(buildInstanceName()); |
|
|
|
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
|
|
|
|
consumer.subscribe(topic, subException); |
|
|
|
// 注册回调实现类来处理从broker拉取回来的消息
|
|
|
|
consumer.registerMessageListener(listener); |
|
|
|
// 启动消费者实例
|
|
|
|
consumer.start(); |
|
|
|
} |
|
|
|
|
|
|
|
private String buildInstanceName() { |
|
|
|
String instanceName = ""; |
|
|
|
for (int i = 0; i < 4; i++) { |
|
|
|
int t = (int) (Math.random() * 10); |
|
|
|
instanceName = instanceName.concat(t + ""); |
|
|
|
} |
|
|
|
@Override |
|
|
|
public void registerAllListeners(String env, MQConsumerProperties consumerProperties) { |
|
|
|
// 客户初始化监听器注册
|
|
|
|
register(consumerProperties, ConsomerGroupConstants.AUTH_OPERATION_LOG_GROUP, MessageModel.CLUSTERING, TopicConstants.AUTH, "*", new AuthOperationLogListener()); |
|
|
|
register(consumerProperties, ConsomerGroupConstants.PROJECT_OPERATION_LOG_GROUP, MessageModel.CLUSTERING, TopicConstants.PROJECT_CHANGED, "*", new ProjectOperationLogListener()); |
|
|
|
register(consumerProperties, ConsomerGroupConstants.POINT_OPERATION_LOG_GROUP, MessageModel.CLUSTERING, TopicConstants.POINT, "*", new PointOperationLogListener()); |
|
|
|
|
|
|
|
return instanceName; |
|
|
|
// ...其他监听器类似
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |