Browse Source

消息消费失败重试

dev_shibei_match
jianjun 4 years ago
parent
commit
6f899d3203
  1. 9
      epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java
  2. 7
      epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java
  3. 4
      epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java

9
epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java

@ -241,14 +241,6 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService
if (currentValue >= targetValue) {
isBatch = true;
buildAchievementEntityList(customerId, groupId, achievementType, currentValue, one.getTargetValue(), haveArrive);
// boolean isContinue;
// int pageNum = NumConstant.ONE;
// int pageSize = NumConstant.TEN;
// do {
// List<ResiGroupAchievementConfigEntity> list = getConfigByType(achievementType, pageNum++, pageSize);
// isContinue = buildArriveList(customerId, groupId, achievementType, currentValue, one.getTargetValue(), haveArrive, list);
// } while (isContinue);
}
if (isBatch) {
@ -270,7 +262,6 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService
* @date 2021/4/23 6:20 下午
*/
private void initAchievementStat(String customerId, String groupId, String achievementType, int currentValue, List<ResiGroupAchievementStatsEntity> haveArrive) {
buildAchievementEntityList(customerId, groupId, achievementType, currentValue, null, haveArrive);
if (CollectionUtils.isEmpty(haveArrive)) {
throw new RenException("小组达成成就失败");

7
epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java

@ -6,6 +6,7 @@ import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.modules.group.service.StatsAchievementService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@ -24,18 +25,22 @@ import java.util.concurrent.TimeUnit;
* @author wxz
* @date 2021.03.03 16:10
*/
@Slf4j
public class GroupAchievementCustomListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long start = System.currentTimeMillis();
try {
msgs.forEach(this::consumeMessage);
} catch (Exception e) {
//失败重发
logger.error("consumeMessage fail,msg:{}",e.getMessage());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
log.info("consumeMessage success, cost:{} ms",System.currentTimeMillis() - start);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

4
epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java

@ -4,7 +4,7 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
@ -34,7 +34,7 @@ public class RocketMQConsumerRegister {
}
}
public void register(String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException {
public void register(String group, MessageModel messageModel, String topic, String subException, MessageListenerConcurrently listener) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);

Loading…
Cancel
Save