diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
index ded5a9aa8d..38ec75d055 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
@@ -22,4 +22,9 @@ public interface ConsomerGroupConstants {
*/
String ISSUE_PROJECT_CATEGORY_TAG = "issue_project_category_tag";
+ /**
+ * 小组成就消费者组
+ */
+ String GROUP_ACHIEVEMENT_COMPONENTS_GROUP = "group_achievement_components_group";
+
}
diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
index c3c8369784..d0f7b7f829 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
@@ -8,5 +8,5 @@ public interface TopicConstants {
/**
* 小组成就
*/
- String RESI_GROUP = "group_achievement";
+ String GROUP_ACHIEVEMENT = "group_achievement";
}
diff --git a/epmet-module/resi-group/resi-group-server/pom.xml b/epmet-module/resi-group/resi-group-server/pom.xml
index c4931be9ae..075676c7b6 100644
--- a/epmet-module/resi-group/resi-group-server/pom.xml
+++ b/epmet-module/resi-group/resi-group-server/pom.xml
@@ -191,6 +191,10 @@
SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19
+
+
+ 192.168.1.130:9876;192.168.1.132:9876
+ epmet_message
@@ -243,6 +247,9 @@
SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19
+
+ 192.168.1.130:9876;192.168.1.132:9876
+ epmet_message
@@ -294,6 +301,10 @@
SEC080aac67ff78e79fdaba132aa51e3fb3f6060dec99492feaac82cabf9f8b6a19
+
+
+ 192.168.10.161:9876
+ epmet_message
@@ -342,6 +353,10 @@
SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1
+
+
+ 192.168.11.187:9876;192.168.11.184:9876
+ epmet_message
diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java
index 93980df9cb..818937dcf6 100644
--- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java
+++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/AbstractStatsAchievementService.java
@@ -16,13 +16,13 @@
*/
package com.epmet.modules.group.service;
+
import com.epmet.commons.tools.constant.NumConstant;
+import com.epmet.commons.tools.constant.StrConstant;
import com.epmet.commons.tools.exception.EpmetErrorCode;
import com.epmet.commons.tools.exception.RenException;
import com.epmet.modules.enums.AchievementTypeEnum;
-import com.epmet.modules.group.dao.ResiGroupDao;
import com.epmet.modules.group.entity.ResiGroupAchievementConfigEntity;
-import com.epmet.modules.member.dao.ResiGroupMemberDao;
import com.epmet.modules.support.GroupAchievementUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,10 +41,7 @@ public abstract class AbstractStatsAchievementService {
public ResiGroupAchievementConfigService achievementConfigService;
@Autowired
public ResiGroupAchievementStatsService achievementStatsService;
- @Autowired
- public ResiGroupDao resiGroupDao;
- @Autowired
- private ResiGroupMemberDao resiGroupMemberDao;
+
/**
* desc: 根据成绩类型 分页获取成就配置 按targetValue 升序
@@ -65,37 +62,41 @@ public abstract class AbstractStatsAchievementService {
* @date 2021/4/22 1:36 下午
*/
protected List getAchievemnetConfigPage(int pageNum, int pageSize, String achievementType) {
- return achievementConfigService.selectMoreThanOneByValue(achievementType,(pageNum- NumConstant.ONE)*pageSize,pageSize);
+ return achievementConfigService.selectMoreThanOneByValue(achievementType, (pageNum - NumConstant.ONE) * pageSize, pageSize);
}
/**
- * desc:初始化配置
+ * desc:初始化配置5条
+ *
* @param achievementType
* @return
*/
- protected List initAchievementConfig(String achievementType){
+ protected List initAchievementConfig(String achievementType) {
AchievementTypeEnum anEnum = AchievementTypeEnum.getEnum(achievementType);
- if (anEnum == null){
+ if (anEnum == null) {
throw new RenException(EpmetErrorCode.INTERNAL_VALIDATE_ERROR.getCode());
}
- if (AchievementTypeEnum.MEMBER.getCode().equals(anEnum.getCode())){
+ if (AchievementTypeEnum.MEMBER.getCode().equals(anEnum.getCode())) {
throw new RenException("小组人数都到10000啦?");
}
ResiGroupAchievementConfigEntity lastOne = achievementConfigService.getLastOne(achievementType);
List newConfigList = new ArrayList<>();
Integer nextTargetValue = null;
+ int preValue = lastOne.getTargetValue();
+ String oldName = lastOne.getAchievementName().replace(String.valueOf(lastOne.getTargetValue()), StrConstant.UNDER_LINE);
for (int i = 0; i < 5; i++) {
- nextTargetValue = GroupAchievementUtils.getNextTargetValue(achievementType,lastOne.getTargetValue());
+ nextTargetValue = GroupAchievementUtils.getNextTargetValue(achievementType, preValue);
ResiGroupAchievementConfigEntity next = new ResiGroupAchievementConfigEntity();
next.setCustomerId(lastOne.getCustomerId());
- next.setAchievementName(lastOne.getAchievementName().replace(String.valueOf(lastOne.getTargetValue()), String.valueOf(nextTargetValue)));
- next.setPreValue(lastOne.getTargetValue());
+ next.setAchievementName(oldName.replace(StrConstant.UNDER_LINE, String.valueOf(nextTargetValue)));
+ //todo 把这个字段去掉
+ next.setPreValue(preValue);
+ preValue = nextTargetValue;
next.setTargetValue(nextTargetValue);
next.setAchievementType(lastOne.getAchievementType());
newConfigList.add(next);
}
achievementConfigService.insertBatch(newConfigList);
-
return newConfigList;
}
diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java
index d29b5217d8..17c390d848 100644
--- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java
+++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/StatsAchievementServiceImpl.java
@@ -135,10 +135,6 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService
//改为已实现
if (currentValue >= targetValue) {
isBatch = true;
-// ResiGroupAchievementStatsEntity newStat = ConvertUtils.sourceToTarget(one, ResiGroupAchievementStatsEntity.class);
-// newStat.setCurrentValue(targetValue);
-// newStat.setIsArrive(1);
-// haveArrive.add(newStat);
boolean isContinue;
int pageNum = NumConstant.ONE;
@@ -200,10 +196,10 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService
haveArrive.add(statsEntity);
if (currentValue >= configEntity.getTargetValue()) {
- statsEntity.setIsArrive(1);
+ statsEntity.setIsArrive(NumConstant.ONE);
} else {
//如果没有达到则 放入这一条后 退出
- statsEntity.setIsArrive(0);
+ statsEntity.setIsArrive(NumConstant.ZERO);
isContinue = false;
break;
}
@@ -219,6 +215,4 @@ public class StatsAchievementServiceImpl extends AbstractStatsAchievementService
}
return list;
}
-
-
}
diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/AbstractRocketConsumer.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/AbstractRocketConsumer.java
deleted file mode 100644
index c06e2ca81f..0000000000
--- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/AbstractRocketConsumer.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.epmet.mq;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-
-/**
- * desc:
- *
- * @author: LiuJanJun
- * @date: 2021/4/22 10:10 下午
- * @version: 1.0
- */
-@Slf4j
-//@RocketMQMessageListener(topic = "${game.server.config.business-game-message-topic}", consumerGroup = "gateway-message-consumer-group")
-@RocketMQMessageListener(topic = "group_achievement", consumerGroup = "group")
-public class AbstractRocketConsumer implements RocketMQListener {
-
- @Override
- public void onMessage(String s) {
- log.info("receive msg:{}",s);
- }
-}
diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchieventConsumerListener.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchieventConsumerListener.java
new file mode 100644
index 0000000000..0620136433
--- /dev/null
+++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/GroupAchieventConsumerListener.java
@@ -0,0 +1,37 @@
+package com.epmet.mq;
+
+import com.alibaba.fastjson.JSON;
+import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg;
+import com.epmet.modules.group.service.StatsAchievementService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * desc: 小组成就mq监听
+ *
+ * @author: LiuJanJun
+ * @date: 2021/4/22 10:10 下午
+ * @version: 1.0
+ */
+@Slf4j
+//@Component
+//@RocketMQMessageListener(topic = "group_achievement", consumerGroup = ConsomerGroupConstants.GROUP_ACHIEVEMENT_COMPONENTS_GROUP)
+//@Component
+public class GroupAchieventConsumerListener implements RocketMQListener {
+ @Autowired
+ private StatsAchievementService statsAchievementService;
+
+ @Override
+ public void onMessage(GroupAchievementMQMsg msg) {
+ log.info("receive msg:{}", JSON.toJSONString(msg));
+ if (StringUtils.isBlank(msg.getAchievementType()) || StringUtils.isBlank(msg.getCustomerId())
+ || StringUtils.isBlank(msg.getGroupId())) {
+ log.error("consumer fail,msg:{}", msg);
+ return;
+ }
+ Boolean aBoolean = statsAchievementService.calculateAcm(msg.getCustomerId(), msg.getGroupId(), msg.getAchievementType());
+ log.info("consumer msg success,{}", aBoolean);
+ }
+}
diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java
new file mode 100644
index 0000000000..1e63735ad3
--- /dev/null
+++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/GroupAchievementCustomListener.java
@@ -0,0 +1,77 @@
+package com.epmet.mq.type1;
+
+import com.alibaba.fastjson.JSON;
+import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg;
+import com.epmet.commons.tools.distributedlock.DistributedLock;
+import com.epmet.commons.tools.exception.ExceptionUtils;
+import com.epmet.commons.tools.exception.RenException;
+import com.epmet.commons.tools.utils.SpringContextUtils;
+import com.epmet.modules.group.service.StatsAchievementService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.redisson.api.RLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Description 小组成就-监听器
+ * @return
+ * @author wxz
+ * @date 2021.03.03 16:10
+*/
+public class GroupAchievementCustomListener implements MessageListenerConcurrently {
+
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ try {
+ msgs.forEach(this::consumeMessage);
+ } catch (Exception e) {
+ logger.error("consumeMessage fail,msg:{}",e.getMessage());
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+
+ private void consumeMessage(MessageExt messageExt) {
+ String msg = new String(messageExt.getBody());
+ logger.info("receive msg:{}", JSON.toJSONString(msg));
+ GroupAchievementMQMsg msgObj = JSON.parseObject(msg, GroupAchievementMQMsg.class);
+
+ DistributedLock distributedLock = null;
+ RLock lock = null;
+ try {
+ distributedLock = SpringContextUtils.getBean(DistributedLock.class);
+ lock = distributedLock.getLock(String.format("lock:init_customer_org:%s", msgObj.getCustomerId()),
+ 30l, 30l, TimeUnit.SECONDS);
+
+
+
+ if (StringUtils.isBlank(msgObj.getAchievementType()) || StringUtils.isBlank(msgObj.getCustomerId())
+ || StringUtils.isBlank(msgObj.getGroupId())){
+ logger.error("consumer fail,msg:{}",msgObj);
+ return;
+ }
+ Boolean aBoolean = SpringContextUtils.getBean(StatsAchievementService.class).calculateAcm(msgObj.getCustomerId(), msgObj.getGroupId(), msgObj.getAchievementType());
+ logger.info("consumer msg success,{}",aBoolean);
+ } catch (RenException e) {
+ // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
+ logger.error("【RocketMQ】初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+ } catch (Exception e) {
+ // 不是我们自己抛出的异常,可以让MQ重试
+ logger.error("【RocketMQ】初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+ throw e;
+ } finally {
+ if (distributedLock != null){
+ distributedLock.unLock(lock);
+ }
+ }
+ }
+}
diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java
new file mode 100644
index 0000000000..c15e0ba6b2
--- /dev/null
+++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/mq/type1/RocketMQConsumerRegister.java
@@ -0,0 +1,63 @@
+package com.epmet.mq.type1;
+
+import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
+import com.epmet.commons.rocketmq.constants.TopicConstants;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+@Slf4j
+@Component
+public class RocketMQConsumerRegister {
+
+ @Value("${rocketmq.name-server}")
+ private String nameServer;
+
+ /**
+ * @return
+ * @Description 注册监听器
+ * @author wxz
+ * @date 2021.03.03 16:09
+ */
+ @PostConstruct
+ public void registerAllListeners() {
+ try {
+ register(ConsomerGroupConstants.GROUP_ACHIEVEMENT_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.GROUP_ACHIEVEMENT, "*", new GroupAchievementCustomListener());
+ } catch (MQClientException e) {
+ log.error("registerAllListeners exception", e);
+ }
+ }
+
+ public void register(String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException {
+ // 实例化消费者
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+
+ // 设置NameServer的地址
+ consumer.setNamesrvAddr(nameServer);
+ consumer.setMessageModel(messageModel);
+ consumer.setInstanceName(buildInstanceName());
+ // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
+ consumer.subscribe(topic, subException);
+ // 注册回调实现类来处理从broker拉取回来的消息
+ consumer.registerMessageListener(listener);
+ // 启动消费者实例
+ consumer.start();
+ }
+
+ private String buildInstanceName() {
+ String instanceName = "";
+ for (int i = 0; i < 4; i++) {
+ int t = (int) (Math.random() * 10);
+ instanceName = instanceName.concat(t + "");
+ }
+
+ return instanceName;
+ }
+
+}
diff --git a/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml b/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml
index 35cade577d..165abb2e03 100644
--- a/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml
+++ b/epmet-module/resi-group/resi-group-server/src/main/resources/bootstrap.yml
@@ -157,3 +157,8 @@ shutdown:
graceful:
enable: true #是否开启优雅停机
waitTimeSecs: 30 # 优雅停机等待时间,每超过30秒,打印一次错误日志
+
+rocketmq:
+ name-server: @rocketmq.nameserver@
+ producer:
+ group: @rocketmq.producer.group@