Browse Source

修改:

1.message/system/send-by-mq接口,增加参数分组
新增:
1.mq消息ack应答接口
dev_shibei_match
wxz 4 years ago
parent
commit
c10791e8f3
  1. 12
      epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java
  2. 13
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java
  3. 13
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java
  4. 13
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java
  5. 5
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/fallback/EpmetMessageOpenFeignClientFallback.java
  6. 25
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java
  7. 19
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java
  8. 39
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  9. 8
      epmet-module/open-data-worker/open-data-worker-server/pom.xml
  10. 4
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/OpenDataApplication.java
  11. 2
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
  12. 44
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
  13. 46
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

12
epmet-commons/epmet-commons-tools/src/main/java/com/epmet/commons/tools/redis/RedisKeys.java

@ -547,4 +547,16 @@ public class RedisKeys {
public static String getQuestionnaireAccessKey(String userId, String qKey) { public static String getQuestionnaireAccessKey(String userId, String qKey) {
return rootPrefix.concat("questionnaire:accesskey:").concat(userId).concat(":").concat(qKey); return rootPrefix.concat("questionnaire:accesskey:").concat(userId).concat(":").concat(qKey);
} }
/**
* @description 检查message MQ滞留消息
*
* @param pendingMsgLabel 滞留消息的label
* @return
* @author wxz
* @date 2021.10.14 14:33:53
*/
public static String checkPendingMqMsgKey(String pendingMsgLabel) {
return rootPrefix.concat("message:mq:pending:").concat(pendingMsgLabel);
}
} }

13
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/MQUserPropertys.java

@ -0,0 +1,13 @@
package com.epmet.constant;
/**
* @Description MQ用户自定义属性
* @author wxz
* @date 2021.10.14 15:47:03
*/
public interface MQUserPropertys {
//堆积消息label
String PENDING_MSG_LABEL = "pendingMsgLabel";
}

13
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/dto/form/SystemMsgFormDTO.java

@ -7,9 +7,18 @@ import javax.validation.constraints.NotNull;
@Data @Data
public class SystemMsgFormDTO { public class SystemMsgFormDTO {
@NotNull(message = "消息类型不能为空") // 发送mq消息分组
public interface SendMsgByMQ {}
// 应答mq消息
public interface AckMsgByMQ {}
@NotNull(message = "消息类型不能为空", groups = { SendMsgByMQ.class })
private String messageType; private String messageType;
@NotNull(message = "消息内容不能为空") @NotNull(message = "消息内容不能为空", groups = { SendMsgByMQ.class })
private Object content; private Object content;
@NotNull(message = "pendingMsgLabel不能为空", groups = { AckMsgByMQ.class })
private String pendingMsgLabel;
} }

13
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/EpmetMessageOpenFeignClient.java

@ -27,7 +27,7 @@ import java.util.List;
* @author yinzuomei@elink-cn.com * @author yinzuomei@elink-cn.com
* @date 2020/6/4 13:47 * @date 2020/6/4 13:47
*/ */
//@FeignClient(name = ServiceConstant.EPMET_MESSAGE_SERVER, fallback = EpmetMessageOpenFeignClientFallback.class,url = "http://127.0.0.1:8085") //@FeignClient(name = ServiceConstant.EPMET_MESSAGE_SERVER, fallbackFactory = EpmetMessageOpenFeignClientFallbackFactory.class, url = "http://127.0.0.1:8085")
@FeignClient(name = ServiceConstant.EPMET_MESSAGE_SERVER, fallbackFactory = EpmetMessageOpenFeignClientFallbackFactory.class) @FeignClient(name = ServiceConstant.EPMET_MESSAGE_SERVER, fallbackFactory = EpmetMessageOpenFeignClientFallbackFactory.class)
public interface EpmetMessageOpenFeignClient { public interface EpmetMessageOpenFeignClient {
/** /**
@ -107,4 +107,15 @@ public interface EpmetMessageOpenFeignClient {
*/ */
@PostMapping("/message/system/send-by-mq") @PostMapping("/message/system/send-by-mq")
Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form); Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form);
/**
* @description 应答mq消息
*
* @param form
* @return
* @author wxz
* @date 2021.10.14 16:07:17
*/
@PostMapping("/message/system/ack-mq-msg")
Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form);
} }

