From 22da3840f4b4016d33f6689d1b213dc551ec1658 Mon Sep 17 00:00:00 2001 From: jianjun Date: Fri, 23 Apr 2021 13:37:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=8F=E7=BB=84=E6=88=90=E5=B0=B1=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=B6=88=E8=B4=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/ConsomerGroupConstants.java | 5 ++ .../rocketmq/constants/TopicConstants.java | 2 +- .../resi-group/resi-group-server/pom.xml | 15 ++++ .../AbstractStatsAchievementService.java | 31 ++++---- .../impl/StatsAchievementServiceImpl.java | 10 +-- .../com/epmet/mq/AbstractRocketConsumer.java | 23 ------ .../mq/GroupAchieventConsumerListener.java | 37 +++++++++ .../type1/GroupAchievementCustomListener.java | 77 +++++++++++++++++++ .../mq/type1/RocketMQConsumerRegister.java | 63 +++++++++++++++ .../src/main/resources/bootstrap.yml | 5 ++ 10 files changed, 221 insertions(+), 47 deletions(-) delete mode 100644 epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/AbstractRocketConsumer.java create mode 100644 epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchieventConsumerListener.java create mode 100644 epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java create mode 100644 epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index ded5a9aa8d..38ec75d055 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -22,4 +22,9 @@ public interface ConsomerGroupConstants { */ String ISSUE_PROJECT_CATEGORY_TAG = "issue_project_category_tag"; + /** + * 小组成就消费者组 + */ + String GROUP_ACHIEVEMENT_COMPONENTS_GROUP = "group_achievement_components_group"; + } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index c3c8369784..d0f7b7f829 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -8,5 +8,5 @@ public interface TopicConstants { /** * 小组成就 */ - String RESI_GROUP = "group_achievement"; + String GROUP_ACHIEVEMENT = "group_achievement"; } diff --git a/epmet-module/resi-group/resi-group-server/pom.xml b/epmet-module/resi-group/resi-group-server/pom.xml index c4931be9ae..075676c7b6 100644 --- a/epmet-module/resi-group/resi-group-server/pom.xml +++ b/epmet-module/resi-group/resi-group-server/pom.xml @@ -191,6 +191,10 @@ SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19 + + + 192.168.1.130:9876;192.168.1.132:9876 + epmet_message @@ -243,6 +247,9 @@ SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19 + + 192.168.1.130:9876;192.168.1.132:9876 + epmet_message @@ -294,6 +301,10 @@ SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19 + + + 192.168.10.161:9876 + epmet_message @@ -342,6 +353,10 @@ SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 + + + 192.168.11.187:9876;192.168.11.184:9876 + epmet_message diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java index 93980df9cb..818937dcf6 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java @@ -16,13 +16,13 @@ */ package com.epmet.modules.group.service; + import com.epmet.commons.tools.constant.NumConstant; +import com.epmet.commons.tools.constant.StrConstant; import com.epmet.commons.tools.exception.EpmetErrorCode; import com.epmet.commons.tools.exception.RenException; import com.epmet.modules.enums.AchievementTypeEnum; -import com.epmet.modules.group.dao.ResiGroupDao; import com.epmet.modules.group.entity.ResiGroupAchievementConfigEntity; -import com.epmet.modules.member.dao.ResiGroupMemberDao; import com.epmet.modules.support.GroupAchievementUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -41,10 +41,7 @@ public abstract class AbstractStatsAchievementService { public ResiGroupAchievementConfigService achievementConfigService; @Autowired public ResiGroupAchievementStatsService achievementStatsService; - @Autowired - public ResiGroupDao resiGroupDao; - @Autowired - private ResiGroupMemberDao resiGroupMemberDao; + /** * desc: 根据成绩类型 分页获取成就配置 按targetValue 升序 @@ -65,37 +62,41 @@ public abstract class AbstractStatsAchievementService { * @date 2021/4/22 1:36 下午 */ protected List getAchievemnetConfigPage(int pageNum, int pageSize, String achievementType) { - return achievementConfigService.selectMoreThanOneByValue(achievementType,(pageNum- NumConstant.ONE)*pageSize,pageSize); + return achievementConfigService.selectMoreThanOneByValue(achievementType, (pageNum - NumConstant.ONE) * pageSize, pageSize); } /** - * desc:初始化配置 + * desc:初始化配置5条 + * * @param achievementType * @return */ - protected List initAchievementConfig(String achievementType){ + protected List initAchievementConfig(String achievementType) { AchievementTypeEnum anEnum = AchievementTypeEnum.getEnum(achievementType); - if (anEnum == null){ + if (anEnum == null) { throw new RenException(EpmetErrorCode.INTERNAL_VALIDATE_ERROR.getCode()); } - if (AchievementTypeEnum.MEMBER.getCode().equals(anEnum.getCode())){ + if (AchievementTypeEnum.MEMBER.getCode().equals(anEnum.getCode())) { throw new RenException("小组人数都到10000啦?"); } ResiGroupAchievementConfigEntity lastOne = achievementConfigService.getLastOne(achievementType); List newConfigList = new ArrayList<>(); Integer nextTargetValue = null; + int preValue = lastOne.getTargetValue(); + String oldName = lastOne.getAchievementName().replace(String.valueOf(lastOne.getTargetValue()), StrConstant.UNDER_LINE); for (int i = 0; i < 5; i++) { - nextTargetValue = GroupAchievementUtils.getNextTargetValue(achievementType,lastOne.getTargetValue()); + nextTargetValue = GroupAchievementUtils.getNextTargetValue(achievementType, preValue); ResiGroupAchievementConfigEntity next = new ResiGroupAchievementConfigEntity(); next.setCustomerId(lastOne.getCustomerId()); - next.setAchievementName(lastOne.getAchievementName().replace(String.valueOf(lastOne.getTargetValue()), String.valueOf(nextTargetValue))); - next.setPreValue(lastOne.getTargetValue()); + next.setAchievementName(oldName.replace(StrConstant.UNDER_LINE, String.valueOf(nextTargetValue))); + //todo 把这个字段去掉 + next.setPreValue(preValue); + preValue = nextTargetValue; next.setTargetValue(nextTargetValue); next.setAchievementType(lastOne.getAchievementType()); newConfigList.add(next); } achievementConfigService.insertBatch(newConfigList); - return newConfigList; } diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java index d29b5217d8..17c390d848 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java @@ -135,10 +135,6 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService //改为已实现 if (currentValue >= targetValue) { isBatch = true; -// ResiGroupAchievementStatsEntity newStat = ConvertUtils.sourceToTarget(one, ResiGroupAchievementStatsEntity.class); -// newStat.setCurrentValue(targetValue); -// newStat.setIsArrive(1); -// haveArrive.add(newStat); boolean isContinue; int pageNum = NumConstant.ONE; @@ -200,10 +196,10 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService haveArrive.add(statsEntity); if (currentValue >= configEntity.getTargetValue()) { - statsEntity.setIsArrive(1); + statsEntity.setIsArrive(NumConstant.ONE); } else { //如果没有达到则 放入这一条后 退出 - statsEntity.setIsArrive(0); + statsEntity.setIsArrive(NumConstant.ZERO); isContinue = false; break; } @@ -219,6 +215,4 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService } return list; } - - } diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/AbstractRocketConsumer.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/AbstractRocketConsumer.java deleted file mode 100644 index c06e2ca81f..0000000000 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/AbstractRocketConsumer.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.epmet.mq; - -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; - -/** - * desc: - * - * @author: LiuJanJun - * @date: 2021/4/22 10:10 下午 - * @version: 1.0 - */ -@Slf4j -//@RocketMQMessageListener(topic = "${game.server.config.business-game-message-topic}", consumerGroup = "gateway-message-consumer-group") -@RocketMQMessageListener(topic = "group_achievement", consumerGroup = "group") -public class AbstractRocketConsumer implements RocketMQListener { - - @Override - public void onMessage(String s) { - log.info("receive msg:{}",s); - } -} diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchieventConsumerListener.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchieventConsumerListener.java new file mode 100644 index 0000000000..0620136433 --- /dev/null +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchieventConsumerListener.java @@ -0,0 +1,37 @@ +package com.epmet.mq; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; +import com.epmet.modules.group.service.StatsAchievementService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * desc: 小组成就mq监听 + * + * @author: LiuJanJun + * @date: 2021/4/22 10:10 下午 + * @version: 1.0 + */ +@Slf4j +//@Component +//@RocketMQMessageListener(topic = "group_achievement", consumerGroup = ConsomerGroupConstants.GROUP_ACHIEVEMENT_COMPONENTS_GROUP) +//@Component +public class GroupAchieventConsumerListener implements RocketMQListener { + @Autowired + private StatsAchievementService statsAchievementService; + + @Override + public void onMessage(GroupAchievementMQMsg msg) { + log.info("receive msg:{}", JSON.toJSONString(msg)); + if (StringUtils.isBlank(msg.getAchievementType()) || StringUtils.isBlank(msg.getCustomerId()) + || StringUtils.isBlank(msg.getGroupId())) { + log.error("consumer fail,msg:{}", msg); + return; + } + Boolean aBoolean = statsAchievementService.calculateAcm(msg.getCustomerId(), msg.getGroupId(), msg.getAchievementType()); + log.info("consumer msg success,{}", aBoolean); + } +} diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java new file mode 100644 index 0000000000..1e63735ad3 --- /dev/null +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java @@ -0,0 +1,77 @@ +package com.epmet.mq.type1; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.modules.group.service.StatsAchievementService; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @Description 小组成就-监听器 + * @return + * @author wxz + * @date 2021.03.03 16:10 +*/ +public class GroupAchievementCustomListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + try { + msgs.forEach(this::consumeMessage); + } catch (Exception e) { + logger.error("consumeMessage fail,msg:{}",e.getMessage()); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + String msg = new String(messageExt.getBody()); + logger.info("receive msg:{}", JSON.toJSONString(msg)); + GroupAchievementMQMsg msgObj = JSON.parseObject(msg, GroupAchievementMQMsg.class); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:init_customer_org:%s", msgObj.getCustomerId()), + 30l, 30l, TimeUnit.SECONDS); + + + + if (StringUtils.isBlank(msgObj.getAchievementType()) || StringUtils.isBlank(msgObj.getCustomerId()) + || StringUtils.isBlank(msgObj.getGroupId())){ + logger.error("consumer fail,msg:{}",msgObj); + return; + } + Boolean aBoolean = SpringContextUtils.getBean(StatsAchievementService.class).calculateAcm(msgObj.getCustomerId(), msgObj.getGroupId(), msgObj.getAchievementType()); + logger.info("consumer msg success,{}",aBoolean); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【RocketMQ】初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【RocketMQ】初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + if (distributedLock != null){ + distributedLock.unLock(lock); + } + } + } +} diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java new file mode 100644 index 0000000000..c15e0ba6b2 --- /dev/null +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java @@ -0,0 +1,63 @@ +package com.epmet.mq.type1; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +@Slf4j +@Component +public class RocketMQConsumerRegister { + + @Value("${rocketmq.name-server}") + private String nameServer; + + /** + * @return + * @Description 注册监听器 + * @author wxz + * @date 2021.03.03 16:09 + */ + @PostConstruct + public void registerAllListeners() { + try { + register(ConsomerGroupConstants.GROUP_ACHIEVEMENT_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.GROUP_ACHIEVEMENT, "*", new GroupAchievementCustomListener()); + } catch (MQClientException e) { + log.error("registerAllListeners exception", e); + } + } + + public void register(String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException { + // 实例化消费者 + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + + // 设置NameServer的地址 + consumer.setNamesrvAddr(nameServer); + consumer.setMessageModel(messageModel); + consumer.setInstanceName(buildInstanceName()); + // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 + consumer.subscribe(topic, subException); + // 注册回调实现类来处理从broker拉取回来的消息 + consumer.registerMessageListener(listener); + // 启动消费者实例 + consumer.start(); + } + + private String buildInstanceName() { + String instanceName = ""; + for (int i = 0; i < 4; i++) { + int t = (int) (Math.random() * 10); + instanceName = instanceName.concat(t + ""); + } + + return instanceName; + } + +} diff --git a/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml b/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml index 35cade577d..165abb2e03 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml +++ b/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml @@ -157,3 +157,8 @@ shutdown: graceful: enable: true #是否开启优雅停机 waitTimeSecs: 30 # 优雅停机等待时间,每超过30秒,打印一次错误日志 + +rocketmq: + name-server: @rocketmq.nameserver@ + producer: + group: @rocketmq.producer.group@