Browse Source
Conflicts: epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.javadev_shibei_match
45 changed files with 806 additions and 258 deletions
@ -0,0 +1,28 @@ |
|||
package com.epmet.commons.rocketmq.messages; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
|
|||
import java.io.Serializable; |
|||
import java.util.Date; |
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 组织、网格、人员中间库数据上报MQ |
|||
* @author sun |
|||
*/ |
|||
@Data |
|||
@AllArgsConstructor |
|||
public class OrgOrStaffMQMsg implements Serializable { |
|||
|
|||
//客户Id
|
|||
private String customerId; |
|||
//组织、网格、人员Id
|
|||
private String orgId; |
|||
//数据类型【组织:agency 网格:grid 人员:staff】
|
|||
private String orgType; |
|||
//操作类型【新增:add 修改删除:edit】
|
|||
private String type; |
|||
|
|||
|
|||
} |
@ -0,0 +1,23 @@ |
|||
package com.epmet.service; |
|||
|
|||
public interface SystemMessageScannerService { |
|||
/** |
|||
* @description 扫描未成功发送到MQ的消息 |
|||
* |
|||
* @param |
|||
* @return |
|||
* @author wxz |
|||
* @date 2021.10.16 23:24:05 |
|||
*/ |
|||
void scanPenddingSystemMQMessage(); |
|||
|
|||
/** |
|||
* @description 扫描发送成功但是未正常处理的MQ的消息 |
|||
* |
|||
* @param |
|||
* @return |
|||
* @author wxz |
|||
* @date 2021.10.16 23:24:27 |
|||
*/ |
|||
void scanBlockedSystemMQMessage(); |
|||
} |
@ -0,0 +1,22 @@ |
|||
package com.epmet.service; |
|||
|
|||
import com.epmet.feign.EpmetMessageOpenFeignClient; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
@Service |
|||
public class SystemMessageScannerServiceImpl implements SystemMessageScannerService { |
|||
|
|||
@Autowired |
|||
private EpmetMessageOpenFeignClient messageOpenFeignClient; |
|||
|
|||
@Override |
|||
public void scanPenddingSystemMQMessage() { |
|||
messageOpenFeignClient.penddingMqMsgScan(); |
|||
} |
|||
|
|||
@Override |
|||
public void scanBlockedSystemMQMessage() { |
|||
messageOpenFeignClient.blockedMqMsgScan(); |
|||
} |
|||
} |
@ -0,0 +1,25 @@ |
|||
package com.epmet.task; |
|||
|
|||
import com.epmet.service.SystemMessageScannerService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Component("blockedMQSystemMessageScanner") |
|||
@Slf4j |
|||
public class BlockedMQSystemMessageScanner implements ITask { |
|||
|
|||
@Autowired |
|||
private SystemMessageScannerService systemMessageScannerService; |
|||
|
|||
@Override |
|||
public void run(String params) { |
|||
try { |
|||
//log.info("【blockedMQSystemMessageScanner】开始执行scanBlockedSystemMQMessage任务");
|
|||
systemMessageScannerService.scanBlockedSystemMQMessage(); |
|||
log.info("【blockedMQSystemMessageScanner】执行scanBlockedSystemMQMessage任务完成"); |
|||
} catch (Exception e) { |
|||
log.error("【blockedMQSystemMessageScanner】执行scanBlockedSystemMQMessage任务失败"); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,25 @@ |
|||
package com.epmet.task; |
|||
|
|||
import com.epmet.service.SystemMessageScannerService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Component("penddingMQSystemMessageScanner") |
|||
@Slf4j |
|||
public class PenddingMQSystemMessageScanner implements ITask { |
|||
|
|||
@Autowired |
|||
private SystemMessageScannerService systemMessageScannerService; |
|||
|
|||
@Override |
|||
public void run(String params) { |
|||
try { |
|||
//log.info("【blockedMQSystemMessageScanner】开始执行scanBlockedSystemMQMessage任务");
|
|||
systemMessageScannerService.scanPenddingSystemMQMessage(); |
|||
log.info("【blockedMQSystemMessageScanner】执行scanPenddingSystemMQMessage任务完成"); |
|||
} catch (Exception e) { |
|||
log.error("【blockedMQSystemMessageScanner】执行scanPenddingSystemMQMessage任务失败"); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,35 @@ |
|||
/** |
|||
* Copyright 2018 人人开源 https://www.renren.io
|
|||
* <p> |
|||
* This program is free software: you can redistribute it and/or modify |
|||
* it under the terms of the GNU General Public License as published by |
|||
* the Free Software Foundation, either version 3 of the License, or |
|||
* (at your option) any later version. |
|||
* <p> |
|||
* This program is distributed in the hope that it will be useful, |
|||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
* GNU General Public License for more details. |
|||
* <p> |
|||
* You should have received a copy of the GNU General Public License |
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
|
|||
package com.epmet.dao; |
|||
|
|||
import com.epmet.commons.mybatis.dao.BaseDao; |
|||
import com.epmet.entity.SystemMessagePenddingEntity; |
|||
import org.apache.ibatis.annotations.Mapper; |
|||
import org.apache.ibatis.annotations.Param; |
|||
|
|||
/** |
|||
* 系统消息表 |
|||
* |
|||
* @author generator generator@elink-cn.com |
|||
* @since v1.0.0 2021-10-16 |
|||
*/ |
|||
@Mapper |
|||
public interface SystemMessagePenddingDao extends BaseDao<SystemMessagePenddingEntity> { |
|||
|
|||
void physicalDeleteById(@Param("penddingId") String penddingId); |
|||
} |
@ -0,0 +1,61 @@ |
|||
/** |
|||
* Copyright 2018 人人开源 https://www.renren.io
|
|||
* <p> |
|||
* This program is free software: you can redistribute it and/or modify |
|||
* it under the terms of the GNU General Public License as published by |
|||
* the Free Software Foundation, either version 3 of the License, or |
|||
* (at your option) any later version. |
|||
* <p> |
|||
* This program is distributed in the hope that it will be useful, |
|||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
* GNU General Public License for more details. |
|||
* <p> |
|||
* You should have received a copy of the GNU General Public License |
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
|
|||
package com.epmet.entity; |
|||
|
|||
import com.baomidou.mybatisplus.annotation.TableName; |
|||
|
|||
import com.epmet.commons.mybatis.entity.BaseEpmetEntity; |
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
|
|||
import java.util.Date; |
|||
|
|||
/** |
|||
* 系统消息表 |
|||
* |
|||
* @author generator generator@elink-cn.com |
|||
* @since v1.0.0 2021-10-16 |
|||
*/ |
|||
@Data |
|||
@EqualsAndHashCode(callSuper=false) |
|||
@TableName("system_message_pendding") |
|||
public class SystemMessagePenddingEntity extends BaseEpmetEntity { |
|||
|
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
/** |
|||
* 消息类型。init_customer:客户初始化,login登录,logout退出 |
|||
*/ |
|||
private String msgType; |
|||
|
|||
/** |
|||
* 消息主表id |
|||
*/ |
|||
private String msgId; |
|||
|
|||
/** |
|||
* 消息发送途径 |
|||
*/ |
|||
private String sendApproach; |
|||
|
|||
/** |
|||
* 消息内容 |
|||
*/ |
|||
private String content; |
|||
|
|||
} |
@ -0,0 +1,14 @@ |
|||
CREATE TABLE `system_message_pendding` ( |
|||
`ID` varchar(64) NOT NULL COMMENT '主键', |
|||
`MSG_ID` varchar(64) NOT NULL COMMENT '消息主表id', |
|||
`MSG_TYPE` varchar(32) NOT NULL COMMENT '消息类型。init_customer:客户初始化,login登录,logout退出', |
|||
`SEND_APPROACH` varchar(32) NOT NULL COMMENT '消息发送途径', |
|||
`CONTENT` varchar(1024) NOT NULL COMMENT '消息内容', |
|||
`REVISION` int(11) DEFAULT NULL COMMENT '乐观锁', |
|||
`CREATED_BY` varchar(32) NOT NULL COMMENT '创建人(发布消息的人)', |
|||
`CREATED_TIME` datetime NOT NULL COMMENT '创建时间', |
|||
`UPDATED_BY` varchar(32) NOT NULL COMMENT '更新人', |
|||
`UPDATED_TIME` datetime NOT NULL COMMENT '更新时间', |
|||
`DEL_FLAG` varchar(1) NOT NULL COMMENT '删除标记 0:未删除,1:已删除', |
|||
PRIMARY KEY (`ID`) USING BTREE |
|||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT COMMENT='系统消息滞留表' |
@ -0,0 +1,26 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
|||
|
|||
<mapper namespace="com.epmet.dao.SystemMessagePenddingDao"> |
|||
|
|||
<resultMap type="com.epmet.entity.SystemMessagePenddingEntity" id="systemMessagePenddingMap"> |
|||
<result property="id" column="ID"/> |
|||
<result property="msgId" column="MSG_ID"/> |
|||
<result property="msgType" column="MSG_TYPE"/> |
|||
<result property="sendApproach" column="SEND_APPROACH"/> |
|||
<result property="content" column="CONTENT"/> |
|||
<result property="revision" column="REVISION"/> |
|||
<result property="createdBy" column="CREATED_BY"/> |
|||
<result property="createdTime" column="CREATED_TIME"/> |
|||
<result property="updatedBy" column="UPDATED_BY"/> |
|||
<result property="updatedTime" column="UPDATED_TIME"/> |
|||
<result property="delFlag" column="DEL_FLAG"/> |
|||
</resultMap> |
|||
|
|||
<!--物理删除消息堆积--> |
|||
<delete id="physicalDeleteById"> |
|||
delete from system_message_pendding where ID = #{penddingId} |
|||
</delete> |
|||
|
|||
|
|||
</mapper> |
@ -0,0 +1,56 @@ |
|||
/** |
|||
* Copyright 2018 人人开源 https://www.renren.io
|
|||
* <p> |
|||
* This program is free software: you can redistribute it and/or modify |
|||
* it under the terms of the GNU General Public License as published by |
|||
* the Free Software Foundation, either version 3 of the License, or |
|||
* (at your option) any later version. |
|||
* <p> |
|||
* This program is distributed in the hope that it will be useful, |
|||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
* GNU General Public License for more details. |
|||
* <p> |
|||
* You should have received a copy of the GNU General Public License |
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
|
|||
package com.epmet.opendata.controller; |
|||
|
|||
import com.epmet.commons.tools.utils.Result; |
|||
import com.epmet.dto.basereport.form.EventInfoFormDTO; |
|||
import com.epmet.opendata.service.BaseDisputeProcessService; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.web.bind.annotation.PostMapping; |
|||
import org.springframework.web.bind.annotation.RequestBody; |
|||
import org.springframework.web.bind.annotation.RequestMapping; |
|||
import org.springframework.web.bind.annotation.RestController; |
|||
|
|||
|
|||
/** |
|||
* 事件信息表 |
|||
* |
|||
* @author generator generator@elink-cn.com |
|||
* @since v1.0.0 2021-10-15 |
|||
*/ |
|||
@RestController |
|||
@RequestMapping("basedisputeprocess") |
|||
public class BaseDisputeProcessController { |
|||
|
|||
@Autowired |
|||
private BaseDisputeProcessService baseDisputeProcessService; |
|||
|
|||
/** |
|||
* 获取上报事件 |
|||
* @Param formDTO |
|||
* @Return {@link Result} |
|||
* @Author zhaoqifeng |
|||
* @Date 2021/10/15 16:59 |
|||
*/ |
|||
@PostMapping("eventinfo") |
|||
public Result getEventinfo(@RequestBody(required = false) EventInfoFormDTO formDTO) { |
|||
baseDisputeProcessService.getEventinfo(formDTO); |
|||
return new Result(); |
|||
} |
|||
|
|||
} |
@ -0,0 +1,50 @@ |
|||
/** |
|||
* Copyright 2018 人人开源 https://www.renren.io
|
|||
* <p> |
|||
* This program is free software: you can redistribute it and/or modify |
|||
* it under the terms of the GNU General Public License as published by |
|||
* the Free Software Foundation, either version 3 of the License, or |
|||
* (at your option) any later version. |
|||
* <p> |
|||
* This program is distributed in the hope that it will be useful, |
|||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
* GNU General Public License for more details. |
|||
* <p> |
|||
* You should have received a copy of the GNU General Public License |
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
|
|||
package com.epmet.opendata.controller; |
|||
|
|||
import com.epmet.commons.tools.utils.Result; |
|||
import com.epmet.opendata.dto.form.UpsertPatrolRecordForm; |
|||
import com.epmet.opendata.service.UserPatrolRecordService; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.web.bind.annotation.*; |
|||
|
|||
|
|||
/** |
|||
* 中间库数据初始化controller |
|||
* |
|||
* @author generator generator@elink-cn.com |
|||
* @since v1.0.0 2021-10-14 |
|||
*/ |
|||
@RestController |
|||
@RequestMapping("init") |
|||
public class InitDataController { |
|||
|
|||
@Autowired |
|||
private UserPatrolRecordService userPatrolRecordService; |
|||
|
|||
/** |
|||
* @Author sun |
|||
* @Description 网格员信息中间库同步 |
|||
**/ |
|||
@PostMapping("patrol/{customerId}") |
|||
public Result<Boolean> getStaffBaseInfo(@PathVariable String customerId) { |
|||
UpsertPatrolRecordForm formDTO = new UpsertPatrolRecordForm(); |
|||
formDTO.setCustomerId(customerId); |
|||
return new Result().ok(userPatrolRecordService.insertPatrolRecord(formDTO)); |
|||
} |
|||
} |
@ -0,0 +1,108 @@ |
|||
package com.epmet.opendata.mq.listener; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.epmet.commons.rocketmq.constants.MQUserPropertys; |
|||
import com.epmet.commons.tools.distributedlock.DistributedLock; |
|||
import com.epmet.commons.tools.exception.ExceptionUtils; |
|||
import com.epmet.commons.tools.exception.RenException; |
|||
import com.epmet.commons.tools.redis.RedisKeys; |
|||
import com.epmet.commons.tools.redis.RedisUtils; |
|||
import com.epmet.commons.tools.utils.SpringContextUtils; |
|||
import com.epmet.dto.basereport.form.EventInfoFormDTO; |
|||
import com.epmet.feign.EpmetMessageOpenFeignClient; |
|||
import com.epmet.opendata.service.BaseDisputeProcessService; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
|||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
|||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
|||
import org.apache.rocketmq.common.message.MessageExt; |
|||
import org.redisson.api.RLock; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
|
|||
import java.util.List; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
/** |
|||
* @author wxz |
|||
* @Description 系统对接中间库,项目信息变更监听器 |
|||
* @date 2021.10.13 15:21:48 |
|||
*/ |
|||
public class OpenDataProjectChangeEventListener implements MessageListenerConcurrently { |
|||
|
|||
private Logger logger = LoggerFactory.getLogger(getClass()); |
|||
|
|||
private EpmetMessageOpenFeignClient messageOpenFeignClient; |
|||
|
|||
private RedisUtils redisUtils; |
|||
|
|||
@Override |
|||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { |
|||
|
|||
if (redisUtils == null) { |
|||
redisUtils = SpringContextUtils.getBean(RedisUtils.class); |
|||
} |
|||
|
|||
try { |
|||
msgs.forEach(msg -> consumeMessage(msg)); |
|||
} catch (Exception e) { |
|||
logger.error(ExceptionUtils.getErrorStackTrace(e)); |
|||
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
|||
} |
|||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|||
} |
|||
|
|||
private void consumeMessage(MessageExt messageExt) { |
|||
// msg即为消息体
|
|||
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
|
|||
String msg = new String(messageExt.getBody()); |
|||
String topic = messageExt.getTopic(); |
|||
String tags = messageExt.getTags(); |
|||
String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); |
|||
|
|||
//messageExt.propert
|
|||
|
|||
logger.info("【开放数据事件监听器】-项目信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); |
|||
EventInfoFormDTO obj = JSON.parseObject(msg, EventInfoFormDTO.class); |
|||
|
|||
DistributedLock distributedLock = null; |
|||
RLock lock = null; |
|||
try { |
|||
distributedLock = SpringContextUtils.getBean(DistributedLock.class); |
|||
lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()), |
|||
30L, 30L, TimeUnit.SECONDS); |
|||
SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(obj); |
|||
} catch (RenException e) { |
|||
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
|
|||
logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); |
|||
} catch (Exception e) { |
|||
// 不是我们自己抛出的异常,可以让MQ重试
|
|||
logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); |
|||
throw e; |
|||
} finally { |
|||
distributedLock.unLock(lock); |
|||
} |
|||
|
|||
if (StringUtils.isNotBlank(pendingMsgLabel)) { |
|||
try { |
|||
removePendingMqMsgCache(pendingMsgLabel); |
|||
} catch (Exception e) { |
|||
logger.error("【开放数据事件监听器】-project-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* @description |
|||
* |
|||
* @param pendingMsgLabel |
|||
* @return |
|||
* @description 应答mq消息 |
|||
* @author wxz |
|||
* @date 2021.10.14 16:32:32 |
|||
*/ |
|||
private void removePendingMqMsgCache(String pendingMsgLabel) { |
|||
String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); |
|||
redisUtils.delete(key); |
|||
} |
|||
} |
Loading…
Reference in new issue