5
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/feign/fallback/EpmetMessageOpenFeignClientFallback.java

@ -69,4 +69,9 @@ public class EpmetMessageOpenFeignClientFallback implements EpmetMessageOpenFeig
public Result sendSystemMsgByMQ(SystemMsgFormDTO form) { public Result sendSystemMsgByMQ(SystemMsgFormDTO form) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_MESSAGE_SERVER, "sendSystemMsgByMQ", form); return ModuleUtils.feignConError(ServiceConstant.EPMET_MESSAGE_SERVER, "sendSystemMsgByMQ", form);
} }
@Override
public Result ackSystemMsgByMQ(SystemMsgFormDTO form) {
return ModuleUtils.feignConError(ServiceConstant.EPMET_MESSAGE_SERVER, "ackSystemMsgByMQ", form);
}
} }

25
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/controller/SystemMessageController.java

@ -17,11 +17,34 @@ public class SystemMessageController {
@Autowired @Autowired
private SystemMessageService systemMessageService; private SystemMessageService systemMessageService;
/**
* @description 发送mq消息
*
* @param form
* @return
* @author wxz
* @date 2021.10.14 16:07:07
*/
@PostMapping("send-by-mq") @PostMapping("send-by-mq")
public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) { public Result sendSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) {
ValidatorUtils.validateEntity(form); ValidatorUtils.validateEntity(form, SystemMsgFormDTO.SendMsgByMQ.class);
systemMessageService.sendMQMessage(form.getMessageType(), form.getContent()); systemMessageService.sendMQMessage(form.getMessageType(), form.getContent());
return new Result(); return new Result();
} }
/**
* @description 应答mq消息
*
* @param form
* @return
* @author wxz
* @date 2021.10.14 16:07:17
*/
@PostMapping("ack-mq-msg")
public Result ackSystemMsgByMQ(@RequestBody SystemMsgFormDTO form) {
ValidatorUtils.validateEntity(form, SystemMsgFormDTO.AckMsgByMQ.class);
systemMessageService.ackMqMessage(form.getPendingMsgLabel());
return new Result();
}
} }

19
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/SystemMessageService.java

@ -2,6 +2,25 @@ package com.epmet.service;
public interface SystemMessageService { public interface SystemMessageService {
/**
* @description 发送mq消息
*
* @param messageType
* @param content
* @return
* @author wxz
* @date 2021.10.14 15:07:02
*/
void sendMQMessage(String messageType, Object content); void sendMQMessage(String messageType, Object content);
/**
* @description 消息应答
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 15:07:09
*/
void ackMqMessage(String pendingMsgLabel);
} }

