Browse Source
# Conflicts: # epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/group/service/impl/ResiGroupServiceImpl.java # epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/member/dao/ResiGroupMemberDao.java # epmet-module/resi-group/resi-group-server/src/main/resources/mapper/member/ResiGroupMemberDao.xmlmaster
33 changed files with 1017 additions and 153 deletions
@ -1,5 +1,12 @@ |
|||
package com.epmet.commons.rocketmq.constants; |
|||
|
|||
public interface TopicConstants { |
|||
/** |
|||
* 初始化客户 |
|||
*/ |
|||
String INIT_CUSTOMER = "init_customer"; |
|||
/** |
|||
* 小组成就 |
|||
*/ |
|||
String GROUP_ACHIEVEMENT = "group_achievement"; |
|||
} |
|||
|
@ -0,0 +1,27 @@ |
|||
package com.epmet.commons.rocketmq.messages; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
/** |
|||
* desc:小组成就mq消息类 |
|||
* |
|||
* @author LiuJanJun |
|||
* @date 2021/4/22 8:35 下午 |
|||
*/ |
|||
@Data |
|||
@AllArgsConstructor |
|||
public class GroupAchievementMQMsg implements Serializable { |
|||
|
|||
private String customerId; |
|||
|
|||
private String groupId; |
|||
|
|||
/** |
|||
* 成就类型 |
|||
* @see com.epmet.modules.enums.AchievementTypeEnum |
|||
*/ |
|||
private String achievementType; |
|||
} |
@ -0,0 +1,68 @@ |
|||
package com.epmet.send; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; |
|||
import com.epmet.commons.tools.constant.NumConstant; |
|||
import com.epmet.commons.tools.utils.Result; |
|||
import com.epmet.constant.SystemMessageType; |
|||
import com.epmet.dto.form.SystemMsgFormDTO; |
|||
import com.epmet.feign.EpmetMessageOpenFeignClient; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
/** |
|||
* desc: 发送mq消息直接到rocketMq 系统 |
|||
* |
|||
* @author: LiuJanJun |
|||
* @date: 2021/4/23 2:39 下午 |
|||
* @versio: 1.0 |
|||
*/ |
|||
@Slf4j |
|||
public class SendMqMsgUtil { |
|||
private static final SendMqMsgUtil INSTANCE = new SendMqMsgUtil(); |
|||
|
|||
private SendMqMsgUtil() { |
|||
|
|||
} |
|||
|
|||
private EpmetMessageOpenFeignClient epmetMessageOpenFeignClient; |
|||
|
|||
public static SendMqMsgUtil build() { |
|||
return INSTANCE; |
|||
} |
|||
|
|||
public SendMqMsgUtil openFeignClient(EpmetMessageOpenFeignClient epmetMessageOpenFeignClient) { |
|||
this.epmetMessageOpenFeignClient = epmetMessageOpenFeignClient; |
|||
return this; |
|||
} |
|||
|
|||
/** |
|||
* desc: 发送小组成就消息,计算小组成就 |
|||
* |
|||
* @param msgContent |
|||
* @return boolean |
|||
* @author LiuJanJun |
|||
* @date 2021/4/23 3:01 下午 |
|||
* @remark 失败重试1次 |
|||
*/ |
|||
public boolean sendGroupAchievementMqMsg(GroupAchievementMQMsg msgContent) { |
|||
try { |
|||
SystemMsgFormDTO systemMsgFormDTO = new SystemMsgFormDTO(); |
|||
systemMsgFormDTO.setMessageType(SystemMessageType.GROUP_ACHIEVEMENT); |
|||
systemMsgFormDTO.setContent(msgContent); |
|||
Result sendMsgResult = null; |
|||
int retryTime = 1; |
|||
do { |
|||
sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(systemMsgFormDTO); |
|||
} while ((sendMsgResult == null || !sendMsgResult.success()) && retryTime++ < NumConstant.TWO); |
|||
|
|||
if (sendMsgResult != null && sendMsgResult.success()) { |
|||
return true; |
|||
} |
|||
log.error("发送(小组成就)系统消息到message服务失败:{},msg:{}", JSON.toJSONString(sendMsgResult), JSON.toJSONString(systemMsgFormDTO)); |
|||
} catch (Exception e) { |
|||
log.error("sendMqMsg exception", e); |
|||
} |
|||
return false; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,103 @@ |
|||
/** |
|||
* Copyright 2018 人人开源 https://www.renren.io
|
|||
* <p> |
|||
* This program is free software: you can redistribute it and/or modify |
|||
* it under the terms of the GNU General Public License as published by |
|||
* the Free Software Foundation, either version 3 of the License, or |
|||
* (at your option) any later version. |
|||
* <p> |
|||
* This program is distributed in the hope that it will be useful, |
|||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
* GNU General Public License for more details. |
|||
* <p> |
|||
* You should have received a copy of the GNU General Public License |
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
|
|||
package com.epmet.modules.group.service; |
|||
|
|||
import com.epmet.commons.tools.constant.NumConstant; |
|||
import com.epmet.commons.tools.constant.StrConstant; |
|||
import com.epmet.commons.tools.exception.EpmetErrorCode; |
|||
import com.epmet.commons.tools.exception.RenException; |
|||
import com.epmet.modules.enums.AchievementTypeEnum; |
|||
import com.epmet.modules.group.entity.ResiGroupAchievementConfigEntity; |
|||
import com.epmet.modules.support.GroupAchievementUtils; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* desc:实时计算小组成就完成情况 |
|||
* |
|||
* @author generator generator@elink-cn.com |
|||
* @since v1.0.0 2021-04-19 |
|||
*/ |
|||
public abstract class AbstractStatsAchievementService { |
|||
|
|||
@Autowired |
|||
public ResiGroupAchievementConfigService achievementConfigService; |
|||
@Autowired |
|||
public ResiGroupAchievementStatsService achievementStatsService; |
|||
|
|||
|
|||
/** |
|||
* desc: 根据成绩类型 分页获取成就配置 按targetValue 升序 |
|||
* |
|||
* @param offset |
|||
* @param pageSize |
|||
* @return java.util.List<com.epmet.modules.group.entity.ResiGroupAchievementConfigEntity> |
|||
* @author LiuJanJun |
|||
* @date 2021/4/22 1:36 下午 |
|||
*/ |
|||
/** |
|||
* desc: 根据成绩类型 分页获取成就配置 按targetValue 升序 |
|||
* |
|||
* @param pageNum |
|||
* @param pageSize |
|||
* @return java.util.List<com.epmet.modules.group.entity.ResiGroupAchievementConfigEntity> |
|||
* @author LiuJanJun |
|||
* @date 2021/4/22 1:36 下午 |
|||
*/ |
|||
protected List<ResiGroupAchievementConfigEntity> getAchievemnetConfigPage(int pageNum, int pageSize, String achievementType) { |
|||
return achievementConfigService.selectMoreThanOneByValue(achievementType, (pageNum - NumConstant.ONE) * pageSize, pageSize); |
|||
} |
|||
|
|||
/** |
|||
* desc:初始化配置5条 |
|||
* |
|||
* @param achievementType |
|||
* @return |
|||
*/ |
|||
protected List<ResiGroupAchievementConfigEntity> initAchievementConfig(String achievementType) { |
|||
AchievementTypeEnum anEnum = AchievementTypeEnum.getEnum(achievementType); |
|||
if (anEnum == null) { |
|||
throw new RenException(EpmetErrorCode.INTERNAL_VALIDATE_ERROR.getCode()); |
|||
} |
|||
if (AchievementTypeEnum.MEMBER.getCode().equals(anEnum.getCode())) { |
|||
throw new RenException("小组人数都到10000啦?"); |
|||
} |
|||
ResiGroupAchievementConfigEntity lastOne = achievementConfigService.getLastOne(achievementType); |
|||
List<ResiGroupAchievementConfigEntity> newConfigList = new ArrayList<>(); |
|||
Integer nextTargetValue = null; |
|||
int preValue = lastOne.getTargetValue(); |
|||
String oldName = lastOne.getAchievementName().replace(String.valueOf(lastOne.getTargetValue()), StrConstant.UNDER_LINE); |
|||
for (int i = 0; i < 5; i++) { |
|||
nextTargetValue = GroupAchievementUtils.getNextTargetValue(achievementType, preValue); |
|||
ResiGroupAchievementConfigEntity next = new ResiGroupAchievementConfigEntity(); |
|||
next.setCustomerId(lastOne.getCustomerId()); |
|||
next.setAchievementName(oldName.replace(StrConstant.UNDER_LINE, String.valueOf(nextTargetValue))); |
|||
//todo 把这个字段去掉
|
|||
next.setPreValue(preValue); |
|||
preValue = nextTargetValue; |
|||
next.setTargetValue(nextTargetValue); |
|||
next.setAchievementType(lastOne.getAchievementType()); |
|||
newConfigList.add(next); |
|||
} |
|||
achievementConfigService.insertBatch(newConfigList); |
|||
return newConfigList; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,51 @@ |
|||
/** |
|||
* Copyright 2018 人人开源 https://www.renren.io
|
|||
* <p> |
|||
* This program is free software: you can redistribute it and/or modify |
|||
* it under the terms of the GNU General Public License as published by |
|||
* the Free Software Foundation, either version 3 of the License, or |
|||
* (at your option) any later version. |
|||
* <p> |
|||
* This program is distributed in the hope that it will be useful, |
|||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
* GNU General Public License for more details. |
|||
* <p> |
|||
* You should have received a copy of the GNU General Public License |
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
|
|||
package com.epmet.modules.group.service; |
|||
|
|||
/** |
|||
* desc:实时计算小组成就完成情况 |
|||
* |
|||
* @author generator generator@elink-cn.com |
|||
* @since v1.0.0 2021-04-19 |
|||
*/ |
|||
public interface StatsAchievementService{ |
|||
|
|||
/** |
|||
* desc: 计算小组成就 统一入口 |
|||
* |
|||
* @param achievementType 成就类型 小组人数、话题数、转议题数、小组内问题解决数 |
|||
* @return java.lang.Boolean |
|||
* @author LiuJanJun |
|||
* @date 2021/4/20 |
|||
*/ |
|||
Boolean calculateAcm(String achievementType); |
|||
|
|||
/** |
|||
* desc: 根据小组Id计算小组成就 入口 |
|||
* |
|||
* @param customerId |
|||
* @param groupId |
|||
* @param achievementType 成就类型 小组人数、话题数、转议题数、小组内问题解决数 |
|||
* @return java.lang.Boolean |
|||
* @author LiuJanJun |
|||
* @date 2021/4/20 |
|||
*/ |
|||
Boolean calculateAcm(String customerId, String groupId, String achievementType); |
|||
|
|||
|
|||
} |
@ -0,0 +1,218 @@ |
|||
package com.epmet.modules.group.service.impl; |
|||
|
|||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
|||
import com.epmet.commons.tools.constant.NumConstant; |
|||
import com.epmet.commons.tools.exception.EpmetErrorCode; |
|||
import com.epmet.commons.tools.exception.RenException; |
|||
import com.epmet.modules.enums.AchievementTypeEnum; |
|||
import com.epmet.modules.group.entity.ResiGroupAchievementConfigEntity; |
|||
import com.epmet.modules.group.entity.ResiGroupAchievementStatsEntity; |
|||
import com.epmet.modules.group.service.AbstractStatsAchievementService; |
|||
import com.epmet.modules.group.service.StatsAchievementService; |
|||
import com.epmet.modules.member.dao.ResiGroupMemberDao; |
|||
import com.epmet.modules.topic.dao.ResiTopicDao; |
|||
import com.epmet.modules.topic.entity.ResiTopicEntity; |
|||
import com.epmet.resi.group.constant.TopicConstant; |
|||
import com.epmet.resi.group.dto.group.ResiGroupAchievementStatsDTO; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.util.CollectionUtils; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* desc: |
|||
* |
|||
* @author: LiuJanJun |
|||
* @date: 2021/4/20 5:48 下午 |
|||
* @version: 1.0 |
|||
*/ |
|||
@Slf4j |
|||
@Service |
|||
public class StatsAchievementServiceImpl extends AbstractStatsAchievementService implements StatsAchievementService { |
|||
@Autowired |
|||
private ResiGroupMemberDao resiGroupMemberDao; |
|||
@Autowired |
|||
private ResiTopicDao resiTopicDao; |
|||
|
|||
|
|||
/** |
|||
* desc: 计算小组成就 统一入口 |
|||
* |
|||
* @param achievementType 成就类型 |
|||
* @return java.lang.Boolean |
|||
* @author LiuJanJun |
|||
* @date 2021/4/20 |
|||
*/ |
|||
@Override |
|||
public Boolean calculateAcm(String achievementType) { |
|||
if (StringUtils.isBlank(achievementType)) { |
|||
//计算所有类型的 小组人数、话题数、转议题数、小组内问题解决数
|
|||
} |
|||
return null; |
|||
} |
|||
|
|||
/** |
|||
* desc: 根据小组Id计算小组成就 入口 |
|||
* |
|||
* @param customerId |
|||
* @param groupId |
|||
* @param achievementType 成就类型 小组人数、话题数、转议题数、小组内问题解决数 |
|||
* @return java.lang.Boolean |
|||
* @author LiuJanJun |
|||
* @date 2021/4/20 |
|||
*/ |
|||
@Override |
|||
public Boolean calculateAcm(String customerId, String groupId, String achievementType) { |
|||
AchievementTypeEnum anEnum = AchievementTypeEnum.getEnum(achievementType); |
|||
if (StringUtils.isBlank(customerId) || StringUtils.isBlank(groupId) || anEnum == null) { |
|||
return false; |
|||
} |
|||
Integer currentValue = null; |
|||
switch (anEnum) { |
|||
case TOPIC: |
|||
//查询话题数
|
|||
QueryWrapper<ResiTopicEntity> queryWrapper = new QueryWrapper<>(); |
|||
queryWrapper.lambda().eq(ResiTopicEntity::getCustomerId,customerId) |
|||
.eq(ResiTopicEntity::getGroupId,groupId) |
|||
.eq(ResiTopicEntity::getDelFlag,NumConstant.ZERO); |
|||
currentValue = resiTopicDao.selectCount(queryWrapper); |
|||
break; |
|||
case MEMBER: |
|||
currentValue = resiGroupMemberDao.countMembers(customerId, groupId); |
|||
break; |
|||
case TOISSUE: |
|||
//查询已经转议题的话题数
|
|||
queryWrapper = new QueryWrapper<>(); |
|||
queryWrapper.lambda().eq(ResiTopicEntity::getCustomerId,customerId) |
|||
.eq(ResiTopicEntity::getGroupId,groupId) |
|||
.eq(ResiTopicEntity::getShiftIssue, NumConstant.ONE) |
|||
.eq(ResiTopicEntity::getDelFlag,NumConstant.ZERO); |
|||
currentValue = resiTopicDao.selectCount(queryWrapper); |
|||
break; |
|||
case RESOVLE_TOPIC: |
|||
//查询已经关闭且为已解决的话题数
|
|||
queryWrapper = new QueryWrapper<>(); |
|||
queryWrapper.lambda().eq(ResiTopicEntity::getCustomerId,customerId) |
|||
.eq(ResiTopicEntity::getGroupId,groupId) |
|||
.eq(ResiTopicEntity::getStatus, TopicConstant.CLOSED) |
|||
.eq(ResiTopicEntity::getClosedStatus, TopicConstant.RESOLVED) |
|||
.eq(ResiTopicEntity::getShiftIssue, NumConstant.ONE) |
|||
.eq(ResiTopicEntity::getDelFlag,NumConstant.ZERO); |
|||
currentValue = resiTopicDao.selectCount(queryWrapper); |
|||
break; |
|||
default: |
|||
log.info("calculateAcm error"); |
|||
} |
|||
calculateGroupAchievement(customerId, groupId, currentValue, achievementType); |
|||
return true; |
|||
} |
|||
|
|||
/** |
|||
* desc: 计算小组的人数等级 |
|||
* |
|||
* @param customerId |
|||
* @param groupId |
|||
*/ |
|||
private void calculateGroupAchievement(String customerId, String groupId, int currentValue, String achievementType) { |
|||
|
|||
ResiGroupAchievementStatsEntity one = super.achievementStatsService.selectLastUnAchieved(customerId, groupId, achievementType); |
|||
|
|||
List<ResiGroupAchievementStatsEntity> haveArrive = new ArrayList<>(); |
|||
//如果没有实现的额成就则说明是初始化
|
|||
if (one == null) { |
|||
initAchievementStat(customerId, groupId, achievementType, currentValue, haveArrive); |
|||
super.achievementStatsService.saveOrUpdate(haveArrive,false); |
|||
return; |
|||
} |
|||
|
|||
boolean isBatch = false; |
|||
Integer targetValue = one.getTargetValue(); |
|||
//改为已实现
|
|||
if (currentValue >= targetValue) { |
|||
isBatch = true; |
|||
|
|||
boolean isContinue; |
|||
int pageNum = NumConstant.ONE; |
|||
int pageSize = NumConstant.TEN; |
|||
do { |
|||
List<ResiGroupAchievementConfigEntity> list = getConfigByType(achievementType,pageNum++,pageSize); |
|||
isContinue = buildArriveList(customerId, groupId, achievementType, currentValue, one.getTargetValue(), haveArrive, list); |
|||
}while (isContinue); |
|||
} |
|||
|
|||
if (isBatch) { |
|||
super.achievementStatsService.saveOrUpdate(haveArrive,true); |
|||
} else { |
|||
//未实现 只更新当前值
|
|||
ResiGroupAchievementStatsDTO newStat = new ResiGroupAchievementStatsDTO(); |
|||
newStat.setId(one.getId()); |
|||
newStat.setCurrentValue(currentValue); |
|||
super.achievementStatsService.update(newStat); |
|||
} |
|||
} |
|||
|
|||
private void initAchievementStat(String customerId, String groupId, String achievementType, int currentValue, List<ResiGroupAchievementStatsEntity> haveArrive) { |
|||
|
|||
boolean isContinue; |
|||
int pageNum = NumConstant.ONE; |
|||
int pageSize = NumConstant.TEN; |
|||
do { //没有任何数据说明是初始化
|
|||
List<ResiGroupAchievementConfigEntity> list = getConfigByType(achievementType,pageNum++,pageSize); |
|||
//查询和初始化都失败啦
|
|||
if (CollectionUtils.isEmpty(list)) { |
|||
log.error("calculateMember get config fail"); |
|||
throw new RenException(EpmetErrorCode.INTERNAL_VALIDATE_ERROR.getCode()); |
|||
} |
|||
isContinue = buildArriveList(customerId, groupId, achievementType, currentValue, null, haveArrive, list); |
|||
}while (isContinue); |
|||
if (CollectionUtils.isEmpty(haveArrive)) { |
|||
throw new RenException("小组达成成就失败"); |
|||
} |
|||
} |
|||
|
|||
private boolean buildArriveList(String customerId, String groupId, String achievementType, int currentValue, Integer targetValue, List<ResiGroupAchievementStatsEntity> haveArrive, List<ResiGroupAchievementConfigEntity> list) { |
|||
boolean isContinue = true; |
|||
for (ResiGroupAchievementConfigEntity configEntity : list) { |
|||
if (targetValue != null) { |
|||
//筛选出已经完成的成就
|
|||
if (configEntity.getTargetValue()<targetValue){ |
|||
continue; |
|||
} |
|||
} |
|||
ResiGroupAchievementStatsEntity statsEntity = new ResiGroupAchievementStatsEntity(); |
|||
statsEntity.setCustomerId(customerId); |
|||
statsEntity.setGroupId(groupId); |
|||
statsEntity.setAchievementId(configEntity.getId()); |
|||
statsEntity.setAchievementName(configEntity.getAchievementName()); |
|||
statsEntity.setArriveTime(new Date()); |
|||
statsEntity.setAchievementType(achievementType); |
|||
statsEntity.setCurrentValue(currentValue); |
|||
statsEntity.setTargetValue(configEntity.getTargetValue()); |
|||
|
|||
haveArrive.add(statsEntity); |
|||
if (currentValue >= configEntity.getTargetValue()) { |
|||
statsEntity.setIsArrive(NumConstant.ONE); |
|||
} else { |
|||
//如果没有达到则 放入这一条后 退出
|
|||
statsEntity.setIsArrive(NumConstant.ZERO); |
|||
isContinue = false; |
|||
break; |
|||
} |
|||
} |
|||
return isContinue; |
|||
} |
|||
|
|||
private List<ResiGroupAchievementConfigEntity> getConfigByType(String achievementType,int pageNum,int pageSize) { |
|||
List<ResiGroupAchievementConfigEntity> list = super.getAchievemnetConfigPage(pageNum, pageSize, achievementType); |
|||
if (CollectionUtils.isEmpty(list)) { |
|||
log.warn("获取小组成就配置错误,{} 开始进行初始化规则", achievementType); |
|||
list = super.initAchievementConfig(achievementType); |
|||
} |
|||
return list; |
|||
} |
|||
} |
@ -0,0 +1,37 @@ |
|||
package com.epmet.mq; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; |
|||
import com.epmet.modules.group.service.StatsAchievementService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.apache.rocketmq.spring.core.RocketMQListener; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
|
|||
/** |
|||
* desc: 小组成就mq监听 |
|||
* |
|||
* @author: LiuJanJun |
|||
* @date: 2021/4/22 10:10 下午 |
|||
* @version: 1.0 |
|||
*/ |
|||
@Slf4j |
|||
//@Component
|
|||
//@RocketMQMessageListener(topic = "group_achievement", consumerGroup = ConsomerGroupConstants.GROUP_ACHIEVEMENT_COMPONENTS_GROUP)
|
|||
//@Component
|
|||
public class GroupAchieventConsumerListener implements RocketMQListener<GroupAchievementMQMsg> { |
|||
@Autowired |
|||
private StatsAchievementService statsAchievementService; |
|||
|
|||
@Override |
|||
public void onMessage(GroupAchievementMQMsg msg) { |
|||
log.info("receive msg:{}", JSON.toJSONString(msg)); |
|||
if (StringUtils.isBlank(msg.getAchievementType()) || StringUtils.isBlank(msg.getCustomerId()) |
|||
|| StringUtils.isBlank(msg.getGroupId())) { |
|||
log.error("consumer fail,msg:{}", msg); |
|||
return; |
|||
} |
|||
Boolean aBoolean = statsAchievementService.calculateAcm(msg.getCustomerId(), msg.getGroupId(), msg.getAchievementType()); |
|||
log.info("consumer msg success,{}", aBoolean); |
|||
} |
|||
} |
@ -0,0 +1,77 @@ |
|||
package com.epmet.mq.type1; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.epmet.commons.rocketmq.messages.GroupAchievementMQMsg; |
|||
import com.epmet.commons.tools.distributedlock.DistributedLock; |
|||
import com.epmet.commons.tools.exception.ExceptionUtils; |
|||
import com.epmet.commons.tools.exception.RenException; |
|||
import com.epmet.commons.tools.utils.SpringContextUtils; |
|||
import com.epmet.modules.group.service.StatsAchievementService; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
|||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
|||
import org.apache.rocketmq.common.message.MessageExt; |
|||
import org.redisson.api.RLock; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
/** |
|||
* @Description 小组成就-监听器 |
|||
* @return |
|||
* @author wxz |
|||
* @date 2021.03.03 16:10 |
|||
*/ |
|||
public class GroupAchievementCustomListener implements MessageListenerConcurrently { |
|||
|
|||
private Logger logger = LoggerFactory.getLogger(getClass()); |
|||
|
|||
@Override |
|||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { |
|||
try { |
|||
msgs.forEach(this::consumeMessage); |
|||
} catch (Exception e) { |
|||
logger.error("consumeMessage fail,msg:{}",e.getMessage()); |
|||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|||
} |
|||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|||
} |
|||
|
|||
private void consumeMessage(MessageExt messageExt) { |
|||
String msg = new String(messageExt.getBody()); |
|||
logger.info("receive msg:{}", JSON.toJSONString(msg)); |
|||
GroupAchievementMQMsg msgObj = JSON.parseObject(msg, GroupAchievementMQMsg.class); |
|||
|
|||
DistributedLock distributedLock = null; |
|||
RLock lock = null; |
|||
try { |
|||
distributedLock = SpringContextUtils.getBean(DistributedLock.class); |
|||
lock = distributedLock.getLock(String.format("lock:init_customer_org:%s", msgObj.getCustomerId()), |
|||
30l, 30l, TimeUnit.SECONDS); |
|||
|
|||
|
|||
|
|||
if (StringUtils.isBlank(msgObj.getAchievementType()) || StringUtils.isBlank(msgObj.getCustomerId()) |
|||
|| StringUtils.isBlank(msgObj.getGroupId())){ |
|||
logger.error("consumer fail,msg:{}",msgObj); |
|||
return; |
|||
} |
|||
Boolean aBoolean = SpringContextUtils.getBean(StatsAchievementService.class).calculateAcm(msgObj.getCustomerId(), msgObj.getGroupId(), msgObj.getAchievementType()); |
|||
logger.info("consumer msg success,{}",aBoolean); |
|||
} catch (RenException e) { |
|||
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
|
|||
logger.error("【RocketMQ】初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); |
|||
} catch (Exception e) { |
|||
// 不是我们自己抛出的异常,可以让MQ重试
|
|||
logger.error("【RocketMQ】初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); |
|||
throw e; |
|||
} finally { |
|||
if (distributedLock != null){ |
|||
distributedLock.unLock(lock); |
|||
} |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,63 @@ |
|||
package com.epmet.mq.type1; |
|||
|
|||
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; |
|||
import com.epmet.commons.rocketmq.constants.TopicConstants; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
|||
import org.apache.rocketmq.client.consumer.listener.MessageListener; |
|||
import org.apache.rocketmq.client.exception.MQClientException; |
|||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class RocketMQConsumerRegister { |
|||
|
|||
@Value("${rocketmq.name-server}") |
|||
private String nameServer; |
|||
|
|||
/** |
|||
* @return |
|||
* @Description 注册监听器 |
|||
* @author wxz |
|||
* @date 2021.03.03 16:09 |
|||
*/ |
|||
@PostConstruct |
|||
public void registerAllListeners() { |
|||
try { |
|||
register(ConsomerGroupConstants.GROUP_ACHIEVEMENT_COMPONENTS_GROUP, MessageModel.CLUSTERING, TopicConstants.GROUP_ACHIEVEMENT, "*", new GroupAchievementCustomListener()); |
|||
} catch (MQClientException e) { |
|||
log.error("registerAllListeners exception", e); |
|||
} |
|||
} |
|||
|
|||
public void register(String group, MessageModel messageModel, String topic, String subException, MessageListener listener) throws MQClientException { |
|||
// 实例化消费者
|
|||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); |
|||
|
|||
// 设置NameServer的地址
|
|||
consumer.setNamesrvAddr(nameServer); |
|||
consumer.setMessageModel(messageModel); |
|||
consumer.setInstanceName(buildInstanceName()); |
|||
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
|
|||
consumer.subscribe(topic, subException); |
|||
// 注册回调实现类来处理从broker拉取回来的消息
|
|||
consumer.registerMessageListener(listener); |
|||
// 启动消费者实例
|
|||
consumer.start(); |
|||
} |
|||
|
|||
private String buildInstanceName() { |
|||
String instanceName = ""; |
|||
for (int i = 0; i < 4; i++) { |
|||
int t = (int) (Math.random() * 10); |
|||
instanceName = instanceName.concat(t + ""); |
|||
} |
|||
|
|||
return instanceName; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.epmet.modules.group.service.impl; |
|||
|
|||
import com.epmet.modules.group.service.StatsAchievementService; |
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.boot.test.context.SpringBootTest; |
|||
import org.springframework.test.context.junit4.SpringRunner; |
|||
|
|||
@RunWith(SpringRunner.class) |
|||
@SpringBootTest |
|||
public class StatsAchievementServiceImplTest { |
|||
@Autowired |
|||
private StatsAchievementService statsAchievementService; |
|||
|
|||
@Test |
|||
public void calculateAcm() { |
|||
|
|||
// String customerId = "test_cid";
|
|||
// String groupId = "test_groupId";
|
|||
// String achievementType = "member";
|
|||
// Boolean aBoolean = statsAchievementService.calculateAcm(customerId, groupId, achievementType);
|
|||
// System.out.println(aBoolean);
|
|||
|
|||
String customerId = "test_cid"; |
|||
String groupId = "test_groupId"; |
|||
String achievementType = "toissue"; |
|||
Boolean aBoolean = statsAchievementService.calculateAcm(customerId, groupId, achievementType); |
|||
System.out.println(aBoolean); |
|||
} |
|||
} |
Loading…
Reference in new issue