From 87c2ed4945c64bb2c1a44692a1c9012ab9f3c98a Mon Sep 17 00:00:00 2001 From: wangxianzhang Date: Fri, 24 Dec 2021 16:18:03 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E9=99=A4=E5=AF=B9=E6=98=93=E8=81=94?= =?UTF-8?q?=E4=BA=91=E7=9A=84=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/ConsomerGroupConstants.java | 11 +- .../rocketmq/constants/TopicConstants.java | 10 + .../service/impl/ActLiveRecServiceImpl.java | 22 +- .../impl/VolunteerInfoServiceImpl.java | 18 +- .../service/impl/WorkActServiceImpl.java | 46 ++-- .../com/epmet/constant/SystemMessageType.java | 45 ++++ .../com/epmet/dto/form/SystemMsgFormDTO.java | 4 +- .../impl/SystemMessageServiceImpl.java | 15 ++ .../epmet-point/epmet-point-server/pom.xml | 16 ++ .../epmet/mq/RocketMQConsumerRegister.java | 27 ++ .../com/epmet/mq/listener/PointListener.java | 236 ++++++++++++++++++ .../src/main/resources/bootstrap.yml | 4 + .../GroupMemeberOperationServiceImpl.java | 31 ++- .../impl/ResiGroupMemberServiceImpl.java | 23 +- .../impl/ResiTopicCommentServiceImpl.java | 27 +- .../service/impl/ResiTopicServiceImpl.java | 97 ++++--- .../service/impl/TopicDraftServiceImpl.java | 18 +- 17 files changed, 566 insertions(+), 84 deletions(-) create mode 100644 epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java create mode 100644 epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/listener/PointListener.java diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index 400eca1d51..67bfc91273 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -41,10 +41,19 @@ public interface ConsomerGroupConstants { */ String PROJECT_OPERATION_LOG_GROUP = "project_operation_log_group"; /** - * 积分操作消费组 + * 积分操作日志消费组 */ String POINT_OPERATION_LOG_GROUP = "point_operation_log_group"; + /** + * 爱心互助积分操作消费组 + */ + String EPMET_HEART_POINT_OPERATION_GROUP = "epmet_heart_point_operation_group"; + /** + * 楼院小组积分操作消费组 + */ + String RESI_GROUP_POINT_OPERATION_GROUP = "resi_group_point_operation_group"; + /** * 开放的对接数据(中间库) 组织变更事件监听器分组 */ diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index e11400fef3..bf0c23abab 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -58,4 +58,14 @@ public interface TopicConstants { * 需求完成,如果服务方是区域化党建单位,重新计算这个单位的满意度 */ String CAL_PARTY_UNIT_SATISFACTION = "cal_party_unit_satisfaction"; + + /** + * 爱心互助 + */ + String EPMET_HEART = "epmet_heart"; + + /** + * 楼院小组 + */ + String RESI_GROUP = "resi_group"; } diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/ActLiveRecServiceImpl.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/ActLiveRecServiceImpl.java index c745c98192..3c7d9a6647 100644 --- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/ActLiveRecServiceImpl.java +++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/ActLiveRecServiceImpl.java @@ -36,16 +36,19 @@ import com.epmet.commons.tools.security.dto.TokenDto; import com.epmet.commons.tools.utils.Result; import com.epmet.commons.tools.utils.ScanContentUtils; import com.epmet.commons.tools.utils.SendMqMsgUtils; +import com.epmet.constant.SystemMessageType; import com.epmet.dao.ActLivePicDao; import com.epmet.dao.ActLiveRecDao; import com.epmet.dto.ActInfoDTO; import com.epmet.dto.ActLivePicDTO; import com.epmet.dto.ActLiveRecDTO; +import com.epmet.dto.form.SystemMsgFormDTO; import com.epmet.dto.form.resi.ResiActInsertLiveFormDTO; import com.epmet.dto.result.UserBaseInfoResultDTO; import com.epmet.dto.result.resi.ResiActLiveRecResultDTO; import com.epmet.entity.ActLivePicEntity; import com.epmet.entity.ActLiveRecEntity; +import com.epmet.feign.EpmetMessageOpenFeignClient; import com.epmet.feign.EpmetUserOpenFeignClient; import com.epmet.redis.ActLiveRecRedis; import com.epmet.service.ActInfoService; @@ -80,6 +83,9 @@ public class ActLiveRecServiceImpl extends BaseServiceImpl actPointEventMsgList=new ArrayList<>(); BasePointEventMsg actPointEventMsg=new BasePointEventMsg(); actPointEventMsg.setCustomerId(formDTO.getCustomerId()); @@ -242,8 +248,14 @@ public class ActLiveRecServiceImpl extends BaseServiceImpl actPointEventMsgList=new ArrayList<>(); BasePointEventMsg actPointEventMsg=new BasePointEventMsg(); actPointEventMsg.setCustomerId(formDTO.getCustomerId()); @@ -143,8 +145,14 @@ public class VolunteerInfoServiceImpl extends BaseServiceImpl basePointEventMsgArrayList=new ArrayList<>(); for(ActUserRelationEntity actUserRelationEntity:actUserRelationEntityList){ BasePointEventMsg basePointEventMsg=new BasePointEventMsg(); @@ -937,15 +935,22 @@ public class WorkActServiceImpl implements WorkActService { basePointEventMsg.setOperatorId(loginUserUtil.getLoginUserId()); basePointEventMsgArrayList.add(basePointEventMsg); } - mqBaseMsgDTO.setMsg(JSON.toJSONString(basePointEventMsgArrayList)); - Result result=SendMqMsgUtils.sendMsg(mqBaseMsgDTO); + //mqBaseMsgDTO.setMsg(JSON.toJSONString(basePointEventMsgArrayList)); + //Result result=SendMqMsgUtils.sendMsg(mqBaseMsgDTO); + + SystemMsgFormDTO systemMsgFormDTO = new SystemMsgFormDTO(); + systemMsgFormDTO.setMessageType(SystemMessageType.ACTIVE_SEND_POINT); + systemMsgFormDTO.setContent(basePointEventMsgArrayList); + + Result result = epmetMessageOpenFeignClient.sendSystemMsgByMQ(systemMsgFormDTO); + ActGrantPointLogEntity actGrantPointLogEntity=new ActGrantPointLogEntity(); actGrantPointLogEntity.setActId(actInfoDTO.getId()); actGrantPointLogEntity.setCustomerId(actInfoDTO.getCustomerId()); actGrantPointLogEntity.setOperatorId(loginUserUtil.getLoginUserId()); actGrantPointLogEntity.setRemark(remark); actGrantPointLogEntity.setReward(actInfoDTO.getReward()); - actGrantPointLogEntity.setSendMsg(JSON.toJSONString(mqBaseMsgDTO)); + actGrantPointLogEntity.setSendMsg(JSON.toJSONString(systemMsgFormDTO)); actGrantPointLogEntity.setResponseMsg(JSON.toJSONString(result)); actGrantPointLogDao.insert(actGrantPointLogEntity); if(!result.success()){ @@ -958,11 +963,11 @@ public class WorkActServiceImpl implements WorkActService { //查询当前用户所属组织信息 Result userResult = govOrgOpenFeignClient.getAgencyByStaff(loginUserUtil.getLoginUserId()); String opAgencyId = userResult.getData().getId(); - MqBaseMsgDTO mqBaseMsgDTO = new MqBaseMsgDTO(); + //MqBaseMsgDTO mqBaseMsgDTO = new MqBaseMsgDTO(); //mq的事件类型 - mqBaseMsgDTO.setEventClass(EventEnum.ACTIVE_SEND_POINT.getEventClass()); + //mqBaseMsgDTO.setEventClass(EventEnum.ACTIVE_SEND_POINT.getEventClass()); //事件code - mqBaseMsgDTO.setEventTag(EventEnum.ACTIVE_SEND_POINT.getEventTag()); + //mqBaseMsgDTO.setEventTag(EventEnum.ACTIVE_SEND_POINT.getEventTag()); List basePointEventMsgArrayList = new ArrayList<>(); BasePointEventMsg basePointEventMsg = new BasePointEventMsg(); basePointEventMsg.setOpAgencyId(opAgencyId); @@ -976,9 +981,16 @@ public class WorkActServiceImpl implements WorkActService { basePointEventMsg.setSourceId(formDTO.getSourceId()); basePointEventMsg.setOperatorId(loginUserUtil.getLoginUserId()); basePointEventMsgArrayList.add(basePointEventMsg); - mqBaseMsgDTO.setMsg(JSON.toJSONString(basePointEventMsgArrayList)); - logger.info("发送消息入参:"+JSON.toJSON(mqBaseMsgDTO)); - Result result = SendMqMsgUtils.sendMsg(mqBaseMsgDTO); + //mqBaseMsgDTO.setMsg(JSON.toJSONString(basePointEventMsgArrayList)); + //logger.info("发送消息入参:"+JSON.toJSON(mqBaseMsgDTO)); + //Result result = SendMqMsgUtils.sendMsg(mqBaseMsgDTO); + + SystemMsgFormDTO msgForm = new SystemMsgFormDTO(); + msgForm.setMessageType(SystemMessageType.ACTIVE_SEND_POINT); + msgForm.setContent(basePointEventMsgArrayList); + + Result result = epmetMessageOpenFeignClient.sendSystemMsgByMQ(msgForm); + logger.info("发送消息返参:"+JSON.toJSON(result)); ActGrantPointLogEntity actGrantPointLogEntity=new ActGrantPointLogEntity(); actGrantPointLogEntity.setActId(formDTO.getSourceId()); @@ -986,7 +998,7 @@ public class WorkActServiceImpl implements WorkActService { actGrantPointLogEntity.setOperatorId(loginUserUtil.getLoginUserId()); actGrantPointLogEntity.setRemark(formDTO.getRemark()); actGrantPointLogEntity.setReward(formDTO.getReward()); - actGrantPointLogEntity.setSendMsg(JSON.toJSONString(mqBaseMsgDTO)); + actGrantPointLogEntity.setSendMsg(JSON.toJSONString(msgForm)); actGrantPointLogEntity.setResponseMsg(JSON.toJSONString(result)); actGrantPointLogDao.insert(actGrantPointLogEntity); if (!result.success()) { diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java index 284be1cb95..4418c1ebde 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java @@ -104,4 +104,49 @@ public interface SystemMessageType { * 需求完成,如果服务方是区域化党建单位,重新计算这个单位的满意度 */ String CAL_PARTY_UNIT_SATISFACTION = "cal_party_unit_satisfaction"; + + /** + * 活动发放积分 + */ + String ACTIVE_SEND_POINT = "active_send_point"; + /** + * 认证志愿者 + */ + String REGISTER_VOLUNTEER = "register_volunteer"; + /** + * 添加活动实况 + */ + String ACTIVE_INSERT_LIVE = "active_insert_live"; + /** + * 拉新用户入组 + */ + String INVITE_NEW_INTO_GROUP = "invite_new_into_group"; + /** + * 邀请已注册的用户入组 + */ + String INVITE_RESI_INTO_GROUP = "invite_resi_into_group"; + /** + * 发布话题 + */ + String PUBLISH_ONE_TOPIC = "publish_one_topic"; + /** + * 对小组内话题进行15字以上评论 + */ + String PARTICIPATE_ONE_TOPIC = "participate_one_topic"; + /** + * 话题被转为议题(小组中发布的话题被组长转为议题) + */ + String TOPIC_TO_ISSUE = "topic_to_issue"; + /** + * 转话题为议题(将自建小组中话题转为议题) + */ + String SHIFT_TOPIC_TO_ISSUE = "shift_topic_to_issue"; + /** + * 话题被转为项目 + */ + String TOPIC_TO_PROJECT = "topic_to_project"; + /** + * 组长解决组内话题 + */ + String LEADER_RESOLVE_TOPIC = "leader_resolve_topic"; } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java index c0fb1400e9..24dbd8694f 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java @@ -19,6 +19,6 @@ public class SystemMsgFormDTO { @NotNull(message = "消息内容不能为空", groups = { SendMsgByMQ.class }) private Object content; - @NotNull(message = "pendingMsgLabel不能为空", groups = { AckMsgByMQ.class }) - private String pendingMsgLabel; + //@NotNull(message = "pendingMsgLabel不能为空", groups = { AckMsgByMQ.class }) + //private String pendingMsgLabel; } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index 92703db76e..83b6c23e7e 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -195,6 +195,21 @@ public class SystemMessageServiceImpl implements SystemMessageService { case SystemMessageType.CAL_PARTY_UNIT_SATISFACTION: topic=TopicConstants.CAL_PARTY_UNIT_SATISFACTION; break; + case SystemMessageType.INVITE_NEW_INTO_GROUP: + case SystemMessageType.INVITE_RESI_INTO_GROUP: + case SystemMessageType.PUBLISH_ONE_TOPIC: + case SystemMessageType.PARTICIPATE_ONE_TOPIC: + case SystemMessageType.TOPIC_TO_ISSUE: + case SystemMessageType.SHIFT_TOPIC_TO_ISSUE: + case SystemMessageType.TOPIC_TO_PROJECT: + case SystemMessageType.LEADER_RESOLVE_TOPIC: + topic = TopicConstants.RESI_GROUP; + break; + case SystemMessageType.ACTIVE_SEND_POINT: + case SystemMessageType.REGISTER_VOLUNTEER: + case SystemMessageType.ACTIVE_INSERT_LIVE: + topic = TopicConstants.EPMET_HEART; + break; } return topic; } diff --git a/epmet-module/epmet-point/epmet-point-server/pom.xml b/epmet-module/epmet-point/epmet-point-server/pom.xml index 89c84eb878..c0224fd49d 100644 --- a/epmet-module/epmet-point/epmet-point-server/pom.xml +++ b/epmet-module/epmet-point/epmet-point-server/pom.xml @@ -167,6 +167,10 @@ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.1.140:9876;192.168.1.141:9876 @@ -210,6 +214,10 @@ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.1.140:9876;192.168.1.141:9876 @@ -253,6 +261,10 @@ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.10.161:9876 @@ -296,6 +308,10 @@ https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 + + + true + 192.168.11.187:9876;192.168.11.184:9876 diff --git a/epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java new file mode 100644 index 0000000000..28b4ea9087 --- /dev/null +++ b/epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -0,0 +1,27 @@ +package com.epmet.mq; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import com.epmet.commons.rocketmq.register.MQAbstractRegister; +import com.epmet.commons.rocketmq.register.MQConsumerProperties; +import com.epmet.mq.listener.PointListener; +import com.epmet.service.UserPointActionLogService; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class RocketMQConsumerRegister extends MQAbstractRegister { + + @Autowired + private UserPointActionLogService userPointActionLogService; + + @Override + public void registerAllListeners(String env, MQConsumerProperties consumerProperties) { + // 客户初始化监听器注册 + register(consumerProperties, ConsomerGroupConstants.RESI_GROUP_POINT_OPERATION_GROUP, MessageModel.CLUSTERING, TopicConstants.RESI_GROUP, "*", new PointListener(userPointActionLogService)); + register(consumerProperties, ConsomerGroupConstants.EPMET_HEART_POINT_OPERATION_GROUP, MessageModel.CLUSTERING, TopicConstants.EPMET_HEART, "*", new PointListener(userPointActionLogService)); + + // ...其他监听器类似 + } +} \ No newline at end of file diff --git a/epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/listener/PointListener.java b/epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/listener/PointListener.java new file mode 100644 index 0000000000..d6bc028d24 --- /dev/null +++ b/epmet-module/epmet-point/epmet-point-server/src/main/java/com/epmet/mq/listener/PointListener.java @@ -0,0 +1,236 @@ +package com.epmet.mq.listener; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.rocketmq.messages.LoginMQMsg; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.dto.form.mq.eventmsg.BasePointEventMsg; +import com.epmet.commons.tools.enums.EventEnum; +import com.epmet.commons.tools.exception.EpmetErrorCode; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.constant.SystemMessageType; +import com.epmet.service.UserPointActionLogService; +import dto.form.SendPointFormDTO; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author wxz + * @Description 登录操作日志监听器 + + * @return + * @date 2021.06.07 16:12 + */ +@Slf4j +public class PointListener implements MessageListenerConcurrently { + + private RedisUtils redisUtils; + + private UserPointActionLogService userPointActionLogService; + + public PointListener(UserPointActionLogService userPointActionLogService) { + this.userPointActionLogService = userPointActionLogService; + } + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + log.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + String tags = messageExt.getTags(); + String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); + log.info("【积分操作监听器】-收到消息内容:{}", msg); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock("lock:point_operation", 30L, 30L, TimeUnit.SECONDS); + + switch (tags) { + case SystemMessageType.PARTICIPATE_ONE_TOPIC: + resiGroupPointOpe(msg); + break; + case SystemMessageType.ACTIVE_SEND_POINT: + // 爱心活动积分发放 + activeSendPoint(msg); + break; + case SystemMessageType.ACTIVE_INSERT_LIVE: + // 添加活动实况 + activeInsertLive(msg); + break; + case SystemMessageType.REGISTER_VOLUNTEER: + // 注册志愿者 + registerVolunteer(msg); + break; + case SystemMessageType.TOPIC_TO_ISSUE: + // 话题被组长转为议题 + topicToIssue(msg); + break; + case SystemMessageType.TOPIC_TO_PROJECT: + // 话题转为项目 + topicToProject(msg); + break; + case SystemMessageType.INVITE_RESI_INTO_GROUP: + inviteResiIntoGroup(msg); + break; + case SystemMessageType.INVITE_NEW_INTO_GROUP: + inviteNewIntoGroup(msg); + break; + case SystemMessageType.SHIFT_TOPIC_TO_ISSUE: + shiftTopicToIssue(msg); + break; + case SystemMessageType.PUBLISH_ONE_TOPIC: + publicOneTopic(msg); + break; + case SystemMessageType.LEADER_RESOLVE_TOPIC: + leaderResolveTopic(msg); + break; + } + + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + log.error("【积分操作监听器】消费消息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + log.error("【积分操作监听器】消费消息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + distributedLock.unLock(lock); + } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + log.error("【登录操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + private void leaderResolveTopic(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(),obj); + }); + } + + private void publicOneTopic(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(),obj); + }); + } + + private void shiftTopicToIssue(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(),obj); + }); + } + + private void inviteNewIntoGroup(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(),obj); + }); + } + + private void inviteResiIntoGroup(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(),obj); + }); + } + + private void topicToProject(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(),obj); + }); + } + + private void topicToIssue(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(), obj); + }); + } + + private void registerVolunteer(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(EventEnum.REGISTER_VOLUNTEER.getEventTag(),obj); + }); + } + + private void activeInsertLive(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(EventEnum.ACTIVE_INSERT_LIVE.getEventTag(), obj); + }); + } + + private void activeSendPoint(String msg) { + List formDTO = JSON.parseArray(msg, SendPointFormDTO.class); + userPointActionLogService.grantPoint(formDTO); + } + + private void resiGroupPointOpe(String msg) { + List formList = JSON.parseArray(msg, BasePointEventMsg.class); + + formList.forEach(obj -> { + userPointActionLogService.grantPointByEvent(obj.getEventTag(),obj); + }); + } + + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + //logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); + } +} diff --git a/epmet-module/epmet-point/epmet-point-server/src/main/resources/bootstrap.yml b/epmet-module/epmet-point/epmet-point-server/src/main/resources/bootstrap.yml index af068df15d..4d5434a25f 100644 --- a/epmet-module/epmet-point/epmet-point-server/src/main/resources/bootstrap.yml +++ b/epmet-module/epmet-point/epmet-point-server/src/main/resources/bootstrap.yml @@ -141,3 +141,7 @@ shutdown: graceful: enable: true #是否开启优雅停机 waitTimeSecs: 30 # 优雅停机等待时间,超过30秒,发出告警 + +rocketmq: + enable: @rocketmq.enable@ + name-server: @rocketmq.nameserver@ \ No newline at end of file diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/member/service/impl/GroupMemeberOperationServiceImpl.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/member/service/impl/GroupMemeberOperationServiceImpl.java index de5c0b5207..648c64fca0 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/member/service/impl/GroupMemeberOperationServiceImpl.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/member/service/impl/GroupMemeberOperationServiceImpl.java @@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.tools.constant.EpmetRoleKeyConstant; import com.epmet.commons.tools.constant.FieldConstant; import com.epmet.commons.tools.constant.MqConstant; @@ -34,12 +35,15 @@ import com.epmet.commons.tools.page.PageData; import com.epmet.commons.tools.utils.ConvertUtils; import com.epmet.commons.tools.utils.Result; import com.epmet.commons.tools.utils.SendMqMsgUtils; +import com.epmet.constant.SystemMessageType; import com.epmet.dto.IssueShareLinkRecordDTO; import com.epmet.dto.IssueShareLinkVisitRecordDTO; import com.epmet.dto.form.CommonGridIdFormDTO; import com.epmet.dto.form.GetRoleKeyListFormDTO; +import com.epmet.dto.form.SystemMsgFormDTO; import com.epmet.dto.result.CommonDataFilterResultDTO; import com.epmet.dto.result.UserBaseInfoResultDTO; +import com.epmet.feign.EpmetMessageOpenFeignClient; import com.epmet.feign.EpmetUserOpenFeignClient; import com.epmet.feign.GovIssueOpenFeignClient; import com.epmet.modules.feign.GovOrgFeignClient; @@ -120,6 +124,8 @@ public class GroupMemeberOperationServiceImpl extends BaseServiceImpl page(Map params) { @@ -352,10 +358,13 @@ public class GroupMemeberOperationServiceImpl extends BaseServiceImpl pointEventMsgList = new ArrayList<>(); BasePointEventMsg pointEventMsg = new BasePointEventMsg(); pointEventMsg.setCustomerId(resiGroupDTO.getCustomerId()); @@ -368,8 +377,8 @@ public class GroupMemeberOperationServiceImpl extends BaseServiceImpl pointEventMsgList = new ArrayList<>(); BasePointEventMsg pointEventMsg = new BasePointEventMsg(); pointEventMsg.setCustomerId(groupCache.getCustomerId()); @@ -689,7 +693,7 @@ public class ResiGroupMemberServiceImpl extends BaseServiceImpl userParam = new LinkedList<>(); @@ -707,8 +711,15 @@ public class ResiGroupMemberServiceImpl extends BaseServiceImpl NumConstant.FIFTEEN){ + String eventClass = EventEnum.PARTICIPATE_ONE_TOPIC.getEventClass(); + String eventTag = EventEnum.PARTICIPATE_ONE_TOPIC.getEventTag(); + //mq的事件类型 - MqBaseMsgDTO mqBaseMsgDTO = new MqBaseMsgDTO(); - mqBaseMsgDTO.setEventClass(EventEnum.PARTICIPATE_ONE_TOPIC.getEventClass()); + //MqBaseMsgDTO mqBaseMsgDTO = new MqBaseMsgDTO(); + //mqBaseMsgDTO.setEventClass(EventEnum.PARTICIPATE_ONE_TOPIC.getEventClass()); //事件code - mqBaseMsgDTO.setEventTag(EventEnum.PARTICIPATE_ONE_TOPIC.getEventTag()); + //mqBaseMsgDTO.setEventTag(EventEnum.PARTICIPATE_ONE_TOPIC.getEventTag()); List pointEventMsgList = new ArrayList<>(); BasePointEventMsg pointEventMsg = new BasePointEventMsg(); String customerId = resiTopicdDao.selectCustomerIdByTopicId(resiCommentFormDTO.getTopicId()); @@ -297,8 +304,8 @@ public class ResiTopicCommentServiceImpl extends BaseServiceImpl pointEventMsgList = new ArrayList<>(); BasePointEventMsg pointEventMsg = new BasePointEventMsg(); ResiGroupInfoRedisDTO resiGroupInfoRedisDTO = resiGroupRedis.get(resiTopicPublishFormDTO.getGroupId()); @@ -449,13 +455,19 @@ public class ResiTopicServiceImpl extends BaseServiceImpl pointEventMsgList = new ArrayList<>(); BasePointEventMsg pointEventMsg = new BasePointEventMsg(); @@ -840,8 +852,15 @@ public class ResiTopicServiceImpl extends BaseServiceImpl result = SendMqMsgUtils.sendMsg(mqBaseMsgDTO); + + SystemMsgFormDTO msgFormDTO = new SystemMsgFormDTO(); + msgFormDTO.setContent(pointEventMsgList); + msgFormDTO.setMessageType(SystemMessageType.LEADER_RESOLVE_TOPIC); + Result result = epmetMessageOpenFeignClient.sendSystemMsgByMQ(msgFormDTO); + + if (!result.success()) { log.error("组长解决话题事件发送失败,参数:{}", JSON.toJSONString(closeFormDTO)); } } @@ -1931,15 +1950,22 @@ public class ResiTopicServiceImpl extends BaseServiceImpl { - MqBaseMsgDTO msgDTO = new MqBaseMsgDTO(); - msgDTO.setEventClass("resi_group"); + //MqBaseMsgDTO msgDTO = new MqBaseMsgDTO(); + //msgDTO.setEventClass("resi_group"); //事件code 因为楼院小组的事件回调统一入口,外层的EventTag不能够判断具体的事件 //因为可能会一次发送多个事件集合,需要对集合里的对象逐个判断 - msgDTO.setEventTag(EventEnum.SHIFT_TOPIC_TO_ISSUE.getEventTag()); + //msgDTO.setEventTag(EventEnum.SHIFT_TOPIC_TO_ISSUE.getEventTag()); List msgList = new ArrayList<>(); msgList.add(item); - msgDTO.setMsg(JSON.toJSONString(msgList)); - if(!SendMqMsgUtils.sendMsg(msgDTO).success()){ + //msgDTO.setMsg(JSON.toJSONString(msgList)); + + SystemMsgFormDTO msgForm = new SystemMsgFormDTO(); + msgForm.setMessageType(SystemMessageType.SHIFT_TOPIC_TO_ISSUE); + msgForm.setContent(msgList); + Result sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(msgForm); + + //Result sendMsgResult = SendMqMsgUtils.sendMsg(msgDTO); + if(!sendMsgResult.success()){ log.error("话题转议题事件发送失败,参数:{}",JSON.toJSONString(topicTurnIssueFromDTO)); } }); @@ -2104,15 +2130,22 @@ public class ResiTopicServiceImpl extends BaseServiceImpl { - MqBaseMsgDTO msgDTO = new MqBaseMsgDTO(); - msgDTO.setEventClass("resi_group"); + //MqBaseMsgDTO msgDTO = new MqBaseMsgDTO(); + //msgDTO.setEventClass("resi_group"); //事件code 因为楼院小组的事件回调统一入口,外层的EventTag不能够判断具体的事件 //因为可能会一次发送多个事件集合,需要对集合里的对象逐个判断 - msgDTO.setEventTag(EventEnum.SHIFT_TOPIC_TO_ISSUE.getEventTag()); + //msgDTO.setEventTag(EventEnum.SHIFT_TOPIC_TO_ISSUE.getEventTag()); List msgList = new ArrayList<>(); msgList.add(item); - msgDTO.setMsg(JSON.toJSONString(msgList)); - if(!SendMqMsgUtils.sendMsg(msgDTO).success()){ + //msgDTO.setMsg(JSON.toJSONString(msgList)); + + SystemMsgFormDTO sendMsgForm = new SystemMsgFormDTO(); + sendMsgForm.setContent(msgList); + sendMsgForm.setMessageType(SystemMessageType.SHIFT_TOPIC_TO_ISSUE); + Result sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(sendMsgForm); + + //Result sendMsgResult = SendMqMsgUtils.sendMsg(msgDTO); + if(!sendMsgResult.success()){ log.error("话题转议题事件发送失败,参数:{}",JSON.toJSONString(param)); } }); @@ -2462,10 +2495,10 @@ public class ResiTopicServiceImpl extends BaseServiceImpl pointEventMsgList = new ArrayList<>(); //话题被转为项目 话题作者 @@ -2493,8 +2526,16 @@ public class ResiTopicServiceImpl extends BaseServiceImpl content.length() ? "\"" : "…\"") ).append("被转为项目").toString()); pointEventMsgList.add(pointEventMsg); - mqBaseMsgDTO.setMsg(JSON.toJSONString(pointEventMsgList)); - if(!SendMqMsgUtils.sendMsg(mqBaseMsgDTO).success()){ + //mqBaseMsgDTO.setMsg(JSON.toJSONString(pointEventMsgList)); + + //Result sendMsgResult = SendMqMsgUtils.sendMsg(mqBaseMsgDTO); + + SystemMsgFormDTO sendMsgForm = new SystemMsgFormDTO(); + sendMsgForm.setContent(pointEventMsgList); + sendMsgForm.setMessageType(SystemMessageType.TOPIC_TO_PROJECT); + Result sendMsgResult = epmetMessageOpenFeignClient.sendSystemMsgByMQ(sendMsgForm); + + if(!sendMsgResult.success()){ log.error("话题转议题事件发送失败,参数:{}",JSON.toJSONString(param)); } } diff --git a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/topic/service/impl/TopicDraftServiceImpl.java b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/topic/service/impl/TopicDraftServiceImpl.java index 9a60f060b2..0d681d7368 100644 --- a/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/topic/service/impl/TopicDraftServiceImpl.java +++ b/epmet-module/resi-group/resi-group-server/src/main/java/com/epmet/modules/topic/service/impl/TopicDraftServiceImpl.java @@ -38,7 +38,9 @@ import com.epmet.commons.tools.scan.result.SyncScanResult; import com.epmet.commons.tools.scan.result.VoiceResultDTO; import com.epmet.commons.tools.security.dto.TokenDto; import com.epmet.commons.tools.utils.*; +import com.epmet.constant.SystemMessageType; import com.epmet.dto.form.CommonGridIdFormDTO; +import com.epmet.dto.form.SystemMsgFormDTO; import com.epmet.dto.result.AllGridsByUserIdResultDTO; import com.epmet.dto.result.UserInfoResultDTO; import com.epmet.dto.result.CommonDataFilterResultDTO; @@ -1004,11 +1006,11 @@ public class TopicDraftServiceImpl extends BaseServiceImpl actPointEventMsgList=new ArrayList<>(); BasePointEventMsg actPointEventMsg=new BasePointEventMsg(); actPointEventMsg.setCustomerId(customerId); @@ -1026,8 +1028,14 @@ public class TopicDraftServiceImpl extends BaseServiceImpl