From 932932ea48ca149ca5229f20cc7673a4cd3fc1e0 Mon Sep 17 00:00:00 2001 From: wxz Date: Wed, 13 Oct 2021 17:35:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9A=201.epmet-data-open?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=20=E4=BF=AE=E6=94=B9=EF=BC=9A=201.epmet-mess?= =?UTF-8?q?age=E6=9C=8D=E5=8A=A1=EF=BC=8C=E5=A2=9E=E5=8A=A0=E9=92=88?= =?UTF-8?q?=E5=AF=B9=E5=B7=A5=E4=BD=9C=E4=BA=BA=E5=91=98/=E6=9C=BA?= =?UTF-8?q?=E6=9E=84=E7=9A=84=E5=88=9B=E5=BB=BA/=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E9=80=82=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/ConsomerGroupConstants.java | 10 + .../rocketmq/constants/TopicConstants.java | 11 + .../com/epmet/constant/SystemMessageType.java | 40 +++ .../impl/SystemMessageServiceImpl.java | 12 + .../open-data-worker-client/pom.xml | 15 + .../open-data-worker-server/pom.xml | 258 ++++++++++++++++++ .../epmet/opendata/OpenDataApplication.java | 26 ++ .../NacosServiceListListenerRegisterer.java | 161 +++++++++++ .../opendata/mq/RocketMQConsumerRegister.java | 39 +++ .../OpenDataOrgChangeEventListener.java | 61 +++++ .../OpenDataStaffChangeEventListener.java | 61 +++++ .../src/main/resources/bootstrap.yml | 138 ++++++++++ .../src/main/resources/init_db.sql | 14 + .../src/main/resources/logback-spring.xml | 176 ++++++++++++ epmet-module/open-data-worker/pom.xml | 18 ++ epmet-module/pom.xml | 3 +- 16 files changed, 1042 insertions(+), 1 deletion(-) create mode 100644 epmet-module/open-data-worker/open-data-worker-client/pom.xml create mode 100644 epmet-module/open-data-worker/open-data-worker-server/pom.xml create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql create mode 100644 epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml create mode 100644 epmet-module/open-data-worker/pom.xml diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java index 890d19acf1..b4dba548b7 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java @@ -45,4 +45,14 @@ public interface ConsomerGroupConstants { */ String POINT_OPERATION_LOG_GROUP = "point_operation_log_group"; + /** + * 开放的对接数据(中间库) 组织变更事件监听器分组 + */ + String OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP = "open_data_org_change_event_listener_group"; + + /** + * 开放的对接数据(中间库) 工作人员变更事件监听器分组 + */ + String OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP = "open_data_staff_change_event_listener_group"; + } diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java index 8fe36aa53a..927fa19198 100644 --- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java +++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java @@ -13,6 +13,7 @@ public interface TopicConstants { * 项目变动 */ String PROJECT_CHANGED = "project_changed"; + /** * 小组成就 */ @@ -27,4 +28,14 @@ public interface TopicConstants { * 积分系统话题 */ String POINT = "point"; + + /** + * 组织信息变更 + */ + String ORG = "org"; + + /** + * 工作人员信息变更 + */ + String STAFF = "staff"; } diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java index 626b081436..e1c59f003e 100644 --- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java +++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java @@ -25,4 +25,44 @@ public interface SystemMessageType { */ String POINT_RULE_CHANGED = "point_rule_changed"; + /** + * 网格新增 + */ + String GRID_CREATE = "grid_create"; + + /** + * 网格信息变更 + */ + String GRID_CHANGE = "grid_change"; + + /** + * agency机构新增 + */ + String AGENCY_CREATE = "agency_create"; + + /** + * agency机构信息变更 + */ + String AGENCY_CHANGE = "agency_change"; + + /** + * 部门新增 + */ + String DEPARTMENT_CREATE = "department_create"; + + /** + * 部门信息变更 + */ + String DEPARTMENT_CHANGE = "department_change"; + + /** + * 工作人员新增 + */ + String STAFF_CREATE = "staff_create"; + + /** + * 工作人员变更 + */ + String STAFF_CHANGE = "staff_change"; + } diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java index 9c56520419..f7aacf297c 100644 --- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java +++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java @@ -79,6 +79,18 @@ public class SystemMessageServiceImpl implements SystemMessageService { case SystemMessageType.POINT_RULE_CHANGED: topic = TopicConstants.POINT; break; + case SystemMessageType.GRID_CREATE: + case SystemMessageType.GRID_CHANGE: + case SystemMessageType.AGENCY_CREATE: + case SystemMessageType.AGENCY_CHANGE: + case SystemMessageType.DEPARTMENT_CREATE: + case SystemMessageType.DEPARTMENT_CHANGE: + topic = TopicConstants.ORG; + break; + case SystemMessageType.STAFF_CREATE: + case SystemMessageType.STAFF_CHANGE: + topic = TopicConstants.STAFF; + break; } return topic; } diff --git a/epmet-module/open-data-worker/open-data-worker-client/pom.xml b/epmet-module/open-data-worker/open-data-worker-client/pom.xml new file mode 100644 index 0000000000..14be115052 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-client/pom.xml @@ -0,0 +1,15 @@ + + + + open-data-worker + com.epmet + 2.0.0 + + 4.0.0 + + open-data-worker-client + + + \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/pom.xml b/epmet-module/open-data-worker/open-data-worker-server/pom.xml new file mode 100644 index 0000000000..289b030dc7 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/pom.xml @@ -0,0 +1,258 @@ + + + + open-data-worker + com.epmet + 2.0.0 + + 4.0.0 + + open-data-worker-server + + + + com.epmet + epmet-commons-tools + 2.0.0 + + + com.epmet + epmet-commons-mybatis + 2.0.0 + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework + spring-context-support + + + org.springframework.boot + spring-boot-starter-actuator + + + de.codecentric + spring-boot-admin-starter-client + ${spring.boot.admin.version} + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + io.github.openfeign + feign-httpclient + 10.3.0 + + + + + com.epmet + epmet-commons-rocketmq + 2.0.0 + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + true + + + + ${project.basedir}/src/main/java + + + true + ${basedir}/src/main/resources + + + + + + + dev + + 8117 + dev + + + + + + epmet_open_data_user + EpmEt-db-UsEr + + 0 + 192.168.1.140 + 6379 + 123456 + + true + 192.168.1.140:8848 + 1fecc730-5e6e-464c-aae9-7567944e7936 + + + false + + + true + + + false + + + https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 + SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.1.140:9876;192.168.1.141:9876 + + + + local + + true + + + 8117 + local + + + + + + epmet_open_data_user + EpmEt-db-UsEr + + 0 + 192.168.1.140 + 6379 + 123456 + + false + 192.168.1.140:8848 + 1fecc730-5e6e-464c-aae9-7567944e7936 + + + false + + + false + + + false + + + https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 + SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + false + 192.168.1.140:9876;192.168.1.141:9876 + + + + test + + + 8117 + test + + + + + + epmet + elink@833066 + + 0 + r-m5eoz5b6tkx09y6bpz.redis.rds.aliyuncs.com + 6379 + EpmEtrEdIs!q@w + + true + 192.168.10.150:8848 + 67e3c350-533e-4d7c-9f8f-faf1b4aa82ae + + + false + + + true + + + true + + + https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4 + SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd + + + true + 192.168.10.161:9876 + + + + prod + + 8117 + prod + + + + + + epmet_open_data_user + EpmEt-db-UsEr + + 0 + r-m5ez3n1j0qc3ykq2ut.redis.rds.aliyuncs.com + 6379 + EpmEtclOUdrEdIs!Q2w + + true + 192.168.11.180:8848 + bd205d23-e696-47be-b995-916313f86e99 + + + false + + + true + + + true + + + https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c + SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1 + + + true + 192.168.11.187:9876;192.168.11.184:9876 + + + + + \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java new file mode 100644 index 0000000000..bb8ff976ac --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java @@ -0,0 +1,26 @@ +package com.epmet.opendata; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.web.servlet.ServletComponentScan; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.cloud.openfeign.EnableFeignClients; +import org.springframework.context.annotation.ComponentScan; + +/** + * @Description OpenData服务启动类 + * 注意:此处要使用@ComponentScan的原因是,启动类处于包com.epmet.opendata下,想用com.epmet.commons包下的一些Spring对象,需要 + * 手动指定扫描包,否则无法扫描 + * @author wxz + * @date 2021.10.13 15:16:05 +*/ +@SpringBootApplication +@EnableDiscoveryClient +@EnableFeignClients +@ServletComponentScan +@ComponentScan(value = { "com.epmet.opendata", "com.epmet.commons" }) +public class OpenDataApplication { + public static void main(String[] args) { + SpringApplication.run(OpenDataApplication.class, args); + } +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java new file mode 100644 index 0000000000..a25220cbfb --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java @@ -0,0 +1,161 @@ +package com.epmet.opendata.config; + +import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; +import com.alibaba.nacos.api.naming.pojo.ListView; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.netflix.loadbalancer.DynamicServerListLoadBalancer; +import com.netflix.loadbalancer.ILoadBalancer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.netflix.ribbon.SpringClientFactory; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * @author wxz + * @Description Nacos服务列表刷新监听注册器 + * @date 2021.09.22 14:33:11 + */ +@Slf4j +@Configuration +@ConditionalOnProperty(prefix = "spring.cloud.nacos.discovery.serviceListChangedListening", name = "enable", havingValue = "true", matchIfMissing = false) +public class NacosServiceListListenerRegisterer { + + public static final String REFRESH_SERVER_LIST_METHOD_NAME = "restOfInit"; + // 服务列表拉取间隔:10s + public static final long SERVICE_LIST_PULLING_DELAY_SECONDS = 10; + + private NamingService namingService; + + private ScheduledExecutorService executor; + + @Autowired + private NacosDiscoveryProperties discoveryProperties; + + @Autowired + private SpringClientFactory springClientFactory; + + // 监听中的服务列表 + private List observingServers = new ArrayList<>(33); + + @PostConstruct + public void init() { + namingService = discoveryProperties.namingServiceInstance(); + // 启动监听 + executor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("NacosServiceListWatchingRegisterer"); + return thread; + } + }); + + // 立即启动,并15s刷新一次服务列表,用于新服务列表的发现 + ScheduledFuture future = executor.scheduleAtFixedRate(new EpmetNacosServiceListListener(), 0, SERVICE_LIST_PULLING_DELAY_SECONDS, TimeUnit.SECONDS); + } + + public class EpmetNacosServiceListListener implements Runnable { + + @Override + public void run() { + doRefreshServerList(); + } + + /** + * @param + * @return + * @description 执行刷新 + * @author wxz + * @date 2021.09.22 16:04:49 + */ + private synchronized void doRefreshServerList() { + ListView serviceListView = null; + try { + serviceListView = namingService.getServicesOfServer(1, 100); + //启动监听 + if (serviceListView == null || serviceListView.getCount() == 0) { + log.info("【Nacos服务列表定时刷新】当前无任何可添加监听的服务"); + return; + } + List serviceList = serviceListView.getData(); + log.info("【Nacos服务列表定时刷新】Nacos服务端服务列表: {}", serviceList); + + for (String service : serviceList) { + try { + + // 如果该服务已经在监听列表中存在了,则不再注册监听。注:不能取消空服务的监听,因为空服务随时可能恢复运行,需要实时监听。 + if (observingServers.contains(service)) { + continue; + } + + namingService.subscribe(service, new EventListener() { + @Override + public void onEvent(Event event) { + if (event instanceof NamingEvent) { + NamingEvent namingEvent = (NamingEvent) event; + log.info("【Nacos服务列表刷新监听】收到事件:{}:[{}]", namingEvent.getServiceName(), namingEvent.getInstances()); + doRefreshServerList(service); + } + } + }); + + // 将该服务加入到监听列表中 + observingServers.add(service); + } catch (NacosException e) { + String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); + log.error("【Nacos服务列表定时刷新】订阅ApplicationContext的刷新事件失败,错误信息:{}", errorStackTrace); + } + } + + } catch (NacosException e) { + String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); + log.error("【Nacos服务列表定时刷新】链接Nacos服务端失败,错误信息:{}", errorStackTrace); + } + } + + /** + * @param serviceName + * @return + * @description 刷新ServerList + * @author wxz + * @date 2021.09.22 09:29:16 + */ + private void doRefreshServerList(String serviceName) { + // 刷新方式1:反射调用DynamicServerListLoadBalancer中的restOfInit()方法。该方法原来只执行一次,此处不推荐用 + //ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName); + //if (loadBalancer instanceof ZoneAwareLoadBalancer) { + // ZoneAwareLoadBalancer zaLoadBalancer = (ZoneAwareLoadBalancer) loadBalancer; + // IClientConfig clientConfig = springClientFactory.getClientConfig(serviceName); + // try { + // Method restOfInitMethod = zaLoadBalancer.getClass().getSuperclass().getDeclaredMethod(REFRESH_SERVER_LIST_METHOD_NAME, IClientConfig.class); + // restOfInitMethod.setAccessible(true); + // restOfInitMethod.invoke(zaLoadBalancer, clientConfig); + // } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + // String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); + // log.error("【LoadBalancer刷新服务列表】失败:{}", errorStackTrace); + // } + //} + + // 刷新方式2:DynamicServerListLoadBalancer#updateListOfServers()该方法为ribbon定时刷新服务列表的时候真正调用的方法,但是加了@VisibleForTesting + // 暂且 1 try + ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName); + if (loadBalancer instanceof DynamicServerListLoadBalancer) { + DynamicServerListLoadBalancer dslb = (DynamicServerListLoadBalancer) loadBalancer; + dslb.updateListOfServers(); + } + } + } + +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java new file mode 100644 index 0000000000..ede4eeb83a --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java @@ -0,0 +1,39 @@ +package com.epmet.opendata.mq; + +import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants; +import com.epmet.commons.rocketmq.constants.TopicConstants; +import com.epmet.commons.rocketmq.register.MQAbstractRegister; +import com.epmet.commons.rocketmq.register.MQConsumerProperties; +import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener; +import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.springframework.stereotype.Component; + +/** + * @Description 如果rocketmq.enable=true,这里必须实现,且 实例化 + * @author wxz + * @date 2021.07.14 17:13:41 +*/ +@Component +public class RocketMQConsumerRegister extends MQAbstractRegister { + + @Override + public void registerAllListeners(String env, MQConsumerProperties consumerProperties) { + // 客户初始化监听器注册 + register(consumerProperties, + ConsomerGroupConstants.OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP, + MessageModel.CLUSTERING, + TopicConstants.ORG, + "*", + new OpenDataOrgChangeEventListener()); + + register(consumerProperties, + ConsomerGroupConstants.OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP, + MessageModel.CLUSTERING, + TopicConstants.STAFF, + "*", + new OpenDataStaffChangeEventListener()); + + // ...其他监听器类似 + } +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java new file mode 100644 index 0000000000..17cc9eca72 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java @@ -0,0 +1,61 @@ +package com.epmet.opendata.mq.listener; + +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * @Description 系统对接中间库,组织信息变更监听器 + * @author wxz + * @date 2021.10.13 15:21:48 +*/ +public class OpenDataOrgChangeEventListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + // msg即为消息体 + // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 + String msg = new String(messageExt.getBody()); + String tags = messageExt.getTags(); + + logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + //distributedLock = SpringContextUtils.getBean(DistributedLock.class); + //lock = distributedLock.getLock(String.format("lock:open_data_org:%s", orgId), + // 30L, 30L, TimeUnit.SECONDS); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【开放数据事件监听器】-组织信息变更-同步信息到中间库失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【开放数据事件监听器】-组织信息变更-同步信息到中间库失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + //distributedLock.unLock(lock); + } + } +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java new file mode 100644 index 0000000000..b8f0be40bd --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java @@ -0,0 +1,61 @@ +package com.epmet.opendata.mq.listener; + +import com.epmet.commons.tools.distributedlock.DistributedLock; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.epmet.commons.tools.exception.RenException; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.redisson.api.RLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * @Description 系统对接中间库,工作人员信息变更监听器 + * @author wxz + * @date 2021.10.13 15:21:48 +*/ +public class OpenDataStaffChangeEventListener implements MessageListenerConcurrently { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + try { + msgs.forEach(msg -> consumeMessage(msg)); + } catch (Exception e) { + logger.error(ExceptionUtils.getErrorStackTrace(e)); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + private void consumeMessage(MessageExt messageExt) { + // msg即为消息体 + // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可 + String msg = new String(messageExt.getBody()); + String tags = messageExt.getTags(); + + logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}", msg, tags); + + DistributedLock distributedLock = null; + RLock lock = null; + try { + //distributedLock = SpringContextUtils.getBean(DistributedLock.class); + //lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId), + // 30L, 30L, TimeUnit.SECONDS); + } catch (RenException e) { + // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试 + logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + } catch (Exception e) { + // 不是我们自己抛出的异常,可以让MQ重试 + logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e))); + throw e; + } finally { + //distributedLock.unLock(lock); + } + } +} diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml new file mode 100644 index 0000000000..59302e27eb --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml @@ -0,0 +1,138 @@ +server: + port: @server.port@ + version: @version@ + servlet: + context-path: /opendata + +spring: + main: + allow-bean-definition-overriding: true + application: + name: open-data-server + #环境 dev|test|prod + profiles: + active: @spring.profiles.active@ + jackson: + time-zone: GMT+8 + date-format: yyyy-MM-dd HH:mm:ss + redis: + database: @spring.redis.index@ + host: @spring.redis.host@ + port: @spring.redis.port@ + password: @spring.redis.password@ + timeout: 30s + datasource: + druid: + #MySQL + driver-class-name: com.mysql.cj.jdbc.Driver + url: @spring.datasource.druid.url@ + username: @spring.datasource.druid.username@ + password: @spring.datasource.druid.password@ + cloud: + nacos: + discovery: + server-addr: @nacos.server-addr@ + #nacos的命名空间ID,默认是public + namespace: @nacos.discovery.namespace@ + #不把自己注册到注册中心的地址 + register-enabled: @nacos.register-enabled@ + ip: @nacos.ip@ + serviceListChangedListening: + enable: @nacos.service-list-changed-listening.enable@ + config: + enabled: @nacos.config-enabled@ + server-addr: @nacos.server-addr@ + namespace: @nacos.config.namespace@ + group: @nacos.config.group@ + file-extension: yaml + #指定共享配置,且支持动态刷新 +# ext-config: +# - data-id: datasource.yaml +# group: ${spring.cloud.nacos.config.group} +# refresh: true +# - data-id: common.yaml +# group: ${spring.cloud.nacos.config.group} +# refresh: true + + # 数据迁移工具flyway + flyway: + enabled: @spring.flyway.enabled@ + locations: classpath:db/migration + url: @spring.datasource.druid.url@ + user: @spring.datasource.druid.username@ + password: @spring.datasource.druid.password@ + baseline-on-migrate: true + baseline-version: 0 + + +management: + endpoints: + web: + exposure: + include: "*" + endpoint: + health: + show-details: ALWAYS + +mybatis-plus: + mapper-locations: classpath:/mapper/**/*.xml + #实体扫描,多个package用逗号或者分号分隔 + typeAliasesPackage: com.epmet.entity + global-config: + #数据库相关配置 + db-config: + #主键类型 AUTO:"数据库ID自增", INPUT:"用户输入ID", ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID"; + id-type: ID_WORKER + #字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断" + field-strategy: NOT_NULL + #驼峰下划线转换 + column-underline: true + banner: false + #原生配置 + configuration: + map-underscore-to-camel-case: true + cache-enabled: false + call-setters-on-nulls: true + jdbc-type-for-null: 'null' + +feign: + hystrix: + enabled: true + client: + config: + default: + loggerLevel: BASIC + okhttp: + enabled: true + +hystrix: + command: + default: + execution: + isolation: + thread: + timeoutInMilliseconds: 60000 #缺省为1000 + +ribbon: + ReadTimeout: 300000 + ConnectTimeout: 300000 + +#pageHelper分页插件 +pagehelper: + helper-dialect: mysql + reasonable: false #分页合理化配置,例如输入页码为-1,则自动转化为最小页码1 + +#feign 日志需要该配置 +logging: + level: + com.epmet: debug + +dingTalk: + robot: + webHook: @dingTalk.robot.webHook@ + secret: @dingTalk.robot.secret@ + +rocketmq: + # 是否开启mq + enable: @rocketmq.enable@ + name-server: @rocketmq.nameserver@ \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql new file mode 100644 index 0000000000..ac7d9c109c --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql @@ -0,0 +1,14 @@ +create database epmet_open_data default character set utf8mb4 collate utf8mb4_general_ci; + +-- 开发环境的脚本 +CREATE USER epmet_open_data_user@'%' IDENTIFIED BY 'EpmEt-db-UsEr'; +GRANT ALL ON `epmet_open_data`.* TO 'epmet_open_data_user'@'%'; +flush privileges; + + + +-- 测试环境的脚本 + + + +-- 生产环境的数据 \ No newline at end of file diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml new file mode 100644 index 0000000000..bcbff37445 --- /dev/null +++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml @@ -0,0 +1,176 @@ + + + + + + + + + + + + + ${appname} + + + + + + + + + debug + + + ${CONSOLE_LOG_PATTERN} + + UTF-8 + + + + + + + + ${log.path}/debug.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + + ${log.path}/debug-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 15 + + + + debug + ACCEPT + DENY + + + + + + + ${log.path}/info.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + + ${log.path}/info-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 15 + + + + info + ACCEPT + DENY + + + + + + + ${log.path}/warn.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + ${log.path}/warn-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 15 + + + + warn + ACCEPT + DENY + + + + + + + ${log.path}/error.log + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n + UTF-8 + + + + ${log.path}/error-%d{yyyy-MM-dd}.%i.log + + 100MB + + + 15 + + + + ERROR + ACCEPT + DENY + ${webHook} + ${secret} + ${appname} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/epmet-module/open-data-worker/pom.xml b/epmet-module/open-data-worker/pom.xml new file mode 100644 index 0000000000..97fe762b08 --- /dev/null +++ b/epmet-module/open-data-worker/pom.xml @@ -0,0 +1,18 @@ + + + + epmet-module + com.epmet + 2.0.0 + + 4.0.0 + + open-data-worker + pom + + open-data-worker-client + open-data-worker-server + + \ No newline at end of file diff --git a/epmet-module/pom.xml b/epmet-module/pom.xml index 56b1452eec..8b9efdfdb8 100644 --- a/epmet-module/pom.xml +++ b/epmet-module/pom.xml @@ -44,6 +44,7 @@ epmet-point epmet-ext data-aggregator - + open-data-worker +