diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index c4f43a03b2..b4a31a96d1 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -36,6 +36,11 @@ public interface ConsomerGroupConstants { */ String AUTH_OPERATION_LOG_GROUP = "auth_operation_log_group"; + /** + * 数字社区web端工作人员登录 + */ + String STAFF_LOGIN_LOG_GROUP = "staff_login_log_group"; + /** * 项目操作日志消费组 */ diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java index 86f257d9aa..f05f5e0098 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -5,6 +5,7 @@ import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQConsumerProperties; import com.epmet.mq.listener.InitCustomerOrgRolesListener; +import com.epmet.mq.listener.StaffLoginLogListener; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; @@ -32,5 +33,10 @@ public class RocketMQConsumerRegister extends MQAbstractRegister { new InitCustomerOrgRolesListener()); // ...其他监听器类似 + register(consumerProperties, + ConsomerGroupConstants.STAFF_LOGIN_LOG_GROUP, + MessageModel.CLUSTERING, + TopicConstants.AUTH, "login", + new StaffLoginLogListener()); } } diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/StaffLoginLogListener.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/StaffLoginLogListener.java new file mode 100644 index 0000000000..6f850bf554 --- /dev/null +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/mq/listener/StaffLoginLogListener.java @@ -0,0 +1,110 @@ +package com.epmet.mq.listener; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.rocketmq.messages.LoginMQMsg; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.service.StaffLoginLogService; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author wxz + * @Description 登录操作日志监听器 + + * @return + * @date 2021.06.07 16:12 + */ +public class StaffLoginLogListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private RedisUtils redisUtils; + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + String tags = messageExt.getTags(); + // msg示例: + // { + // "userId": "5198ef9e3644c4f49457d2b551a1432e", + // "appId": "数字社区登录", + // "loginTime": "2023-04-04 14:05:37", + // "ip": "219.146.91.110", + // "fromApp": "gov", + // "fromClient": "web" + // } + String msg = new String(messageExt.getBody()); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); + logger.info("工作人员登录操作日志监听器-收到消息内容:{}", msg); + LoginMQMsg msgObj = JSON.parseObject(msg, LoginMQMsg.class); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:staff_login_log:%s:%s", tags, msgObj.getUserId()), + 30L, 30L, TimeUnit.SECONDS); + SpringContextUtils.getBean(StaffLoginLogService.class).saveLog(msgObj.getUserId(),msgObj.getLoginTime()); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【RocketMQ】工作人员登录添加操作日志失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【RocketMQ】工作人员登录添加操作日志失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + distributedLock.unLock(lock); + } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【工作人员登录操作事件监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + //logger.info("【登录操作事件监听器】删除pendingMsgLabel成功:{}", pendingMsgLabel); + } +} diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/StaffLoginLogService.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/StaffLoginLogService.java index a7f3e17e45..4594c9cedd 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/StaffLoginLogService.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/StaffLoginLogService.java @@ -3,6 +3,8 @@ package com.epmet.service; import com.epmet.commons.mybatis.service.BaseService; import com.epmet.entity.StaffLoginLogEntity; +import java.util.Date; + /** * 工作人员登录日志表 * @@ -10,6 +12,11 @@ import com.epmet.entity.StaffLoginLogEntity; * @since v1.0.0 2023-04-04 */ public interface StaffLoginLogService extends BaseService { - + /** + * 登录,插入记录 + * @param staffId + * @param loginTime + */ + void saveLog(String staffId, Date loginTime); } \ No newline at end of file diff --git a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/impl/StaffLoginLogServiceImpl.java b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/impl/StaffLoginLogServiceImpl.java index ac58b38f7c..52bd7a38f1 100644 --- a/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/impl/StaffLoginLogServiceImpl.java +++ b/epmet-module/gov-org/gov-org-server/src/main/java/com/epmet/service/impl/StaffLoginLogServiceImpl.java @@ -1,10 +1,25 @@ package com.epmet.service.impl; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.tools.constant.NumConstant; +import com.epmet.commons.tools.constant.StrConstant; +import com.epmet.commons.tools.dto.result.CustomerStaffInfoCacheResult; +import com.epmet.commons.tools.redis.common.CustomerStaffRedis; +import com.epmet.commons.tools.utils.Result; +import com.epmet.commons.tools.utils.SpringContextUtils; import com.epmet.dao.StaffLoginLogDao; +import com.epmet.dto.CustomerAgencyDTO; +import com.epmet.dto.CustomerStaffDTO; import com.epmet.entity.StaffLoginLogEntity; +import com.epmet.feign.EpmetUserOpenFeignClient; +import com.epmet.service.CustomerAgencyService; import com.epmet.service.StaffLoginLogService; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Date; /** * 工作人员登录日志表 @@ -14,6 +29,47 @@ import org.springframework.stereotype.Service; */ @Service public class StaffLoginLogServiceImpl extends BaseServiceImpl implements StaffLoginLogService { + @Autowired + private EpmetUserOpenFeignClient userOpenFeignClient; + + + /** + * 登录,插入记录 + * + * @param staffId + * @param loginTime + */ + @Transactional(rollbackFor = Exception.class) + @Override + public void saveLog(String staffId, Date loginTime) { + CustomerStaffDTO form = new CustomerStaffDTO(); + form.setUserId(staffId); + Result result = userOpenFeignClient.getCustomerStaffInfoByUserId(form); + if (!result.success() || null == result.getData()) { + return; + } + CustomerStaffInfoCacheResult staffInfoCacheResult = CustomerStaffRedis.getStaffInfo(result.getData().getCustomerId(), staffId); + + StaffLoginLogEntity logEntity = new StaffLoginLogEntity(); + logEntity.setCustomerId(result.getData().getCustomerId()); + logEntity.setStaffId(staffId); + logEntity.setStaffName(staffInfoCacheResult.getRealName()); + logEntity.setMobile(staffInfoCacheResult.getMobile()); + logEntity.setAgencyId(staffInfoCacheResult.getAgencyId()); + CustomerAgencyDTO customerAgencyDTO = SpringContextUtils.getBean(CustomerAgencyService.class).get(staffInfoCacheResult.getAgencyId()); + logEntity.setAgencyLevel(customerAgencyDTO.getLevel()); + logEntity.setPid(customerAgencyDTO.getPid()); + if (StringUtils.isBlank(customerAgencyDTO.getPid()) || NumConstant.ZERO_STR.equals(customerAgencyDTO.getPid())) { + logEntity.setOrgIdPath(customerAgencyDTO.getId()); + } else { + logEntity.setOrgIdPath(customerAgencyDTO.getPids().concat(StrConstant.COLON).concat(customerAgencyDTO.getId())); + } + logEntity.setLoginTime(loginTime); + baseDao.insert(logEntity); + } + + + } \ No newline at end of file