wangxianzhang 4 years ago
parent
commit
30716d597c
  1. 14
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
  2. 9
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
  3. 8
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java
  4. 8
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java
  5. 8
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

14
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java

@ -4,11 +4,13 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataPatrolChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataProjectChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
@ -19,35 +21,39 @@ import org.springframework.stereotype.Component;
@Component
public class RocketMQConsumerRegister extends MQAbstractRegister {
@Autowired
private DistributedLock distributedLock;
@Override
public void registerAllListeners(String env, MQConsumerProperties consumerProperties) {
// 客户初始化监听器注册
register(consumerProperties,
ConsomerGroupConstants.OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.ORG,
"*",
new OpenDataOrgChangeEventListener());
new OpenDataOrgChangeEventListener(distributedLock));
register(consumerProperties,
ConsomerGroupConstants.OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.STAFF,
"*",
new OpenDataStaffChangeEventListener());
new OpenDataStaffChangeEventListener(distributedLock));
register(consumerProperties,
ConsomerGroupConstants.OPEN_DATA_PATROL_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.PATROL,
"*",
new OpenDataPatrolChangeEventListener());
new OpenDataPatrolChangeEventListener(distributedLock));
register(consumerProperties,
ConsomerGroupConstants.OPEN_DATA_PROJECT_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.PROJECT,
"*",
new OpenDataProjectChangeEventListener());
new OpenDataProjectChangeEventListener(distributedLock));
// ...其他监听器类似
}

9
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java

@ -35,6 +35,13 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
private RedisUtils redisUtils;
private DistributedLock distributedLock;
public OpenDataOrgChangeEventListener(DistributedLock distributedLock) {
this.distributedLock = distributedLock;
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@ -62,10 +69,8 @@ public class OpenDataOrgChangeEventListener implements MessageListenerConcurrent
logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags);
OrgOrStaffMQMsg obj = JSON.parseObject(msg, OrgOrStaffMQMsg.class);
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:open_data_org:%s", obj.getOrgId()),
30L, 30L, TimeUnit.SECONDS);

8
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataPatrolChangeEventListener.java

@ -37,6 +37,12 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
private RedisUtils redisUtils;
private DistributedLock distributedLock;
public OpenDataPatrolChangeEventListener(DistributedLock distributedLock) {
this.distributedLock = distributedLock;
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@ -67,10 +73,8 @@ public class OpenDataPatrolChangeEventListener implements MessageListenerConcurr
log.warn("consumeMessage msg body is blank");
return;
}
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:open_data_patrol:%s", msgObj.getPatrolId()),
30L, 30L, TimeUnit.SECONDS);
UpsertPatrolRecordForm patrolRecordForm = new UpsertPatrolRecordForm();

8
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java

@ -38,6 +38,12 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur
private RedisUtils redisUtils;
private DistributedLock distributedLock;
public OpenDataProjectChangeEventListener(DistributedLock distributedLock) {
this.distributedLock = distributedLock;
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@ -68,10 +74,8 @@ public class OpenDataProjectChangeEventListener implements MessageListenerConcur
DisputeProcessMQMsg obj = JSON.parseObject(msg, DisputeProcessMQMsg.class);
EventInfoFormDTO formDTO = ConvertUtils.sourceToTarget(obj, EventInfoFormDTO.class);
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()),
30L, 30L, TimeUnit.SECONDS);
SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(formDTO);

8
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

@ -38,6 +38,12 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
private RedisUtils redisUtils;
private DistributedLock distributedLock;
public OpenDataStaffChangeEventListener(DistributedLock distributedLock) {
this.distributedLock = distributedLock;
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@ -67,10 +73,8 @@ public class OpenDataStaffChangeEventListener implements MessageListenerConcurre
logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}, blockedMsgLabel:{}", msg, tags, pendingMsgLabel);
OrgOrStaffMQMsg obj = JSON.parseObject(msg, OrgOrStaffMQMsg.class);
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", obj.getOrgId()),
30L, 30L, TimeUnit.SECONDS);

Loading…
Cancel
Save