diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java index f901f58270..dbbbe4bbf0 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/basereport/form/EventInfoFormDTO.java @@ -14,4 +14,8 @@ public class EventInfoFormDTO implements Serializable { private static final long serialVersionUID = 8479649048108914555L; private String customerId; private String projectId; + /** + * 操作类型【新增:add 修改删除:edit】 + */ + private String type; } diff --git a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java index cf8282a98b..180ac24fe5 100644 --- a/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java +++ b/epmet-module/data-statistical/data-statistical-client/src/main/java/com/epmet/dto/screen/ScreenProjectDataDTO.java @@ -196,9 +196,5 @@ public class ScreenProjectDataDTO implements Serializable { * fact_origin_project_main_daily.grid_id对应的网格名称 */ private String tempGridName; - /** - * 分类编码,存在多个,以英文逗号隔开,注意与all_cagtegory_name顺序一致; - */ - private String categoryCode; } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java index 9625df2ef3..b53feaf6f6 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/entity/evaluationindex/screen/ScreenProjectDataEntity.java @@ -160,13 +160,4 @@ public class ScreenProjectDataEntity extends BaseEpmetEntity { */ private BigDecimal satisfactionScore; - /** - * 来源:议题issue 项目立项:agency 事件:resi_event; - */ - private String origin; - - /** - * 分类编码,存在多个,以英文逗号隔开,注意与all_cagtegory_name顺序一致; - */ - private String categoryCode; } diff --git a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java index c1895ff7c9..249eb11c81 100644 --- a/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java +++ b/epmet-module/data-statistical/data-statistical-server/src/main/java/com/epmet/service/impl/DataReportingServiceImpl.java @@ -186,6 +186,7 @@ public class DataReportingServiceImpl implements DataReportingService { private BaseDisputeProcessDTO getBaseDisputeProcessDTO(ScreenProjectDataDTO project) { BaseDisputeProcessDTO dto = new BaseDisputeProcessDTO(); + dto.setId(project.getProjectId()); dto.setCustomerId(project.getCustomerId()); dto.setEventName(project.getProjectTitle()); String categoryCode = project.getCategoryCode().split(StrConstant.COMMA)[0]; diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java new file mode 100644 index 0000000000..038756ea9c --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/controller/BaseDisputeProcessController.java @@ -0,0 +1,56 @@ +/** + * Copyright 2018 人人开源 https://www.renren.io + *

+ * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + *

+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + *

+ * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.epmet.opendata.controller; + +import com.epmet.commons.tools.utils.Result; +import com.epmet.dto.basereport.form.EventInfoFormDTO; +import com.epmet.opendata.service.BaseDisputeProcessService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + + +/** + * 事件信息表 + * + * @author generator generator@elink-cn.com + * @since v1.0.0 2021-10-15 + */ +@RestController +@RequestMapping("basedisputeprocess") +public class BaseDisputeProcessController { + + @Autowired + private BaseDisputeProcessService baseDisputeProcessService; + + /** + * 获取上报事件 + * @Param formDTO + * @Return {@link Result} + * @Author zhaoqifeng + * @Date 2021/10/15 16:59 + */ + @PostMapping("eventinfo") + public Result getEventinfo(@RequestBody(required = false) EventInfoFormDTO formDTO) { + baseDisputeProcessService.getEventinfo(formDTO); + return new Result(); + } + +} \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java index 0a966910bd..2ffed8c3dd 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java @@ -6,6 +6,7 @@ import com.epmet.commons.rocketmq.register.MQAbstractRegister; import com.epmet.commons.rocketmq.register.MQConsumerProperties; import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataPatrolChangeEventListener; +import com.epmet.opendata.mq.listener.OpenDataProjectChangeEventListener; import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; @@ -41,6 +42,12 @@ public class RocketMQConsumerRegister extends MQAbstractRegister { TopicConstants.PATROL, "*", new OpenDataPatrolChangeEventListener()); + register(consumerProperties, + ConsomerGroupConstants.PROJECT_CHANGED_COMPONENTS_GROUP, + MessageModel.CLUSTERING, + TopicConstants.PROJECT_CHANGED, + "*", + new OpenDataProjectChangeEventListener()); // ...其他监听器类似 } diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java new file mode 100644 index 0000000000..819c1975ce --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataProjectChangeEventListener.java @@ -0,0 +1,108 @@ +package com.epmet.opendata.mq.listener; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.dto.basereport.form.EventInfoFormDTO; +import com.epmet.feign.EpmetMessageOpenFeignClient; +import com.epmet.opendata.service.BaseDisputeProcessService; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author wxz + * @Description 系统对接中间库,项目信息变更监听器 + * @date 2021.10.13 15:21:48 + */ +public class OpenDataProjectChangeEventListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private EpmetMessageOpenFeignClient messageOpenFeignClient; + + private RedisUtils redisUtils; + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + // msg即为消息体 + // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 + String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); + String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.PENDING_MSG_LABEL); + + //messageExt.propert + + logger.info("【开放数据事件监听器】-项目信息变更-收到消息内容:{}, 操作:{}, pendingMsgLabel:{}", msg, tags, pendingMsgLabel); + EventInfoFormDTO obj = JSON.parseObject(msg, EventInfoFormDTO.class); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:open_data_project:%s", obj.getProjectId()), + 30L, 30L, TimeUnit.SECONDS); + SpringContextUtils.getBean(BaseDisputeProcessService.class).getEventinfo(obj); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【开放数据事件监听器】-项目信息变更-上报项目信息失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + distributedLock.unLock(lock); + } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【开放数据事件监听器】-project-删除mq滞留消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @description 应答mq消息 + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.pendingMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + } +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java index 874996c00b..20d2516dab 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/BaseDisputeProcessService.java @@ -18,13 +18,9 @@ package com.epmet.opendata.service; import com.epmet.commons.mybatis.service.BaseService; -import com.epmet.commons.tools.page.PageData; -import com.epmet.opendata.dto.BaseDisputeProcessDTO; +import com.epmet.dto.basereport.form.EventInfoFormDTO; import com.epmet.opendata.entity.BaseDisputeProcessEntity; -import java.util.List; -import java.util.Map; - /** * 事件信息表 * @@ -34,62 +30,11 @@ import java.util.Map; public interface BaseDisputeProcessService extends BaseService { /** - * 默认分页 - * - * @param params - * @return PageData - * @author generator - * @date 2021-10-15 - */ - PageData page(Map params); - - /** - * 默认查询 - * - * @param params - * @return java.util.List - * @author generator - * @date 2021-10-15 - */ - List list(Map params); - - /** - * 单条查询 - * - * @param id - * @return BaseDisputeProcessDTO - * @author generator - * @date 2021-10-15 - */ - BaseDisputeProcessDTO get(String id); - - /** - * 默认保存 - * - * @param dto - * @return void - * @author generator - * @date 2021-10-15 - */ - void save(BaseDisputeProcessDTO dto); - - /** - * 默认更新 - * - * @param dto - * @return void - * @author generator - * @date 2021-10-15 - */ - void update(BaseDisputeProcessDTO dto); - - /** - * 批量删除 - * - * @param ids - * @return void - * @author generator - * @date 2021-10-15 + * 获取上报事件 + * @Param formDTO + * @Return + * @Author zhaoqifeng + * @Date 2021/10/15 16:59 */ - void delete(String[] ids); + void getEventinfo(EventInfoFormDTO formDTO); } \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java index ade833cb71..0a0d18a9f0 100644 --- a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/service/impl/BaseDisputeProcessServiceImpl.java @@ -17,23 +17,21 @@ package com.epmet.opendata.service.impl; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.baomidou.mybatisplus.core.metadata.IPage; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; -import com.epmet.commons.tools.constant.FieldConstant; -import com.epmet.commons.tools.page.PageData; +import com.epmet.commons.tools.exception.RenException; import com.epmet.commons.tools.utils.ConvertUtils; +import com.epmet.commons.tools.utils.Result; +import com.epmet.dto.basereport.form.EventInfoFormDTO; +import com.epmet.feign.DataStatisticalOpenFeignClient; import com.epmet.opendata.dao.BaseDisputeProcessDao; import com.epmet.opendata.dto.BaseDisputeProcessDTO; import com.epmet.opendata.entity.BaseDisputeProcessEntity; import com.epmet.opendata.service.BaseDisputeProcessService; -import org.apache.commons.lang3.StringUtils; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import java.util.Arrays; +import javax.annotation.Resource; import java.util.List; -import java.util.Map; /** * 事件信息表 @@ -43,57 +41,34 @@ import java.util.Map; */ @Service public class BaseDisputeProcessServiceImpl extends BaseServiceImpl implements BaseDisputeProcessService { + @Resource + private DataStatisticalOpenFeignClient dataStatisticalOpenFeignClient; + /** + * 获取上报事件 + * + * @param formDTO + * @Param formDTO + * @Return + * @Author zhaoqifeng + * @Date 2021/10/15 16:59 + */ @Override - public PageData page(Map params) { - IPage page = baseDao.selectPage( - getPage(params, FieldConstant.CREATED_TIME, false), - getWrapper(params) - ); - return getPageData(page, BaseDisputeProcessDTO.class); - } - - @Override - public List list(Map params) { - List entityList = baseDao.selectList(getWrapper(params)); - - return ConvertUtils.sourceToTarget(entityList, BaseDisputeProcessDTO.class); - } + public void getEventinfo(EventInfoFormDTO formDTO) { + Result> result = dataStatisticalOpenFeignClient.getEventInfo(formDTO); + if (!result.success()) { + throw new RenException(result.getInternalMsg()); + } + List list = result.getData(); + if (CollectionUtils.isNotEmpty(list)) { + List entityList = ConvertUtils.sourceToTarget(list, BaseDisputeProcessEntity.class); + if("add".equals(formDTO.getType())){ + insertBatch(entityList); + }else { + updateBatchById(entityList); + } + } - private QueryWrapper getWrapper(Map params){ - String id = (String)params.get(FieldConstant.ID_HUMP); - - QueryWrapper wrapper = new QueryWrapper<>(); - wrapper.eq(StringUtils.isNotBlank(id), FieldConstant.ID, id); - - return wrapper; - } - - @Override - public BaseDisputeProcessDTO get(String id) { - BaseDisputeProcessEntity entity = baseDao.selectById(id); - return ConvertUtils.sourceToTarget(entity, BaseDisputeProcessDTO.class); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public void save(BaseDisputeProcessDTO dto) { - BaseDisputeProcessEntity entity = ConvertUtils.sourceToTarget(dto, BaseDisputeProcessEntity.class); - insert(entity); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public void update(BaseDisputeProcessDTO dto) { - BaseDisputeProcessEntity entity = ConvertUtils.sourceToTarget(dto, BaseDisputeProcessEntity.class); - updateById(entity); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public void delete(String[] ids) { - // 逻辑删除(@TableLogic 注解) - baseDao.deleteBatchIds(Arrays.asList(ids)); } } \ No newline at end of file