diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java index f852b2285a..bfaf63703a 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java @@ -3,6 +3,7 @@ package com.epmet.commons.rocketmq.messages; import lombok.Data; +import javax.validation.constraints.NotBlank; import java.io.Serializable; /** @@ -10,6 +11,9 @@ import java.io.Serializable; */ @Data public class CalPartyUnitSatisfactionFormDTO implements Serializable { + public interface AddUserInternalGroup { + } + @NotBlank(message = "客户id不能为空",groups = AddUserInternalGroup.class) private String customerId; private String partyUnitId; } diff --git a/epmet-module/epmet-heart/epmet-heart-server/pom.xml b/epmet-module/epmet-heart/epmet-heart-server/pom.xml index 648896a0c9..05ff205836 100644 --- a/epmet-module/epmet-heart/epmet-heart-server/pom.xml +++ b/epmet-module/epmet-heart/epmet-heart-server/pom.xml @@ -88,6 +88,12 @@ 2.0.0 compile + + + com.epmet + epmet-commons-rocketmq + 2.0.0 + @@ -152,6 +158,10 @@ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.1.140:9876;192.168.1.141:9876 @@ -195,6 +205,10 @@ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + false + 192.168.1.140:9876;192.168.1.141:9876 @@ -238,6 +252,10 @@ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.10.161:9876 @@ -281,6 +299,10 @@ https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 + + + true + 192.168.11.187:9876;192.168.11.184:9876 diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java index ec06c3bf5f..90c500b87d 100644 --- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java +++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java @@ -17,6 +17,7 @@ package com.epmet.controller; +import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO; import com.epmet.commons.tools.annotation.LoginUser; import com.epmet.commons.tools.aop.NoRepeatSubmit; import com.epmet.commons.tools.page.PageData; @@ -150,4 +151,18 @@ public class IcPartyUnitController { public Result importData(@LoginUser TokenDto tokenDto, HttpServletResponse response, @RequestPart("file") MultipartFile file) throws IOException { return icPartyUnitService.importData(tokenDto, response, file); } + + + /** + * 计算区域化党建单位的群众满意度 + * + * @param formDTO + * @return + */ + @PostMapping("cal-partyunit-satisfation") + public Result calPartyUnitSatisfation(@RequestBody CalPartyUnitSatisfactionFormDTO formDTO){ + ValidatorUtils.validateEntity(formDTO,CalPartyUnitSatisfactionFormDTO.AddUserInternalGroup.class); + icPartyUnitService.calPartyUnitSatisfation(formDTO); + return new Result(); + } } \ No newline at end of file diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java new file mode 100644 index 0000000000..3f599eefc0 --- /dev/null +++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java @@ -0,0 +1,31 @@ +package com.epmet.mq; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import com.epmet.commons.rocketmq.register.MQAbstractRegister; +import com.epmet.commons.rocketmq.register.MQConsumerProperties; +import com.epmet.mq.listener.PartyUnitSatisfactionCalEventListener; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.stereotype.Component; + +/** + * @Description 如果rocketmq.enable=true,这里必须实现,且 实例化 + * @author wxz + * @date 2021.07.14 17:13:41 +*/ +@Component +public class RocketMQConsumerRegister extends MQAbstractRegister { + + @Override + public void registerAllListeners(String env, MQConsumerProperties consumerProperties) { + // 客户初始化监听器注册 + register(consumerProperties, + ConsomerGroupConstants.CAL_PARTY_UNIT_SATISFACTION, + MessageModel.CLUSTERING, + TopicConstants.CAL_PARTY_UNIT_SATISFACTION, + "*", + new PartyUnitSatisfactionCalEventListener()); + + // ...其他监听器类似 + } +} diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/listener/PartyUnitSatisfactionCalEventListener.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/listener/PartyUnitSatisfactionCalEventListener.java new file mode 100644 index 0000000000..b7c3abc938 --- /dev/null +++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/listener/PartyUnitSatisfactionCalEventListener.java @@ -0,0 +1,104 @@ +package com.epmet.mq.listener; + +import com.alibaba.fastjson.JSON; +import com.epmet.commons.rocketmq.constants.MQUserPropertys; +import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO; +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import com.epmet.commons.tools.redis.RedisKeys; +import com.epmet.commons.tools.redis.RedisUtils; +import com.epmet.commons.tools.utils.SpringContextUtils; +import com.epmet.service.IcPartyUnitService; +import org.apache.commons.lang.StringUtils; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @Description 计算区域化党建单位,群众满意度=分数相加➗ 需求的总个数。 + * @author wxz + * @date 2021.10.13 15:21:48 +*/ +public class PartyUnitSatisfactionCalEventListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private RedisUtils redisUtils; + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + + if (redisUtils == null) { + redisUtils = SpringContextUtils.getBean(RedisUtils.class); + } + + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + // msg即为消息体 + // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 + String msg = new String(messageExt.getBody()); + String topic = messageExt.getTopic(); + String tags = messageExt.getTags(); + String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL); + + logger.info("【计算区域化党建单位群众满意度事件监听器】-需求完成-收到消息内容:{},操作:{}", msg, tags); + CalPartyUnitSatisfactionFormDTO obj = JSON.parseObject(msg, CalPartyUnitSatisfactionFormDTO.class); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + distributedLock = SpringContextUtils.getBean(DistributedLock.class); + lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()), + 30L, 30L, TimeUnit.SECONDS); + //待执行方法 + SpringContextUtils.getBean(IcPartyUnitService.class).calPartyUnitSatisfation(obj); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【计算区域化党建单位群众满意度事件监听器】-MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【计算区域化党建单位群众满意度监听器】-MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + distributedLock.unLock(lock); + } + + if (StringUtils.isNotBlank(pendingMsgLabel)) { + try { + removePendingMqMsgCache(pendingMsgLabel); + } catch (Exception e) { + logger.error("【计算区域化党建单位群众满意度监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e)); + } + } + } + + /** + * @description + * + * @param pendingMsgLabel + * @return + * @author wxz + * @date 2021.10.14 16:32:32 + */ + private void removePendingMqMsgCache(String pendingMsgLabel) { + String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel); + redisUtils.delete(key); + //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel); + } +} diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java index 8106d724fe..b303cab184 100644 --- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java +++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java @@ -18,6 +18,7 @@ package com.epmet.service; import com.epmet.commons.mybatis.service.BaseService; +import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO; import com.epmet.commons.tools.page.PageData; import com.epmet.commons.tools.security.dto.TokenDto; import com.epmet.commons.tools.utils.Result; @@ -120,4 +121,10 @@ public interface IcPartyUnitService extends BaseService { * @Date 2021/11/29 11:01 */ Result importData(TokenDto tokenDto, HttpServletResponse response, MultipartFile file) throws IOException; + + /** + * 计算区域化党建单位的群众满意度 + * @param formDTO + */ + void calPartyUnitSatisfation(CalPartyUnitSatisfactionFormDTO formDTO); } \ No newline at end of file diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java index 4931389ae0..78625590df 100644 --- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java +++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java @@ -18,9 +18,11 @@ package com.epmet.service.impl; import cn.afterturn.easypoi.excel.entity.result.ExcelImportResult; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO; import com.epmet.commons.tools.constant.FieldConstant; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.constant.StrConstant; @@ -370,6 +372,19 @@ public class IcPartyUnitServiceImpl extends BaseServiceImpl map, String matter) { List matters = Arrays.asList(matter.split(StrConstant.COLON)); List list = matters.stream().map(map::get).collect(Collectors.toList()); diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java index cc318638b6..eec0f484eb 100644 --- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java +++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java @@ -20,6 +20,7 @@ package com.epmet.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.epmet.commons.mybatis.service.impl.BaseServiceImpl; +import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO; import com.epmet.commons.tools.constant.FieldConstant; import com.epmet.commons.tools.constant.NumConstant; import com.epmet.commons.tools.constant.StrConstant; @@ -32,6 +33,7 @@ import com.epmet.commons.tools.page.PageData; import com.epmet.commons.tools.redis.common.CustomerStaffRedis; import com.epmet.commons.tools.utils.ConvertUtils; import com.epmet.commons.tools.utils.Result; +import com.epmet.constant.SystemMessageType; import com.epmet.constant.UserDemandConstant; import com.epmet.dao.IcUserDemandOperateLogDao; import com.epmet.dao.IcUserDemandRecDao; @@ -40,6 +42,7 @@ import com.epmet.dao.IcUserDemandServiceDao; import com.epmet.dto.CustomerGridDTO; import com.epmet.dto.IcUserDemandRecDTO; import com.epmet.dto.form.CustomerGridFormDTO; +import com.epmet.dto.form.SystemMsgFormDTO; import com.epmet.dto.form.demand.*; import com.epmet.dto.result.AllGridsByUserIdResultDTO; import com.epmet.dto.result.UserBaseInfoResultDTO; @@ -47,6 +50,7 @@ import com.epmet.dto.result.demand.DemandRecResultDTO; import com.epmet.dto.result.demand.IcResiUserReportDemandRes; import com.epmet.entity.*; import com.epmet.feign.EpmetAdminOpenFeignClient; +import com.epmet.feign.EpmetMessageOpenFeignClient; import com.epmet.feign.EpmetUserOpenFeignClient; import com.epmet.feign.GovOrgOpenFeignClient; import com.epmet.service.IcResiDemandDictService; @@ -86,6 +90,8 @@ public class IcUserDemandRecServiceImpl extends BaseServiceImpl page(Map params) { @@ -455,6 +461,18 @@ public class IcUserDemandRecServiceImpl extends BaseServiceImpl