From eb4e4df5e0d200adac13b822f21ce60f37e02c08 Mon Sep 17 00:00:00 2001
From: yinzuomei <576302893@qq.com>
Date: Mon, 13 Dec 2021 08:35:22 +0800
Subject: [PATCH] =?UTF-8?q?=E8=AE=A1=E7=AE=97=E7=BE=A4=E4=BC=97=E6=BB=A1?=
 =?UTF-8?q?=E6=84=8F=E5=BA=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 .../CalPartyUnitSatisfactionFormDTO.java      |   4 +
 .../epmet-heart/epmet-heart-server/pom.xml    |  22 ++++
 .../controller/IcPartyUnitController.java     |  15 +++
 .../epmet/mq/RocketMQConsumerRegister.java    |  31 ++++++
 ...PartyUnitSatisfactionCalEventListener.java | 104 ++++++++++++++++++
 .../com/epmet/service/IcPartyUnitService.java |   7 ++
 .../service/impl/IcPartyUnitServiceImpl.java  |  15 +++
 .../impl/IcUserDemandRecServiceImpl.java      |  18 +++
 .../src/main/resources/bootstrap.yml          |   5 +-
 9 files changed, 220 insertions(+), 1 deletion(-)
 create mode 100644 epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
 create mode 100644 epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/listener/PartyUnitSatisfactionCalEventListener.java
diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java
index f852b2285a..bfaf63703a 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/messages/CalPartyUnitSatisfactionFormDTO.java
@@ -3,6 +3,7 @@ package com.epmet.commons.rocketmq.messages;
 
 import lombok.Data;
 
+import javax.validation.constraints.NotBlank;
 import java.io.Serializable;
 
 /**
@@ -10,6 +11,9 @@ import java.io.Serializable;
  */
 @Data
 public class CalPartyUnitSatisfactionFormDTO implements Serializable {
+    public interface AddUserInternalGroup {
+    }
+    @NotBlank(message = "客户id不能为空",groups = AddUserInternalGroup.class)
     private String customerId;
     private String partyUnitId;
 }
diff --git a/epmet-module/epmet-heart/epmet-heart-server/pom.xml b/epmet-module/epmet-heart/epmet-heart-server/pom.xml
index 648896a0c9..05ff205836 100644
--- a/epmet-module/epmet-heart/epmet-heart-server/pom.xml
+++ b/epmet-module/epmet-heart/epmet-heart-server/pom.xml
@@ -88,6 +88,12 @@
             2.0.0
             compile
         
+        
+        
+            com.epmet
+            epmet-commons-rocketmq
+            2.0.0
+        
     
 
     
@@ -152,6 +158,10 @@
                 
                 https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4
                 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
+
+                
+                true
+                192.168.1.140:9876;192.168.1.141:9876
             
         
         
@@ -195,6 +205,10 @@
                 
                 https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4
                 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
+
+                
+                false
+                192.168.1.140:9876;192.168.1.141:9876
             
         
         
@@ -238,6 +252,10 @@
                 
                 https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4
                 SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
+
+                
+                true
+                192.168.10.161:9876
             
         
         
@@ -281,6 +299,10 @@
                 
                 https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c
                 SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1
+
+                
+                true
+                192.168.11.187:9876;192.168.11.184:9876
             
         
     
diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java
index ec06c3bf5f..90c500b87d 100644
--- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java
+++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/controller/IcPartyUnitController.java
@@ -17,6 +17,7 @@
 
 package com.epmet.controller;
 
+import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO;
 import com.epmet.commons.tools.annotation.LoginUser;
 import com.epmet.commons.tools.aop.NoRepeatSubmit;
 import com.epmet.commons.tools.page.PageData;
@@ -150,4 +151,18 @@ public class IcPartyUnitController {
     public Result importData(@LoginUser TokenDto tokenDto, HttpServletResponse response, @RequestPart("file") MultipartFile file) throws IOException {
         return icPartyUnitService.importData(tokenDto, response, file);
     }
+
+
+    /**
+     * 计算区域化党建单位的群众满意度
+     *
+     * @param formDTO
+     * @return
+     */
+    @PostMapping("cal-partyunit-satisfation")
+    public Result calPartyUnitSatisfation(@RequestBody CalPartyUnitSatisfactionFormDTO formDTO){
+        ValidatorUtils.validateEntity(formDTO,CalPartyUnitSatisfactionFormDTO.AddUserInternalGroup.class);
+        icPartyUnitService.calPartyUnitSatisfation(formDTO);
+        return new Result();
+    }
 }