39
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -7,11 +7,17 @@ import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.tools.exception.EpmetErrorCode; import com.epmet.commons.tools.exception.EpmetErrorCode;
import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.constant.MQUserPropertys;
import com.epmet.constant.SystemMessageSendApproach; import com.epmet.constant.SystemMessageSendApproach;
import com.epmet.constant.SystemMessageType; import com.epmet.constant.SystemMessageType;
import com.epmet.dao.SystemMessageDao; import com.epmet.dao.SystemMessageDao;
import com.epmet.entity.SystemMessageEntity; import com.epmet.entity.SystemMessageEntity;
import com.epmet.service.SystemMessageService; import com.epmet.service.SystemMessageService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
@ -21,6 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.UUID;
@Service @Service
public class SystemMessageServiceImpl implements SystemMessageService { public class SystemMessageServiceImpl implements SystemMessageService {
@ -32,6 +41,9 @@ public class SystemMessageServiceImpl implements SystemMessageService {
@Autowired @Autowired
private RocketMQTemplate rocketMQTemplate; private RocketMQTemplate rocketMQTemplate;
@Autowired
private RedisUtils redisUtils;
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@Override @Override
public void sendMQMessage(String messageType, Object content) { public void sendMQMessage(String messageType, Object content) {
@ -43,13 +55,24 @@ public class SystemMessageServiceImpl implements SystemMessageService {
systemMessageEntity.setContent(contentStr); systemMessageEntity.setContent(contentStr);
systemMessageDao.insert(systemMessageEntity); systemMessageDao.insert(systemMessageEntity);
// 缓存下来,供滞留消息扫描。TTL -1,永不过期
MessageCacheBean mcb = new MessageCacheBean(new Date(), messageType, contentStr);
String pendingMsgLabel = UUID.randomUUID().toString().replace("-", "");
String pendingMsgKey = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel);
redisUtils.set(pendingMsgKey, mcb, -1);
//发送mq消息 //发送mq消息
try { try {
Message meMessage = new Message(getTopicByMsgType(messageType), messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); Message meMessage = new Message(getTopicByMsgType(messageType), messageType, contentStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
meMessage.putUserProperty(MQUserPropertys.PENDING_MSG_LABEL, pendingMsgLabel);
rocketMQTemplate.getProducer().send(meMessage); rocketMQTemplate.getProducer().send(meMessage);
} catch (Exception e) { } catch (Exception e) {
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
logger.error("发送系统消息失败,堆栈信息:{}", errorStackTrace); logger.error("发送系统消息失败,堆栈信息:{}", errorStackTrace);
// 清理消息缓存
redisUtils.delete(pendingMsgKey);
throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode()); throw new RenException(EpmetErrorCode.SYSTEM_MQ_MSG_SEND_FAIL.getCode());
} }
} }
@ -94,4 +117,20 @@ public class SystemMessageServiceImpl implements SystemMessageService {
} }
return topic; return topic;
} }
@Override
public void ackMqMessage(String pendingMsgLabel) {
String key = RedisKeys.checkPendingMqMsgKey(pendingMsgLabel);
redisUtils.delete(key);
logger.info("【MQ消息应答】pendingMsgLabel{}", pendingMsgLabel);
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class MessageCacheBean {
private Date createTime;
private String messageType;
private Object content;
}
} }

8
epmet-module/open-data-worker/open-data-worker-server/pom.xml

@ -60,6 +60,12 @@
<artifactId>epmet-commons-rocketmq</artifactId> <artifactId>epmet-commons-rocketmq</artifactId>
<version>2.0.0</version> <version>2.0.0</version>
</dependency> </dependency>
<dependency>
<groupId>com.epmet</groupId>
<artifactId>epmet-message-client</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -166,7 +172,7 @@
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd</dingTalk.robot.secret> <dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd</dingTalk.robot.secret>
<!--rocketmq--> <!--rocketmq-->
<rocketmq.enable>false</rocketmq.enable> <rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver> <rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties> </properties>
</profile> </profile>

4
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java → epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/OpenDataApplication.java

