3 changed files with 116 additions and 117 deletions
@ -1,49 +1,49 @@ |
|||||
package com.elink.esua.epdc.modules.rocketmq.consumer; |
//package com.elink.esua.epdc.modules.rocketmq.consumer;
|
||||
|
//
|
||||
import com.alibaba.fastjson.JSONObject; |
//import com.alibaba.fastjson.JSONObject;
|
||||
import com.elink.esua.epdc.commons.tools.constant.RocketMqConstant; |
//import com.elink.esua.epdc.commons.tools.constant.RocketMqConstant;
|
||||
import com.elink.esua.epdc.dto.item.ItemDTO; |
//import com.elink.esua.epdc.dto.item.ItemDTO;
|
||||
import com.elink.esua.epdc.modules.item.service.ItemService; |
//import com.elink.esua.epdc.modules.item.service.ItemService;
|
||||
import lombok.extern.slf4j.Slf4j; |
//import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt; |
//import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel; |
//import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener; |
//import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component; |
//import org.springframework.stereotype.Component;
|
||||
|
//
|
||||
/** |
///**
|
||||
* 三大诉求流程延迟处理-监听MQ消息 |
// * 三大诉求流程延迟处理-监听MQ消息
|
||||
* |
// *
|
||||
* @author zhy |
// * @author zhy
|
||||
* @date 2022/10/28 19:45 |
// * @date 2022/10/28 19:45
|
||||
*/ |
// */
|
||||
@Slf4j |
//@Slf4j
|
||||
@Component |
//@Component
|
||||
@RocketMQMessageListener(topic = RocketMqConstant.MQ_TOPIC_EVENT_ITEM_PROCESS, consumerGroup = "${rocketmq.consumer.process}", messageModel = MessageModel.BROADCASTING) |
//@RocketMQMessageListener(topic = RocketMqConstant.MQ_TOPIC_EVENT_ITEM_PROCESS, consumerGroup = "${rocketmq.consumer.process}", messageModel = MessageModel.BROADCASTING)
|
||||
public class ItemProcessModifyConsumer implements RocketMQListener<MessageExt> { |
//public class ItemProcessModifyConsumer implements RocketMQListener<MessageExt> {
|
||||
|
//
|
||||
@Autowired |
// @Autowired
|
||||
private ItemService itemService; |
// private ItemService itemService;
|
||||
|
//
|
||||
|
//
|
||||
@Override |
// @Override
|
||||
public void onMessage(MessageExt messageExt) { |
// public void onMessage(MessageExt messageExt) {
|
||||
log.info("EPDC-EVENTS-SERVER消费消息START:{topic:{}, msgId:{}}", RocketMqConstant.MQ_TOPIC_EVENT_ITEM_PROCESS, messageExt.getMsgId()); |
// log.info("EPDC-EVENTS-SERVER消费消息START:{topic:{}, msgId:{}}", RocketMqConstant.MQ_TOPIC_EVENT_ITEM_PROCESS, messageExt.getMsgId());
|
||||
try { |
// try {
|
||||
String charset = "UTF-8"; |
// String charset = "UTF-8";
|
||||
String body = new String(messageExt.getBody(), charset); |
// String body = new String(messageExt.getBody(), charset);
|
||||
String tag = messageExt.getTags(); |
// String tag = messageExt.getTags();
|
||||
ItemDTO dto = JSONObject.parseObject(body, ItemDTO.class); |
// ItemDTO dto = JSONObject.parseObject(body, ItemDTO.class);
|
||||
if (RocketMqConstant.MQ_TAG_EVENT_ITEM_PROCESS.equals(tag)) { |
// if (RocketMqConstant.MQ_TAG_EVENT_ITEM_PROCESS.equals(tag)) {
|
||||
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>" + dto.getId()); |
// System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>" + dto.getId());
|
||||
log.info(">>>>>>>>>>>>>>>>>>>>>>>" + dto.getId()); |
// log.info(">>>>>>>>>>>>>>>>>>>>>>>" + dto.getId());
|
||||
} |
// }
|
||||
|
//
|
||||
log.info("EPDC-EVENTS-SERVER消费消息END:{topic:{}, msgId:{}, body:{}}", RocketMqConstant.MQ_TOPIC_EVENT_ITEM_PROCESS, messageExt.getMsgId(), body); |
// log.info("EPDC-EVENTS-SERVER消费消息END:{topic:{}, msgId:{}, body:{}}", RocketMqConstant.MQ_TOPIC_EVENT_ITEM_PROCESS, messageExt.getMsgId(), body);
|
||||
} catch (Exception e) { |
// } catch (Exception e) {
|
||||
log.info("EPDC-EVENTS-SERVER消费消息失败:msgId:{}", messageExt.getMsgId()); |
// log.info("EPDC-EVENTS-SERVER消费消息失败:msgId:{}", messageExt.getMsgId());
|
||||
e.printStackTrace(); |
// e.printStackTrace();
|
||||
} |
// }
|
||||
} |
// }
|
||||
} |
//}
|
||||
|
|||||
@ -1,46 +1,46 @@ |
|||||
package com.elink.esua.epdc.modules.rocketmq.producer; |
//package com.elink.esua.epdc.modules.rocketmq.producer;
|
||||
|
//
|
||||
import lombok.extern.slf4j.Slf4j; |
//import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.producer.SendResult; |
//import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.Message; |
//import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
//import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
//import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component; |
//import org.springframework.stereotype.Component;
|
||||
|
//
|
||||
/** |
///**
|
||||
* 三大诉求流程延迟处理-发送MQ消息 |
// * 三大诉求流程延迟处理-发送MQ消息
|
||||
* |
// *
|
||||
* @author zhy |
// * @author zhy
|
||||
* @date 2022/10/28 19:58 |
// * @date 2022/10/28 19:58
|
||||
*/ |
// */
|
||||
@Slf4j |
//@Slf4j
|
||||
@Component |
//@Component
|
||||
public class ItemProcessModifyProducer { |
//public class ItemProcessModifyProducer {
|
||||
|
//
|
||||
@Autowired |
// @Autowired
|
||||
private RocketMQTemplate rocketMQTemplate; |
// private RocketMQTemplate rocketMQTemplate;
|
||||
|
//
|
||||
/** |
// /**
|
||||
* 发送消息 |
// * 发送消息
|
||||
* |
// *
|
||||
* @return void |
// * @return void
|
||||
* @params [topic, tag, keys, body] |
// * @params [topic, tag, keys, body]
|
||||
* @author liuchuang |
// * @author liuchuang
|
||||
* @since 2020/3/6 21:09 |
// * @since 2020/3/6 21:09
|
||||
*/ |
// */
|
||||
public void sendMessage(String topic, String tag, String keys, String body) { |
// public void sendMessage(String topic, String tag, String keys, String body) {
|
||||
Message message = new Message(topic, tag, keys, body.getBytes()); |
// Message message = new Message(topic, tag, keys, body.getBytes());
|
||||
// 18个级别 ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";)
|
// // 18个级别 ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";)
|
||||
// level == 0,消息为非延迟消息
|
// // level == 0,消息为非延迟消息
|
||||
// 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
|
// // 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
|
||||
// level > maxLevel,则level== maxLevel,例如level==20,延迟2h
|
// // level > maxLevel,则level== maxLevel,例如level==20,延迟2h
|
||||
message.setDelayTimeLevel(1); |
// message.setDelayTimeLevel(1);
|
||||
try { |
// try {
|
||||
SendResult sendResult = rocketMQTemplate.getProducer().send(message); |
// SendResult sendResult = rocketMQTemplate.getProducer().send(message);
|
||||
log.info("EPDC-EVENTS-SERVER发送消息结果:{sendStatus:{}, topic:{}, msgId:{}}", sendResult.getSendStatus(), topic, sendResult.getMsgId()); |
// log.info("EPDC-EVENTS-SERVER发送消息结果:{sendStatus:{}, topic:{}, msgId:{}}", sendResult.getSendStatus(), topic, sendResult.getMsgId());
|
||||
} catch (Exception e) { |
// } catch (Exception e) {
|
||||
log.error("EPDC-EVENTS-SERVER发送消息异常:{topic:{}, tag:{}, keys:{}, body:{}}", topic, tag, keys, body); |
// log.error("EPDC-EVENTS-SERVER发送消息异常:{topic:{}, tag:{}, keys:{}, body:{}}", topic, tag, keys, body);
|
||||
e.printStackTrace(); |
// e.printStackTrace();
|
||||
} |
// }
|
||||
} |
// }
|
||||
} |
//}
|
||||
|
|||||
Loading…
Reference in new issue