\ No newline at end of file
diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
new file mode 100644
index 0000000000..3f599eefc0
--- /dev/null
+++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/RocketMQConsumerRegister.java
@@ -0,0 +1,31 @@
+package com.epmet.mq;
+
+import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
+import com.epmet.commons.rocketmq.constants.TopicConstants;
+import com.epmet.commons.rocketmq.register.MQAbstractRegister;
+import com.epmet.commons.rocketmq.register.MQConsumerProperties;
+import com.epmet.mq.listener.PartyUnitSatisfactionCalEventListener;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description 如果rocketmq.enable=true,这里必须实现,且 实例化
+ * @author wxz
+ * @date 2021.07.14 17:13:41
+*/
+@Component
+public class RocketMQConsumerRegister extends MQAbstractRegister {
+
+    @Override
+    public void registerAllListeners(String env, MQConsumerProperties consumerProperties) {
+        // 客户初始化监听器注册
+        register(consumerProperties,
+                ConsomerGroupConstants.CAL_PARTY_UNIT_SATISFACTION,
+                MessageModel.CLUSTERING,
+                TopicConstants.CAL_PARTY_UNIT_SATISFACTION,
+                "*",
+                new PartyUnitSatisfactionCalEventListener());
+
+        // ...其他监听器类似
+    }
+}
diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/listener/PartyUnitSatisfactionCalEventListener.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/listener/PartyUnitSatisfactionCalEventListener.java
new file mode 100644
index 0000000000..b7c3abc938
--- /dev/null
+++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/mq/listener/PartyUnitSatisfactionCalEventListener.java
@@ -0,0 +1,104 @@
+package com.epmet.mq.listener;
+
+import com.alibaba.fastjson.JSON;
+import com.epmet.commons.rocketmq.constants.MQUserPropertys;
+import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO;
+import com.epmet.commons.tools.distributedlock.DistributedLock;
+import com.epmet.commons.tools.exception.ExceptionUtils;
+import com.epmet.commons.tools.exception.RenException;
+import com.epmet.commons.tools.redis.RedisKeys;
+import com.epmet.commons.tools.redis.RedisUtils;
+import com.epmet.commons.tools.utils.SpringContextUtils;
+import com.epmet.service.IcPartyUnitService;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.redisson.api.RLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Description 计算区域化党建单位,群众满意度=分数相加➗ 需求的总个数。
+ * @author wxz
+ * @date 2021.10.13 15:21:48
+*/
+public class PartyUnitSatisfactionCalEventListener implements MessageListenerConcurrently {
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    private RedisUtils redisUtils;
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+
+        if (redisUtils == null) {
+            redisUtils = SpringContextUtils.getBean(RedisUtils.class);
+        }
+
+        try {
+            msgs.forEach(msg -> consumeMessage(msg));
+        } catch (Exception e) {
+            logger.error(ExceptionUtils.getErrorStackTrace(e));
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+
+    private void consumeMessage(MessageExt messageExt) {
+        // msg即为消息体
+        // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
+        String msg = new String(messageExt.getBody());
+        String topic = messageExt.getTopic();
+        String tags = messageExt.getTags();
+        String pendingMsgLabel = messageExt.getUserProperty(MQUserPropertys.BLOCKED_MSG_LABEL);
+
+        logger.info("【计算区域化党建单位群众满意度事件监听器】-需求完成-收到消息内容:{},操作:{}", msg, tags);
+        CalPartyUnitSatisfactionFormDTO obj = JSON.parseObject(msg, CalPartyUnitSatisfactionFormDTO.class);
+
+        DistributedLock distributedLock = null;
+        RLock lock = null;
+        try {
+            distributedLock = SpringContextUtils.getBean(DistributedLock.class);
+            lock = distributedLock.getLock(String.format("lock:ic_warn_stats:%s", obj.getCustomerId()),
+                    30L, 30L, TimeUnit.SECONDS);
+            //待执行方法
+            SpringContextUtils.getBean(IcPartyUnitService.class).calPartyUnitSatisfation(obj);
+        } catch (RenException e) {
+            // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
+            logger.error("【计算区域化党建单位群众满意度事件监听器】-MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+        } catch (Exception e) {
+            // 不是我们自己抛出的异常,可以让MQ重试
+            logger.error("【计算区域化党建单位群众满意度监听器】-MQ失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+            throw e;
+        } finally {
+            distributedLock.unLock(lock);
+        }
+
+        if (StringUtils.isNotBlank(pendingMsgLabel)) {
+            try {
+                removePendingMqMsgCache(pendingMsgLabel);
+            } catch (Exception e) {
+                logger.error("【计算区域化党建单位群众满意度监听器】-删除mq阻塞消息缓存失败:{}", ExceptionUtils.getErrorStackTrace(e));
+            }
+        }
+    }
+
+    /**
+     * @description
+     *
+     * @param pendingMsgLabel
+     * @return
+     * @author wxz
+     * @date 2021.10.14 16:32:32
+     */
+    private void removePendingMqMsgCache(String pendingMsgLabel) {
+        String key = RedisKeys.blockedMqMsgKey(pendingMsgLabel);
+        redisUtils.delete(key);
+        //logger.info("【开放数据事件监听器】删除mq阻塞消息缓存成功,blockedMsgLabel:{}", pendingMsgLabel);
+    }
+}
diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java
index 8106d724fe..b303cab184 100644
--- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java
+++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/IcPartyUnitService.java
@@ -18,6 +18,7 @@
 package com.epmet.service;
 
 import com.epmet.commons.mybatis.service.BaseService;
+import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO;
 import com.epmet.commons.tools.page.PageData;
 import com.epmet.commons.tools.security.dto.TokenDto;
 import com.epmet.commons.tools.utils.Result;
@@ -120,4 +121,10 @@ public interface IcPartyUnitService extends BaseService {
      * @Date 2021/11/29 11:01
      */
     Result importData(TokenDto tokenDto, HttpServletResponse response, MultipartFile file) throws IOException;
+
+    /**
+     * 计算区域化党建单位的群众满意度
+     * @param formDTO
+     */
+    void calPartyUnitSatisfation(CalPartyUnitSatisfactionFormDTO formDTO);
 }
\ No newline at end of file
diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java
index 4931389ae0..78625590df 100644
--- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java
+++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcPartyUnitServiceImpl.java
@@ -18,9 +18,11 @@
 package com.epmet.service.impl;
 
 import cn.afterturn.easypoi.excel.entity.result.ExcelImportResult;
+import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
+import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO;
 import com.epmet.commons.tools.constant.FieldConstant;
 import com.epmet.commons.tools.constant.NumConstant;
 import com.epmet.commons.tools.constant.StrConstant;
@@ -370,6 +372,19 @@ public class IcPartyUnitServiceImpl extends BaseServiceImpl map, String matter) {
         List matters = Arrays.asList(matter.split(StrConstant.COLON));
         List list = matters.stream().map(map::get).collect(Collectors.toList());
diff --git a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java
index cc318638b6..eec0f484eb 100644
--- a/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java
+++ b/epmet-module/epmet-heart/epmet-heart-server/src/main/java/com/epmet/service/impl/IcUserDemandRecServiceImpl.java
@@ -20,6 +20,7 @@ package com.epmet.service.impl;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.epmet.commons.mybatis.service.impl.BaseServiceImpl;
+import com.epmet.commons.rocketmq.messages.CalPartyUnitSatisfactionFormDTO;
 import com.epmet.commons.tools.constant.FieldConstant;
 import com.epmet.commons.tools.constant.NumConstant;
 import com.epmet.commons.tools.constant.StrConstant;
@@ -32,6 +33,7 @@ import com.epmet.commons.tools.page.PageData;
 import com.epmet.commons.tools.redis.common.CustomerStaffRedis;
 import com.epmet.commons.tools.utils.ConvertUtils;
 import com.epmet.commons.tools.utils.Result;
+import com.epmet.constant.SystemMessageType;
 import com.epmet.constant.UserDemandConstant;
 import com.epmet.dao.IcUserDemandOperateLogDao;
 import com.epmet.dao.IcUserDemandRecDao;
@@ -40,6 +42,7 @@ import com.epmet.dao.IcUserDemandServiceDao;
 import com.epmet.dto.CustomerGridDTO;
 import com.epmet.dto.IcUserDemandRecDTO;
 import com.epmet.dto.form.CustomerGridFormDTO;
+import com.epmet.dto.form.SystemMsgFormDTO;
 import com.epmet.dto.form.demand.*;
 import com.epmet.dto.result.AllGridsByUserIdResultDTO;
 import com.epmet.dto.result.UserBaseInfoResultDTO;
@@ -47,6 +50,7 @@ import com.epmet.dto.result.demand.DemandRecResultDTO;
 import com.epmet.dto.result.demand.IcResiUserReportDemandRes;
 import com.epmet.entity.*;
 import com.epmet.feign.EpmetAdminOpenFeignClient;
+import com.epmet.feign.EpmetMessageOpenFeignClient;
 import com.epmet.feign.EpmetUserOpenFeignClient;
 import com.epmet.feign.GovOrgOpenFeignClient;
 import com.epmet.service.IcResiDemandDictService;
@@ -86,6 +90,8 @@ public class IcUserDemandRecServiceImpl extends BaseServiceImpl page(Map params) {
@@ -455,6 +461,18 @@ public class IcUserDemandRecServiceImpl extends BaseServiceImpl