9 changed files with 214 additions and 131 deletions
@ -0,0 +1,56 @@ |
|||||
|
/** |
||||
|
* Copyright 2018 人人开源 https://www.renren.io
|
||||
|
* <p> |
||||
|
* This program is free software: you can redistribute it and/or modify |
||||
|
* it under the terms of the GNU General Public License as published by |
||||
|
* the Free Software Foundation, either version 3 of the License, or |
||||
|
* (at your option) any later version. |
||||
|
* <p> |
||||
|
* This program is distributed in the hope that it will be useful, |
||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
* GNU General Public License for more details. |
||||
|
* <p> |
||||
|
* You should have received a copy of the GNU General Public License |
||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
*/ |
||||
|
|
||||
|
package com.epmet.opendata.controller; |
||||
|
|
||||
|
import com.epmet.commons.tools.utils.Result; |
||||
|
import com.epmet.dto.basereport.form.EventInfoFormDTO; |
||||
|
import com.epmet.opendata.service.BaseDisputeProcessService; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.web.bind.annotation.PostMapping; |
||||
|
import org.springframework.web.bind.annotation.RequestBody; |
||||
|
import org.springframework.web.bind.annotation.RequestMapping; |
||||
|
import org.springframework.web.bind.annotation.RestController; |
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 事件信息表 |
||||
|
* |
||||
|
* @author generator generator@elink-cn.com |
||||
|
* @since v1.0.0 2021-10-15 |
||||
|
*/ |
||||
|
@RestController |
||||
|
@RequestMapping("basedisputeprocess") |
||||
|
public class BaseDisputeProcessController { |
||||
|
|
||||
|
@Autowired |
||||
|
private BaseDisputeProcessService baseDisputeProcessService; |
||||
|
|
||||
|
/** |
||||
|
* 获取上报事件 |
||||
|
* @Param formDTO |
||||
|
* @Return {@link Result} |
||||
|
* @Author zhaoqifeng |
||||
|
* @Date 2021/10/15 16:59 |
||||
|
*/ |
||||
|
@PostMapping("eventinfo") |
||||
|
public Result getEventinfo(@RequestBody(required = false) EventInfoFormDTO formDTO) { |
||||
|
baseDisputeProcessService.getEventinfo(formDTO); |
||||
|
return new Result(); |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,108 @@ |
|||||
|
package com.epmet.opendata.mq.listener; |
||||
|
|
||||
|
import com.alibaba.fastjson.JSON; |
||||
|
import com.epmet.commons.rocketmq.constants.MQUserPropertys; |
||||
|
import com.epmet.commons.tools.distributedlock.DistributedLock; |
||||
|
import com.epmet.commons.tools.exception.ExceptionUtils; |
||||
|
import com.epmet.commons.tools.exception.RenException; |
||||
|
import com.epmet.commons.tools.redis.RedisKeys; |
||||
|
import com.epmet.commons.tools.redis.RedisUtils; |
||||
|
import com.epmet.commons.tools.utils.SpringContextUtils; |
||||
|
import com.epmet.dto.basereport.form.EventInfoFormDTO; |
||||
|
import com.epmet.feign.EpmetMessageOpenFeignClient; |
||||
|
import com.epmet.opendata.service.BaseDisputeProcessService; |
||||
|
import org.apache.commons.lang3.StringUtils; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
||||
|
import org.apache.rocketmq.common.message.MessageExt; |
||||
|
import org.redisson.api.RLock; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
|
||||
|
import java.util.List; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
|
||||
|
/** |
||||
|
* @author wxz |
||||
|
* @Description 系统对接中间库,项目信息变更监听器 |
||||
|
* @date 2021.10.13 15:21:48 |
||||
|
*/ |
||||
|
public class OpenDataProjectChangeEventListener implements MessageListenerConcurrently { |
||||
|
|
||||
|
private Logger logger = LoggerFactory.getLogger(getClass()); |
||||
|
|
||||
|
private EpmetMessageOpenFeignClient messageOpenFeignClient; |
||||
|
|
||||
|
private RedisUtils redisUtils; |
||||
|
|
||||
|
@Override |
||||
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { |
||||
|
|
||||
|
if (redisUtils == null) { |
||||
|
redisUtils = SpringContextUtils.getBean(RedisUtils.class); |
||||
|
} |
||||
|
|
||||
|
try { |
||||
|
msgs.forEach(msg -> consumeMessage(msg)); |
||||
|
} catch (Exception e) { |
||||
|
logger.error(ExceptionUtils.getErrorStackTrace(e)); |
||||
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
||||
|
} |
||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
||||
|
} |
||||
|
|
||||
|
private void consumeMessage(MessageExt messageExt) { |
||||
|
// msg即为消息体
|
||||
|
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
|
||||
|
String msg = new String(messageExt.getBody()); |
||||
|
String topic = messageExt.getTopic(); |
||||
|
String tags = messageExt.getTags(); |
||||
|
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); |
||||
|
|
||||
|
//messageExt.propert
|
||||
|
|
||||
|
logger.info("【开放数据事件监听器】-项目信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); |
||||
|
EventInfoFormDTO obj = JSON.parseObject(msg, EventInfoFormDTO.class); |
||||
|
|
||||
|
DistributedLock distributedLock = null; |
||||
|
RLock lock = null; |
||||
|
try { |
||||
|
distributedLock = SpringContextUtils.getBean(DistributedLock.class); |
||||
|
lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()), |
||||
|
30L, 30L, TimeUnit.SECONDS); |
||||
|
SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(obj); |
||||
|
} catch (RenException e) { |
||||
|
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
|
||||
|
logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); |
||||
|
} catch (Exception e) { |
||||
|
// 不是我们自己抛出的异常,可以让MQ重试
|
||||
|
logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); |
||||
|
throw e; |
||||
|
} finally { |
||||
|
distributedLock.unLock(lock); |
||||
|
} |
||||
|
|
||||
|
if (StringUtils.isNotBlank(pendingMsgLabel)) { |
||||
|
try { |
||||
|
removePendingMqMsgCache(pendingMsgLabel); |
||||
|
} catch (Exception e) { |
||||
|
logger.error("【开放数据事件监听器】-project-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* @description |
||||
|
* |
||||
|
* @param pendingMsgLabel |
||||
|
* @return |
||||
|
* @description 应答mq消息 |
||||
|
* @author wxz |
||||
|
* @date 2021.10.14 16:32:32 |
||||
|
*/ |
||||
|
private void removePendingMqMsgCache(String pendingMsgLabel) { |
||||
|
String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); |
||||
|
redisUtils.delete(key); |
||||
|
} |
||||
|
} |
Loading…
Reference in new issue