forked from rongchao/epmet-cloud-rizhao
3 changed files with 313 additions and 0 deletions
@ -0,0 +1,155 @@ |
|||||
|
package com.epmet.config; |
||||
|
|
||||
|
import com.alibaba.cloud.nacos.NacosDiscoveryProperties; |
||||
|
import com.alibaba.nacos.api.exception.NacosException; |
||||
|
import com.alibaba.nacos.api.naming.NamingService; |
||||
|
import com.alibaba.nacos.api.naming.listener.Event; |
||||
|
import com.alibaba.nacos.api.naming.listener.EventListener; |
||||
|
import com.alibaba.nacos.api.naming.listener.NamingEvent; |
||||
|
import com.alibaba.nacos.api.naming.pojo.ListView; |
||||
|
import com.epmet.commons.tools.exception.ExceptionUtils; |
||||
|
import com.netflix.client.config.IClientConfig; |
||||
|
import com.netflix.loadbalancer.ILoadBalancer; |
||||
|
import com.netflix.loadbalancer.ZoneAwareLoadBalancer; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
||||
|
import org.springframework.cloud.netflix.ribbon.SpringClientFactory; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
import javax.annotation.PostConstruct; |
||||
|
import java.lang.reflect.InvocationTargetException; |
||||
|
import java.lang.reflect.Method; |
||||
|
import java.util.ArrayList; |
||||
|
import java.util.List; |
||||
|
import java.util.concurrent.*; |
||||
|
|
||||
|
/** |
||||
|
* @author wxz |
||||
|
* @Description Nacos服务列表刷新监听注册器 |
||||
|
* @date 2021.09.22 14:33:11 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Configuration |
||||
|
@ConditionalOnProperty(prefix = "spring.cloud.nacos.discovery.serviceInstanceChangedListening", name = "enable", havingValue = "true", matchIfMissing = false) |
||||
|
public class NacosServiceListListenerRegisterer { |
||||
|
|
||||
|
public static final String REFRESH_SERVER_LIST_METHOD_NAME = "restOfInit"; |
||||
|
// 服务列表拉取间隔:10s
|
||||
|
public static final long SERVICE_LIST_PULLING_DELAY_SECONDS = 10; |
||||
|
|
||||
|
private NamingService namingService; |
||||
|
|
||||
|
private ScheduledExecutorService executor; |
||||
|
|
||||
|
@Autowired |
||||
|
private NacosDiscoveryProperties discoveryProperties; |
||||
|
|
||||
|
@Autowired |
||||
|
private SpringClientFactory springClientFactory; |
||||
|
|
||||
|
// 监听中的服务列表
|
||||
|
private List<String> observingServers = new ArrayList<>(33); |
||||
|
|
||||
|
@PostConstruct |
||||
|
public void init() { |
||||
|
namingService = discoveryProperties.namingServiceInstance(); |
||||
|
// 启动监听
|
||||
|
executor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { |
||||
|
@Override |
||||
|
public Thread newThread(Runnable r) { |
||||
|
Thread thread = new Thread(r); |
||||
|
thread.setDaemon(true); |
||||
|
thread.setName("NacosServiceListWatchingRegisterer"); |
||||
|
return thread; |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
// 立即启动,并15s刷新一次服务列表,用于新服务列表的发现
|
||||
|
ScheduledFuture<?> future = executor.scheduleAtFixedRate(new EpmetNacosServiceListListener(), 0, SERVICE_LIST_PULLING_DELAY_SECONDS, TimeUnit.SECONDS); |
||||
|
} |
||||
|
|
||||
|
public class EpmetNacosServiceListListener implements Runnable { |
||||
|
|
||||
|
@Override |
||||
|
public void run() { |
||||
|
doRefreshServerList(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* @param |
||||
|
* @return |
||||
|
* @description 执行刷新 |
||||
|
* @author wxz |
||||
|
* @date 2021.09.22 16:04:49 |
||||
|
*/ |
||||
|
private synchronized void doRefreshServerList() { |
||||
|
ListView<String> serviceListView = null; |
||||
|
try { |
||||
|
serviceListView = namingService.getServicesOfServer(1, 100); |
||||
|
//启动监听
|
||||
|
if (serviceListView == null || serviceListView.getCount() == 0) { |
||||
|
log.info("【Nacos服务列表定时刷新】当前无任何可添加监听的服务"); |
||||
|
return; |
||||
|
} |
||||
|
List<String> serviceList = serviceListView.getData(); |
||||
|
log.info("【Nacos服务列表定时刷新】Nacos服务端服务列表: {}", serviceList); |
||||
|
|
||||
|
for (String service : serviceList) { |
||||
|
try { |
||||
|
|
||||
|
// 如果该服务已经在监听列表中存在了,则不再注册监听。注:不能取消空服务的监听,因为空服务随时可能恢复运行,需要实时监听。
|
||||
|
if (observingServers.contains(service)) { |
||||
|
continue; |
||||
|
} |
||||
|
|
||||
|
namingService.subscribe(service, new EventListener() { |
||||
|
@Override |
||||
|
public void onEvent(Event event) { |
||||
|
if (event instanceof NamingEvent) { |
||||
|
NamingEvent namingEvent = (NamingEvent) event; |
||||
|
log.info("【Nacos服务列表刷新监听】收到事件:{}:[{}]", namingEvent.getServiceName(), namingEvent.getInstances()); |
||||
|
doRefreshServerList(service); |
||||
|
} |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
// 将该服务加入到监听列表中
|
||||
|
observingServers.add(service); |
||||
|
} catch (NacosException e) { |
||||
|
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); |
||||
|
log.error("【Nacos服务列表定时刷新】订阅ApplicationContext的刷新事件失败,错误信息:{}", errorStackTrace); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} catch (NacosException e) { |
||||
|
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); |
||||
|
log.error("【Nacos服务列表定时刷新】链接Nacos服务端失败,错误信息:{}", errorStackTrace); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* @param serviceName |
||||
|
* @return |
||||
|
* @description 刷新ServerList |
||||
|
* @author wxz |
||||
|
* @date 2021.09.22 09:29:16 |
||||
|
*/ |
||||
|
private void doRefreshServerList(String serviceName) { |
||||
|
ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName); |
||||
|
if (loadBalancer instanceof ZoneAwareLoadBalancer) { |
||||
|
ZoneAwareLoadBalancer zaLoadBalancer = (ZoneAwareLoadBalancer) loadBalancer; |
||||
|
IClientConfig clientConfig = springClientFactory.getClientConfig(serviceName); |
||||
|
try { |
||||
|
Method restOfInitMethod = zaLoadBalancer.getClass().getSuperclass().getDeclaredMethod(REFRESH_SERVER_LIST_METHOD_NAME, IClientConfig.class); |
||||
|
restOfInitMethod.setAccessible(true); |
||||
|
restOfInitMethod.invoke(zaLoadBalancer, clientConfig); |
||||
|
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { |
||||
|
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); |
||||
|
log.error("【LoadBalancer刷新服务列表】失败:{}", errorStackTrace); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,155 @@ |
|||||
|
package com.epmet.config; |
||||
|
|
||||
|
import com.alibaba.cloud.nacos.NacosDiscoveryProperties; |
||||
|
import com.alibaba.nacos.api.exception.NacosException; |
||||
|
import com.alibaba.nacos.api.naming.NamingService; |
||||
|
import com.alibaba.nacos.api.naming.listener.Event; |
||||
|
import com.alibaba.nacos.api.naming.listener.EventListener; |
||||
|
import com.alibaba.nacos.api.naming.listener.NamingEvent; |
||||
|
import com.alibaba.nacos.api.naming.pojo.ListView; |
||||
|
import com.epmet.commons.tools.exception.ExceptionUtils; |
||||
|
import com.netflix.client.config.IClientConfig; |
||||
|
import com.netflix.loadbalancer.ILoadBalancer; |
||||
|
import com.netflix.loadbalancer.ZoneAwareLoadBalancer; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
||||
|
import org.springframework.cloud.netflix.ribbon.SpringClientFactory; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
import javax.annotation.PostConstruct; |
||||
|
import java.lang.reflect.InvocationTargetException; |
||||
|
import java.lang.reflect.Method; |
||||
|
import java.util.ArrayList; |
||||
|
import java.util.List; |
||||
|
import java.util.concurrent.*; |
||||
|
|
||||
|
/** |
||||
|
* @author wxz |
||||
|
* @Description Nacos服务列表刷新监听注册器 |
||||
|
* @date 2021.09.22 14:33:11 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
@Configuration |
||||
|
@ConditionalOnProperty(prefix = "spring.cloud.nacos.discovery.serviceInstanceChangedListening", name = "enable", havingValue = "true", matchIfMissing = false) |
||||
|
public class NacosServiceListListenerRegisterer { |
||||
|
|
||||
|
public static final String REFRESH_SERVER_LIST_METHOD_NAME = "restOfInit"; |
||||
|
// 服务列表拉取间隔:10s
|
||||
|
public static final long SERVICE_LIST_PULLING_DELAY_SECONDS = 10; |
||||
|
|
||||
|
private NamingService namingService; |
||||
|
|
||||
|
private ScheduledExecutorService executor; |
||||
|
|
||||
|
@Autowired |
||||
|
private NacosDiscoveryProperties discoveryProperties; |
||||
|
|
||||
|
@Autowired |
||||
|
private SpringClientFactory springClientFactory; |
||||
|
|
||||
|
// 监听中的服务列表
|
||||
|
private List<String> observingServers = new ArrayList<>(33); |
||||
|
|
||||
|
@PostConstruct |
||||
|
public void init() { |
||||
|
namingService = discoveryProperties.namingServiceInstance(); |
||||
|
// 启动监听
|
||||
|
executor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() { |
||||
|
@Override |
||||
|
public Thread newThread(Runnable r) { |
||||
|
Thread thread = new Thread(r); |
||||
|
thread.setDaemon(true); |
||||
|
thread.setName("NacosServiceListWatchingRegisterer"); |
||||
|
return thread; |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
// 立即启动,并15s刷新一次服务列表,用于新服务列表的发现
|
||||
|
ScheduledFuture<?> future = executor.scheduleAtFixedRate(new EpmetNacosServiceListListener(), 0, SERVICE_LIST_PULLING_DELAY_SECONDS, TimeUnit.SECONDS); |
||||
|
} |
||||
|
|
||||
|
public class EpmetNacosServiceListListener implements Runnable { |
||||
|
|
||||
|
@Override |
||||
|
public void run() { |
||||
|
doRefreshServerList(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* @param |
||||
|
* @return |
||||
|
* @description 执行刷新 |
||||
|
* @author wxz |
||||
|
* @date 2021.09.22 16:04:49 |
||||
|
*/ |
||||
|
private synchronized void doRefreshServerList() { |
||||
|
ListView<String> serviceListView = null; |
||||
|
try { |
||||
|
serviceListView = namingService.getServicesOfServer(1, 100); |
||||
|
//启动监听
|
||||
|
if (serviceListView == null || serviceListView.getCount() == 0) { |
||||
|
log.info("【Nacos服务列表定时刷新】当前无任何可添加监听的服务"); |
||||
|
return; |
||||
|
} |
||||
|
List<String> serviceList = serviceListView.getData(); |
||||
|
log.info("【Nacos服务列表定时刷新】Nacos服务端服务列表: {}", serviceList); |
||||
|
|
||||
|
for (String service : serviceList) { |
||||
|
try { |
||||
|
|
||||
|
// 如果该服务已经在监听列表中存在了,则不再注册监听。注:不能取消空服务的监听,因为空服务随时可能恢复运行,需要实时监听。
|
||||
|
if (observingServers.contains(service)) { |
||||
|
continue; |
||||
|
} |
||||
|
|
||||
|
namingService.subscribe(service, new EventListener() { |
||||
|
@Override |
||||
|
public void onEvent(Event event) { |
||||
|
if (event instanceof NamingEvent) { |
||||
|
NamingEvent namingEvent = (NamingEvent) event; |
||||
|
log.info("【Nacos服务列表刷新监听】收到事件:{}:[{}]", namingEvent.getServiceName(), namingEvent.getInstances()); |
||||
|
doRefreshServerList(service); |
||||
|
} |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
// 将该服务加入到监听列表中
|
||||
|
observingServers.add(service); |
||||
|
} catch (NacosException e) { |
||||
|
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); |
||||
|
log.error("【Nacos服务列表定时刷新】订阅ApplicationContext的刷新事件失败,错误信息:{}", errorStackTrace); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} catch (NacosException e) { |
||||
|
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); |
||||
|
log.error("【Nacos服务列表定时刷新】链接Nacos服务端失败,错误信息:{}", errorStackTrace); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* @param serviceName |
||||
|
* @return |
||||
|
* @description 刷新ServerList |
||||
|
* @author wxz |
||||
|
* @date 2021.09.22 09:29:16 |
||||
|
*/ |
||||
|
private void doRefreshServerList(String serviceName) { |
||||
|
ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName); |
||||
|
if (loadBalancer instanceof ZoneAwareLoadBalancer) { |
||||
|
ZoneAwareLoadBalancer zaLoadBalancer = (ZoneAwareLoadBalancer) loadBalancer; |
||||
|
IClientConfig clientConfig = springClientFactory.getClientConfig(serviceName); |
||||
|
try { |
||||
|
Method restOfInitMethod = zaLoadBalancer.getClass().getSuperclass().getDeclaredMethod(REFRESH_SERVER_LIST_METHOD_NAME, IClientConfig.class); |
||||
|
restOfInitMethod.setAccessible(true); |
||||
|
restOfInitMethod.invoke(zaLoadBalancer, clientConfig); |
||||
|
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { |
||||
|
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e); |
||||
|
log.error("【LoadBalancer刷新服务列表】失败:{}", errorStackTrace); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
Loading…
Reference in new issue