diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java index 0e21bbb9fe..0ae6617ce8 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java @@ -241,14 +241,6 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService if (currentValue >= targetValue) { isBatch = true; buildAchievementEntityList(customerId, groupId, achievementType, currentValue, one.getTargetValue(), haveArrive); - -// boolean isContinue; -// int pageNum = NumConstant.ONE; -// int pageSize = NumConstant.TEN; -// do { -// List list = getConfigByType(achievementType, pageNum++, pageSize); -// isContinue = buildArriveList(customerId, groupId, achievementType, currentValue, one.getTargetValue(), haveArrive, list); -// } while (isContinue); } if (isBatch) { @@ -270,7 +262,6 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService * @date 2021/4/23 6:20 下午 */ private void initAchievementStat(String customerId, String groupId, String achievementType, int currentValue, List haveArrive) { - buildAchievementEntityList(customerId, groupId, achievementType, currentValue, null, haveArrive); if (CollectionUtils.isEmpty(haveArrive)) { throw new RenException("小组达成成就失败"); diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java index 41d99c4f4b..cdd0ce615a 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java @@ -6,6 +6,7 @@ import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.modules.group.service.StatsAchievementService; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -24,18 +25,22 @@ import java.util.concurrent.TimeUnit; * @author wxz * @date 2021.03.03 16:10 */ +@Slf4j public class GroupAchievementCustomListener implements MessageListenerConcurrently { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + long start = System.currentTimeMillis(); try { msgs.forEach(this::consumeMessage); } catch (Exception e) { + //失败重发 logger.error("consumeMessage fail,msg:{}",e.getMessage()); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + return ConsumeConcurrentlyStatus.RECONSUME_LATER; } + log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java index c15e0ba6b2..8ef50c1fc0 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java @@ -4,7 +4,7 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; import com.epmet.commons.rocketmq.constants.TopicConstants; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.beans.factory.annotation.Value; @@ -34,7 +34,7 @@ public class RocketMQConsumerRegister { } } - public void register(String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException { + public void register(String group, MessageModel messageModel, String topic, String subException, MessageListenerConcurrently listener) throws MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);