Browse Source

巡查记录发送消息 到mq

dev_shibei_match
jianjun 4 years ago
parent
commit
e977206c5e
  1. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
  2. 2
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
  3. 10
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
  4. 61
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java

@ -55,4 +55,9 @@ public interface ConsomerGroupConstants {
*/ */
String OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP = "open_data_staff_change_event_listener_group"; String OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP = "open_data_staff_change_event_listener_group";
/**
* 开放的对接数据(中间库) 巡查记录变更事件监听器分组
*/
String OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP = "open_data_patrol_change_event_listener_group";
} }

2
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java

@ -40,7 +40,7 @@ public interface TopicConstants {
String STAFF = "staff"; String STAFF = "staff";
/** /**
* 巡查 * 巡查记录
*/ */
String PATROL = "patrol"; String PATROL = "patrol";
} }

10
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java

@ -4,11 +4,10 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants; import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties; import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.feign.EpmetMessageOpenFeignClient;
import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataPatrolChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@ -36,6 +35,13 @@ public class RocketMQConsumerRegister extends MQAbstractRegister {
"*", "*",
new OpenDataStaffChangeEventListener()); new OpenDataStaffChangeEventListener());
register(consumerProperties,
ConsomerGroupConstants.OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.PATROL,
"*",
new OpenDataPatrolChangeEventListener());
// ...其他监听器类似 // ...其他监听器类似
} }
} }

61
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java

@ -0,0 +1,61 @@
package com.epmet.opendata.mq.listener;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @Description 系统对接中间库巡查信息变更监听器
* @author wxz
* @date 2021.10.13 15:21:48
*/
public class OpenDataPatrolChangeEventListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
logger.error(ExceptionUtils.getErrorStackTrace(e));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(MessageExt messageExt) {
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String tags = messageExt.getTags();
logger.info("【开放数据事件监听器】-巡查记录信息变更-收到消息内容:{}, 操作:{}", msg, tags);
DistributedLock distributedLock = null;
RLock lock = null;
try {
//distributedLock = SpringContextUtils.getBean(DistributedLock.class);
//lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId),
// 30L, 30L, TimeUnit.SECONDS);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-巡查记录信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-巡查记录信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
} finally {
//distributedLock.unLock(lock);
}
}
}
Loading…
Cancel
Save