Browse Source

党建小助手mq配置

master
syc 3 years ago
parent
commit
ddae26a7ee
  1. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
  2. 5
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
  3. 26
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartyMeetingMessageMQMsg.java
  4. 5
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
  5. 3
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  6. 5
      epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
  7. 99
      epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/PartyMeetingMessageListener.java

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java

@ -98,4 +98,9 @@ public interface ConsomerGroupConstants {
* 创建党员居民信息消费者组将user库的党员信息同步到partymember库的党员表
*/
String CREATE_RESI_PARTYMEMBER_SYNC_GROUP = "create_resi_sync_group";
/**
* 党建小助手监听器分组
*/
String PARTY_MEETING_MESSAGE = "party_meeting_message";
}

5
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java

@ -84,4 +84,9 @@ public interface TopicConstants {
* 居民的党员信息
*/
String PARTYMEMBER_RESI = "partymember_resi";
/**
* 消息-党建小助手
*/
String IC_MESSAGE = "ic_message";
}

26
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/PartyMeetingMessageMQMsg.java

@ -0,0 +1,26 @@
package com.epmet.commons.rocketmq.messages;
import lombok.Data;
import java.io.Serializable;
/**
* 党建小助手发布活动活动到期提醒活动发布提醒推送MQ
* @author sun
*/
@Data
public class PartyMeetingMessageMQMsg implements Serializable {
//客户Id
private String customerId;
//活动Id
private String icPartyActId;
//动作类型 发布活动:publish 提前提醒:remind 提前通知:notify
private String type;
//党组织Id
private String publishOrgId;
//党组织类型 0省委,1市委,2区委,3党工委,4党委,5支部;6党小组
private String publishOrgType;
}

5
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java

@ -170,4 +170,9 @@ public interface SystemMessageType {
*/
String PARTYMEMBER_RESI_IMPORT = "partymember_resi_import";
/**
* 党建小助手消息
*/
String PARTY_MEETING_MESSAGE = "party_meeting_message";
}

3
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -237,6 +237,9 @@ public class SystemMessageServiceImpl implements SystemMessageService {
case SystemMessageType.PARTYMEMBER_RESI_IMPORT:
topic=TopicConstants.PARTYMEMBER_RESI;
break;
case SystemMessageType.PARTY_MEETING_MESSAGE:
topic=TopicConstants.IC_MESSAGE;
break;
default:
logger.error("getTopicByMsgType msgType:{} is not support for any topic", msgType);
}

5
epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java

@ -4,6 +4,8 @@ import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.constant.SystemMessageType;
import com.epmet.mq.listener.PartyMeetingMessageListener;
import com.epmet.mq.listener.ResiPartyMemberSyncListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
@ -15,5 +17,8 @@ public class RocketMQConsumerRegister extends MQAbstractRegister {
public void registerAllListeners(String env, MQConsumerProperties consumerProperties) {
register(consumerProperties, ConsomerGroupConstants.CREATE_RESI_PARTYMEMBER_SYNC_GROUP, MessageModel.CLUSTERING,
TopicConstants.PARTYMEMBER_RESI, "*", new ResiPartyMemberSyncListener());
register(consumerProperties, ConsomerGroupConstants.PARTY_MEETING_MESSAGE, MessageModel.CLUSTERING,
TopicConstants.IC_MESSAGE, SystemMessageType.PARTY_MEETING_MESSAGE, new PartyMeetingMessageListener());
}
}

99
epmet-module/resi-partymember/resi-partymember-server/src/main/java/com/epmet/mq/listener/PartyMeetingMessageListener.java

@ -0,0 +1,99 @@
package com.epmet.mq.listener;
import com.alibaba.fastjson.JSON;
import com.epmet.commons.rocketmq.constants.MQUserPropertys;
import com.epmet.commons.rocketmq.messages.PartyMeetingMessageMQMsg;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.EpmetException;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.redis.RedisKeys;
import com.epmet.commons.tools.redis.RedisUtils;
import com.epmet.commons.tools.utils.SpringContextUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author sun
* @Description 党建小助手发布活动活动提前提醒活动发布提醒通知
*/
public class PartyMeetingMessageListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
private RedisUtils redisUtils;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (redisUtils == null) {
redisUtils = SpringContextUtils.getBean(RedisUtils.class);
}
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
logger.error(ExceptionUtils.getErrorStackTrace(e));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 逐条消费
* @param messageExt
*/
private void consumeMessage(MessageExt messageExt) {
// msg即为消息体
String msg = new String(messageExt.getBody());
logger.info("msg is {}",msg);
String tags = messageExt.getTags();
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
logger.info("【党建小助手消息】-活动消息-收到消息内容:{},操作:{}", msg, tags);
PartyMeetingMessageMQMsg obj = JSON.parseObject(msg, PartyMeetingMessageMQMsg.class);
logger.info("obj is {}",JSON.toJSONString(obj));
DistributedLock distributedLock = null;
RLock lock = null;
try {
distributedLock = SpringContextUtils.getBean(DistributedLock.class);
lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()),
30L, 30L, TimeUnit.SECONDS);
//待执行方法
//SpringContextUtils.getBean(IcVolunteerPolyService.class).volunteerChanged(obj);
} catch (EpmetException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【党建小助手消息】-活动消息MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【党建小助手消息】-活动消息MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
} finally {
assert distributedLock != null;
distributedLock.unLock(lock);
}
if (StringUtils.isNotBlank(pendingMsgLabel)) {
try {
removePendingMqMsgCache(pendingMsgLabel);
} catch (Exception e) {
logger.error("【党建小助手消息】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
}
}
}
private void removePendingMqMsgCache(String pendingMsgLabel) {
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
Boolean rst = redisUtils.delete(key);
}
}
Loading…
Cancel
Save