From 2f76f1fd15ce2788d071467b1a3e67b461412424 Mon Sep 17 00:00:00 2001 From: zhangyuan Date: Fri, 5 Aug 2022 13:32:20 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=95=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/epmet/config/ModuleConfigImpl.java | 24 +++ .../NacosServiceListListenerRegisterer.java | 161 ++++++++++++++++++ 2 files changed, 185 insertions(+) create mode 100644 epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/ModuleConfigImpl.java create mode 100644 epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/NacosServiceListListenerRegisterer.java diff --git a/epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/ModuleConfigImpl.java b/epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/ModuleConfigImpl.java new file mode 100644 index 0000000..cfbfb4d --- /dev/null +++ b/epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/ModuleConfigImpl.java @@ -0,0 +1,24 @@ +/** + * Copyright (c) 2018 人人开源 All rights reserved. + *

+ * https://www.renren.io + *

+ * 版权所有,侵权必究! + */ + +package com.epmet.config; + +import com.epmet.commons.tools.config.ModuleConfig; +import org.springframework.stereotype.Service; + +/** + * @author Mark sunlightcs@gmail.com + * @since 1.0.0 + */ +@Service +public class ModuleConfigImpl implements ModuleConfig { + @Override + public String getName() { + return "pli-power-base"; + } +} diff --git a/epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/NacosServiceListListenerRegisterer.java b/epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/NacosServiceListListenerRegisterer.java new file mode 100644 index 0000000..da054b5 --- /dev/null +++ b/epmet-plugins-module/pli-power-base/pli-power-base-server/src/main/java/com/epmet/config/NacosServiceListListenerRegisterer.java @@ -0,0 +1,161 @@ +package com.epmet.config; + +import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; +import com.alibaba.nacos.api.naming.pojo.ListView; +import com.epmet.commons.tools.exception.ExceptionUtils; +import com.netflix.loadbalancer.DynamicServerListLoadBalancer; +import com.netflix.loadbalancer.ILoadBalancer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.netflix.ribbon.SpringClientFactory; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * @author wxz + * @Description Nacos服务列表刷新监听注册器 + * @date 2021.09.22 14:33:11 + */ +@Slf4j +@Configuration +@ConditionalOnProperty(prefix = "spring.cloud.nacos.discovery.serviceListChangedListening", name = "enable", havingValue = "true", matchIfMissing = false) +public class NacosServiceListListenerRegisterer { + + public static final String REFRESH_SERVER_LIST_METHOD_NAME = "restOfInit"; + // 服务列表拉取间隔:10s + public static final long SERVICE_LIST_PULLING_DELAY_SECONDS = 10; + + private NamingService namingService; + + private ScheduledExecutorService executor; + + @Autowired + private NacosDiscoveryProperties discoveryProperties; + + @Autowired + private SpringClientFactory springClientFactory; + + // 监听中的服务列表 + private List observingServers = new ArrayList<>(33); + + @PostConstruct + public void init() { + namingService = discoveryProperties.namingServiceInstance(); + // 启动监听 + executor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("NacosServiceListWatchingRegisterer"); + return thread; + } + }); + + // 立即启动,并15s刷新一次服务列表,用于新服务列表的发现 + ScheduledFuture future = executor.scheduleAtFixedRate(new EpmetNacosServiceListListener(), 0, SERVICE_LIST_PULLING_DELAY_SECONDS, TimeUnit.SECONDS); + } + + public class EpmetNacosServiceListListener implements Runnable { + + @Override + public void run() { + doRefreshServerList(); + } + + /** + * @param + * @return + * @description 执行刷新 + * @author wxz + * @date 2021.09.22 16:04:49 + */ + private synchronized void doRefreshServerList() { + ListView serviceListView = null; + try { + serviceListView = namingService.getServicesOfServer(1, 100); + //启动监听 + if (serviceListView == null || serviceListView.getCount() == 0) { + // log.debug("【Nacos服务列表定时刷新】当前无任何可添加监听的服务"); + return; + } + List serviceList = serviceListView.getData(); + // log.debug("【Nacos服务列表定时刷新】Nacos服务端服务列表: {}", serviceList); + + for (String service : serviceList) { + try { + + // 如果该服务已经在监听列表中存在了,则不再注册监听。注:不能取消空服务的监听,因为空服务随时可能恢复运行,需要实时监听。 + if (observingServers.contains(service)) { + continue; + } + + namingService.subscribe(service, new EventListener() { + @Override + public void onEvent(Event event) { + if (event instanceof NamingEvent) { + NamingEvent namingEvent = (NamingEvent) event; + log.debug("【Nacos服务列表刷新监听】收到事件:{}:[{}]", namingEvent.getServiceName(), namingEvent.getInstances()); + doRefreshServerList(service); + } + } + }); + + // 将该服务加入到监听列表中 + observingServers.add(service); + } catch (NacosException e) { + String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); + log.error("【Nacos服务列表定时刷新】订阅ApplicationContext的刷新事件失败,错误信息:{}", errorStackTrace); + } + } + + } catch (NacosException e) { + String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); + log.error("【Nacos服务列表定时刷新】链接Nacos服务端失败,错误信息:{}", errorStackTrace); + } + } + + /** + * @param serviceName + * @return + * @description 刷新ServerList + * @author wxz + * @date 2021.09.22 09:29:16 + */ + private void doRefreshServerList(String serviceName) { + // 刷新方式1:反射调用DynamicServerListLoadBalancer中的restOfInit()方法。该方法原来只执行一次,此处不推荐用 + //ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName); + //if (loadBalancer instanceof ZoneAwareLoadBalancer) { + // ZoneAwareLoadBalancer zaLoadBalancer = (ZoneAwareLoadBalancer) loadBalancer; + // IClientConfig clientConfig = springClientFactory.getClientConfig(serviceName); + // try { + // Method restOfInitMethod = zaLoadBalancer.getClass().getSuperclass().getDeclaredMethod(REFRESH_SERVER_LIST_METHOD_NAME, IClientConfig.class); + // restOfInitMethod.setAccessible(true); + // restOfInitMethod.invoke(zaLoadBalancer, clientConfig); + // } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + // String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); + // log.error("【LoadBalancer刷新服务列表】失败:{}", errorStackTrace); + // } + //} + + // 刷新方式2:DynamicServerListLoadBalancer#updateListOfServers()该方法为ribbon定时刷新服务列表的时候真正调用的方法,但是加了@VisibleForTesting + // 暂且 1 try + ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName); + if (loadBalancer instanceof DynamicServerListLoadBalancer) { + DynamicServerListLoadBalancer dslb = (DynamicServerListLoadBalancer) loadBalancer; + dslb.updateListOfServers(); + } + } + } + +}