How to try it? just uncomment the annotation declaration, then compile
+ * and run the consumer, it will fail to start.
+ */
+
+//@RocketMQTransactionListener
+public class Checker implements TransactionListener {
+ @Override
+ public LocalTransactionState executeLocalTransaction(Message message, Object o) {
+ return null;
+ }
+
+ @Override
+ public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
+ return null;
+ }
+}
diff --git a/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
new file mode 100644
index 000000000..59ef26306
--- /dev/null
+++ b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/MessageExtConsumer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * MessageExtConsumer, consume listener impl class.
+ */
+@Service
+@RocketMQMessageListener(topic = "message-ext-topic", selectorExpression = "tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
+public class MessageExtConsumer implements RocketMQListener Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. Note: The annotation is used only on RocketMQ client producer side, it can not be used
+ * on consumer side.
+ */
+@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface RocketMQTransactionListener {
+
+ /**
+ * Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a
+ * transactional message with the declared txProducerGroup.
+ *
+ * It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.
+ */
+ String txProducerGroup() default RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;
+
+ /**
+ * Set ExecutorService params -- corePoolSize
+ */
+ int corePoolSize() default 1;
+
+ /**
+ * Set ExecutorService params -- maximumPoolSize
+ */
+ int maximumPoolSize() default 1;
+
+ /**
+ * Set ExecutorService params -- keepAliveTime
+ */
+ long keepAliveTime() default 1000 * 60; //60ms
+
+ /**
+ * Set ExecutorService params -- blockingQueueSize
+ */
+ int blockingQueueSize() default 2000;
+
+ /**
+ * The property of "access-key"
+ */
+ String accessKey() default "${rocketmq.producer.access-key}";
+
+ /**
+ * The property of "secret-key"
+ */
+ String secretKey() default "${rocketmq.producer.secret-key}";
+}
diff --git a/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/SelectorType.java b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/SelectorType.java
new file mode 100644
index 000000000..97e8b7e78
--- /dev/null
+++ b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/SelectorType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.annotation;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+
+public enum SelectorType {
+
+ /**
+ * @see ExpressionType#TAG
+ */
+ TAG,
+
+ /**
+ * @see ExpressionType#SQL92
+ */
+ SQL92
+}
diff --git a/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
new file mode 100644
index 000000000..192bfc989
--- /dev/null
+++ b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.autoconfigure;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.beans.factory.support.BeanDefinitionValidationException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.core.env.StandardEnvironment;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+import java.util.Objects;
+
+
+@Configuration
+public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
+ private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);
+
+ private ConfigurableApplicationContext applicationContext;
+
+ private StandardEnvironment environment;
+
+ private RocketMQProperties rocketMQProperties;
+
+ private ObjectMapper objectMapper;
+
+ public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
+ StandardEnvironment environment,
+ RocketMQProperties rocketMQProperties) {
+ this.objectMapper = rocketMQMessageObjectMapper;
+ this.environment = environment;
+ this.rocketMQProperties = rocketMQProperties;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ }
+
+ @Override
+ public void afterSingletonsInstantiated() {
+ Map Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. Send message in synchronous mode. This method returns only when the sending procedure totally completes.
+ * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
+ * notification, SMS marketing system, etc..
+ * Warn: this method has internal retry-mechanism, that is, internal implementation will retry
+ * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple
+ * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential
+ * duplication issue.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link Message}
+ * @return {@link SendResult}
+ */
+ public SendResult syncSend(String destination, Message> message) {
+ return syncSend(destination, message, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link Message}
+ * @param timeout send timeout with millis
+ * @return {@link SendResult}
+ */
+ public SendResult syncSend(String destination, Message> message, long timeout) {
+ return syncSend(destination, message, timeout, 0);
+ }
+
+ /**
+ * syncSend batch messages in a given timeout.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link Message}
+ * @param timeout send timeout with millis
+ * @return {@link SendResult}
+ */
+ public SendResult syncSend(String destination, Collection Send message to broker asynchronously. asynchronous transmission is generally used in response time sensitive
+ * business scenarios.
+ * This method returns immediately. On sending completion,
+ * Similar to {@link #syncSend(String, Object)}, internal implementation would potentially retry up to {@link
+ * DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
+ * message duplication and application developers are the one to resolve this potential issue.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link Message}
+ * @param sendCallback {@link SendCallback}
+ */
+ public void asyncSend(String destination, Message> message, SendCallback sendCallback) {
+ asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param sendCallback {@link SendCallback}
+ * @param timeout send timeout with millis
+ */
+ public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
+ Message> message = this.doConvert(payload, null, null);
+ asyncSend(destination, message, sendCallback, timeout);
+ }
+
+ /**
+ * Same to {@link #asyncSend(String, Message, SendCallback)}.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param sendCallback {@link SendCallback}
+ */
+ public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
+ asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
+ * addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param sendCallback {@link SendCallback}
+ * @param timeout send timeout with millis
+ */
+ public void asyncSendOrderly(String destination, Message> message, String hashKey, SendCallback sendCallback,
+ long timeout) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
+ charset, destination, message);
+ producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
+ } catch (Exception e) {
+ log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param sendCallback {@link SendCallback}
+ */
+ public void asyncSendOrderly(String destination, Message> message, String hashKey, SendCallback sendCallback) {
+ asyncSendOrderly(destination, message, hashKey, sendCallback, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param sendCallback {@link SendCallback}
+ */
+ public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
+ asyncSendOrderly(destination, payload, hashKey, sendCallback, producer.getSendMsgTimeout());
+ }
+
+ /**
+ * Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param sendCallback {@link SendCallback}
+ * @param timeout send timeout with millis
+ */
+ public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
+ long timeout) {
+ Message> message = this.doConvert(payload, null, null);
+ asyncSendOrderly(destination, message, hashKey, sendCallback, timeout);
+ }
+
+ /**
+ * Similar to UDP, this method won't wait for
+ * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
+ *
+ * One-way transmission is used for cases requiring moderate reliability, such as log collection.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link Message}
+ */
+ public void sendOneWay(String destination, Message> message) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("sendOneWay failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
+ charset, destination, message);
+ producer.sendOneway(rocketMsg);
+ } catch (Exception e) {
+ log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Same to {@link #sendOneWay(String, Message)}
+ *
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ */
+ public void sendOneWay(String destination, Object payload) {
+ Message> message = this.doConvert(payload, null, null);
+ sendOneWay(destination, message);
+ }
+
+ /**
+ * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ */
+ public void sendOneWayOrderly(String destination, Message> message, String hashKey) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+ }
+
+ try {
+ org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
+ charset, destination, message);
+ producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
+ } catch (Exception e) {
+ log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Same to {@link #sendOneWayOrderly(String, Message, String)}
+ *
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ */
+ public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
+ Message> message = this.doConvert(payload, null, null);
+ sendOneWayOrderly(destination, message, hashKey);
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ if (producer != null) {
+ producer.start();
+ }
+ }
+
+ @Override
+ protected void doSend(String destination, Message> message) {
+ SendResult sendResult = syncSend(destination, message);
+ log.debug("send message to `{}` finished. result:{}", destination, sendResult);
+ }
+
+
+
+ @Override
+ protected Message> doConvert(Object payload, Map Note: RocketMQTemplate can release all cached producers when bean destroying, it is not recommended to directly
+ * use this method by user.
+ *
+ * @param txProducerGroup
+ * @throws MessagingException
+ */
+ public void removeTransactionMQProducer(String txProducerGroup) throws MessagingException {
+ txProducerGroup = getTxProducerGroupName(txProducerGroup);
+ if (cache.containsKey(txProducerGroup)) {
+ DefaultMQProducer cachedProducer = cache.get(txProducerGroup);
+ cachedProducer.shutdown();
+ cache.remove(txProducerGroup);
+ }
+ }
+
+ /**
+ * Create and start a transaction MQProducer, this new producer is cached in memory.
+ * Note: This method is invoked internally when processing {@code @RocketMQLocalTransactionListener}, it is not
+ * recommended to directly use this method by user.
+ *
+ * @param txProducerGroup Producer (group) name, unique for each producer
+ * @param transactionListener TransactoinListener impl class
+ * @param executorService Nullable.
+ * @param rpcHook Nullable.
+ * @return true if producer is created and started; false if the named producer already exists in cache.
+ * @throws MessagingException
+ */
+ public boolean createAndStartTransactionMQProducer(String txProducerGroup,
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) throws MessagingException {
+ txProducerGroup = getTxProducerGroupName(txProducerGroup);
+ if (cache.containsKey(txProducerGroup)) {
+ log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup));
+ return false;
+ }
+
+ TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService, rpcHook);
+ try {
+ txProducer.start();
+ cache.put(txProducerGroup, txProducer);
+ } catch (MQClientException e) {
+ throw RocketMQUtil.convert(e);
+ }
+
+ return true;
+ }
+
+ private TransactionMQProducer createTransactionMQProducer(String name,
+ RocketMQLocalTransactionListener transactionListener,
+ ExecutorService executorService, RPCHook rpcHook) {
+ Assert.notNull(producer, "Property 'producer' is required");
+ Assert.notNull(transactionListener, "Parameter 'transactionListener' is required");
+ TransactionMQProducer txProducer;
+ if (Objects.nonNull(rpcHook)) {
+ txProducer = new TransactionMQProducer(name, rpcHook);
+ txProducer.setVipChannelEnabled(false);
+ txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));
+ } else {
+ txProducer = new TransactionMQProducer(name);
+ }
+ txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));
+
+ txProducer.setNamesrvAddr(producer.getNamesrvAddr());
+ if (executorService != null) {
+ txProducer.setExecutorService(executorService);
+ }
+
+ txProducer.setSendMsgTimeout(producer.getSendMsgTimeout());
+ txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed());
+ txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed());
+ txProducer.setMaxMessageSize(producer.getMaxMessageSize());
+ txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch());
+ txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK());
+
+ return txProducer;
+ }
+}
diff --git a/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
new file mode 100644
index 000000000..6a730107c
--- /dev/null
+++ b/esua-epdc/epdc-commons/rocketmq-spring-rocketmq-spring-all/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.annotation.SelectorType;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.aop.framework.AopProxyUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.util.Assert;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Objects;
+
+@SuppressWarnings("WeakerAccess")
+public class DefaultRocketMQListenerContainer implements InitializingBean,
+ RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
+ private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
+
+ private ApplicationContext applicationContext;
+
+ /**
+ * The name of the DefaultRocketMQListenerContainer instance
+ */
+ private String name;
+
+ private long suspendCurrentQueueTimeMillis = 1000;
+
+ /**
+ * Message consume retry strategysendCallback will be executed.
+ *
-1,no retry,put into DLQ directly
0,broker control retry frequency
+ * >0,client control retry frequency.
+ */
+ private int delayLevelWhenNextConsume = 0;
+
+ private String nameServer;
+
+ private AccessChannel accessChannel = AccessChannel.LOCAL;
+
+ private String consumerGroup;
+
+ private String topic;
+
+ private int consumeThreadMax = 64;
+
+ private String charset = "UTF-8";
+
+ private ObjectMapper objectMapper;
+
+ private RocketMQListener rocketMQListener;
+
+ private RocketMQMessageListener rocketMQMessageListener;
+
+ private DefaultMQPushConsumer consumer;
+
+ private Class messageType;
+
+ private boolean running;
+
+ // The following properties came from @RocketMQMessageListener.
+ private ConsumeMode consumeMode;
+ private SelectorType selectorType;
+ private String selectorExpression;
+ private MessageModel messageModel;
+ private long consumeTimeout;
+
+ public long getSuspendCurrentQueueTimeMillis() {
+ return suspendCurrentQueueTimeMillis;
+ }
+
+ public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
+ this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+ }
+
+ public int getDelayLevelWhenNextConsume() {
+ return delayLevelWhenNextConsume;
+ }
+
+ public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
+ this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
+ }
+
+ public String getNameServer() {
+ return nameServer;
+ }
+
+ public void setNameServer(String nameServer) {
+ this.nameServer = nameServer;
+ }
+
+ public AccessChannel getAccessChannel() {
+ return accessChannel;
+ }
+
+ public void setAccessChannel(AccessChannel accessChannel) {
+ this.accessChannel = accessChannel;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public int getConsumeThreadMax() {
+ return consumeThreadMax;
+ }
+
+ public String getCharset() {
+ return charset;
+ }
+
+ public void setCharset(String charset) {
+ this.charset = charset;
+ }
+
+ public ObjectMapper getObjectMapper() {
+ return objectMapper;
+ }
+
+ public void setObjectMapper(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ }
+
+
+ public RocketMQListener getRocketMQListener() {
+ return rocketMQListener;
+ }
+
+ public void setRocketMQListener(RocketMQListener rocketMQListener) {
+ this.rocketMQListener = rocketMQListener;
+ }
+
+ public RocketMQMessageListener getRocketMQMessageListener() {
+ return rocketMQMessageListener;
+ }
+
+ public void setRocketMQMessageListener(RocketMQMessageListener anno) {
+ this.rocketMQMessageListener = anno;
+
+ this.consumeMode = anno.consumeMode();
+ this.consumeThreadMax = anno.consumeThreadMax();
+ this.messageModel = anno.messageModel();
+ this.selectorExpression = anno.selectorExpression();
+ this.selectorType = anno.selectorType();
+ this.consumeTimeout = anno.consumeTimeout();
+ }
+
+ public ConsumeMode getConsumeMode() {
+ return consumeMode;
+ }
+
+ public SelectorType getSelectorType() {
+ return selectorType;
+ }
+
+ public String getSelectorExpression() {
+ return selectorExpression;
+ }
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+ public DefaultMQPushConsumer getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(DefaultMQPushConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void setupMessageListener(RocketMQListener rocketMQListener) {
+ this.rocketMQListener = rocketMQListener;
+ }
+
+ @Override
+ public void destroy() {
+ this.setRunning(false);
+ if (Objects.nonNull(consumer)) {
+ consumer.shutdown();
+ }
+ log.info("container destroyed, {}", this.toString());
+ }
+
+ @Override
+ public boolean isAutoStartup() {
+ return true;
+ }
+
+ @Override
+ public void stop(Runnable callback) {
+ stop();
+ callback.run();
+ }
+
+ @Override
+ public void start() {
+ if (this.isRunning()) {
+ throw new IllegalStateException("container already running. " + this.toString());
+ }
+
+ try {
+ consumer.start();
+ } catch (MQClientException e) {
+ throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
+ }
+ this.setRunning(true);
+
+ log.info("running container: {}", this.toString());
+ }
+
+ @Override
+ public void stop() {
+ if (this.isRunning()) {
+ if (Objects.nonNull(consumer)) {
+ consumer.shutdown();
+ }
+ setRunning(false);
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ private void setRunning(boolean running) {
+ this.running = running;
+ }
+
+ @Override
+ public int getPhase() {
+ // Returning Integer.MAX_VALUE only suggests that
+ // we will be the first bean to shutdown and last bean to start
+ return Integer.MAX_VALUE;
+ }
+
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ initRocketMQPushConsumer();
+
+ this.messageType = getMessageType();
+ log.debug("RocketMQ messageType: {}", messageType.getName());
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultRocketMQListenerContainer{" +
+ "consumerGroup='" + consumerGroup + '\'' +
+ ", nameServer='" + nameServer + '\'' +
+ ", topic='" + topic + '\'' +
+ ", consumeMode=" + consumeMode +
+ ", selectorType=" + selectorType +
+ ", selectorExpression='" + selectorExpression + '\'' +
+ ", messageModel=" + messageModel +
+ '}';
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List