@ -1,4 +1,4 @@
package com.epmet.opendata; package com.epmet;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -18,7 +18,7 @@ import org.springframework.context.annotation.ComponentScan;
@EnableDiscoveryClient @EnableDiscoveryClient
@EnableFeignClients @EnableFeignClients
@ServletComponentScan @ServletComponentScan
@ComponentScan(value = { "com.epmet.opendata", "com.epmet.commons" }) //@ComponentScan(value = { "com.epmet" })
public class OpenDataApplication { public class OpenDataApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(OpenDataApplication.class, args); SpringApplication.run(OpenDataApplication.class, args);

2
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java

@ -4,9 +4,11 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties; import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**

44
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java

@ -3,6 +3,12 @@ package com.epmet.opendata.mq.listener;
import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.Result;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.MQUserPropertys;
import com.epmet.dto.form.SystemMsgFormDTO;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@ -22,8 +28,19 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
private Logger logger = LoggerFactory.getLogger(getClass()); private Logger logger = LoggerFactory.getLogger(getClass());
private EpmetMessageOpenFeignClient messageOpenFeignClient;
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (messageOpenFeignClient == null) {
try {
messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class);
} catch (Exception e) {
e.printStackTrace();
}
}
try { try {
msgs.forEach(msg -> consumeMessage(msg)); msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) { } catch (Exception e) {
@ -38,6 +55,7 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags); logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags);
@ -57,5 +75,31 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
} finally { } finally {
//distributedLock.unLock(lock); //distributedLock.unLock(lock);
} }
// 应答mq消息(调用message服务的应答api)
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
ackMqMsg(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description 应答mq消息
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void ackMqMsg(String pendingMsgLabel) {
SystemMsgFormDTO form = new SystemMsgFormDTO();
form.setPendingMsgLabel(pendingMsgLabel);
Result result = messageOpenFeignClient.ackSystemMsgByMQ(form);
if (!result.success()) {
logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg());
}
} }
} }

46
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

@ -3,6 +3,12 @@ package com.epmet.opendata.mq.listener;
import com.epmet.commons.tools.distributedlock.DistributedLock; import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils; import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.exception.RenException;
import com.epmet.commons.tools.utils.Result;
import com.epmet.commons.tools.utils.SpringContextUtils;
import com.epmet.constant.MQUserPropertys;
import com.epmet.dto.form.SystemMsgFormDTO;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
@ -11,6 +17,7 @@ import org.redisson.api.RLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
import java.util.List; import java.util.List;
/** /**
@ -22,8 +29,15 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
private Logger logger = LoggerFactory.getLogger(getClass()); private Logger logger = LoggerFactory.getLogger(getClass());
private EpmetMessageOpenFeignClient messageOpenFeignClient;
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (messageOpenFeignClient == null) {
messageOpenFeignClient = SpringContextUtils.getBean(EpmetMessageOpenFeignClient.class);
}
try { try {
msgs.forEach(msg -> consumeMessage(msg)); msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) { } catch (Exception e) {
@ -38,8 +52,11 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody()); String msg = new String(messageExt.getBody());
String tags = messageExt.getTags(); String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL);
//messageExt.propert
logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}", msg, tags); logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel);
DistributedLock distributedLock = null; DistributedLock distributedLock = null;
RLock lock = null; RLock lock = null;
@ -47,6 +64,7 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
//distributedLock = SpringContextUtils.getBean(DistributedLock.class); //distributedLock = SpringContextUtils.getBean(DistributedLock.class);
//lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId), //lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId),
// 30L, 30L, TimeUnit.SECONDS); // 30L, 30L, TimeUnit.SECONDS);
} catch (RenException e) { } catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
@ -57,5 +75,31 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
} finally { } finally {
//distributedLock.unLock(lock); //distributedLock.unLock(lock);
} }
// 应答mq消息(调用message服务的应答api)
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
ackMqMsg(pendingMsgLabel);
} catch (Exception e) {
logger.error("【开放数据事件监听器】-应答mq消息失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
/**
* @description 应答mq消息
*
* @param pendingMsgLabel
* @return
* @author wxz
* @date 2021.10.14 16:32:32
*/
private void ackMqMsg(String pendingMsgLabel) {
SystemMsgFormDTO form = new SystemMsgFormDTO();
form.setPendingMsgLabel(pendingMsgLabel);
Result result = messageOpenFeignClient.ackSystemMsgByMQ(form);
if (!result.success()) {
logger.error("调用Message服务应答MQ消息失败:{}", result.getMsg());
}
} }
} }

Loading…
Cancel
Save