|
|
@ -1,6 +1,6 @@ |
|
|
|
package com.epmet.commons.tools.feign; |
|
|
|
|
|
|
|
import com.epmet.commons.tools.constant.ThreadLocalConstant; |
|
|
|
import com.alibaba.ttl.TtlRunnable; |
|
|
|
import com.netflix.hystrix.HystrixThreadPoolKey; |
|
|
|
import com.netflix.hystrix.HystrixThreadPoolProperties; |
|
|
|
import com.netflix.hystrix.strategy.HystrixPlugins; |
|
|
@ -12,17 +12,15 @@ import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; |
|
|
|
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; |
|
|
|
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; |
|
|
|
import com.netflix.hystrix.strategy.properties.HystrixProperty; |
|
|
|
import com.netflix.hystrix.util.PlatformSpecific; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
import org.springframework.web.context.request.RequestAttributes; |
|
|
|
import org.springframework.web.context.request.RequestContextHolder; |
|
|
|
|
|
|
|
import java.util.Map; |
|
|
|
import java.util.concurrent.BlockingQueue; |
|
|
|
import java.util.concurrent.Callable; |
|
|
|
import java.util.concurrent.ThreadPoolExecutor; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.*; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
|
|
|
|
/** |
|
|
|
* 自定义Feign的隔离策略 |
|
|
@ -37,7 +35,6 @@ import java.util.concurrent.TimeUnit; |
|
|
|
@Component |
|
|
|
public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { |
|
|
|
|
|
|
|
public static final ThreadLocal<Map<String, String>> hystrixAdditionHeaders = new ThreadLocal<>(); |
|
|
|
private static final Logger log = LoggerFactory.getLogger(FeignHystrixConcurrencyStrategy.class); |
|
|
|
private HystrixConcurrencyStrategy delegate; |
|
|
|
|
|
|
@ -77,23 +74,57 @@ public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy |
|
|
|
|
|
|
|
@Override |
|
|
|
public <T> Callable<T> wrapCallable(Callable<T> callable) { |
|
|
|
Map<String, String> inheritableAdditionalHeaders = ThreadLocalConstant.inheritableAdditionalHeaders.get(); |
|
|
|
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); |
|
|
|
return new WrappedCallable<>(callable, requestAttributes, inheritableAdditionalHeaders); |
|
|
|
return new WrappedCallable<>(callable, requestAttributes); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, |
|
|
|
HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, |
|
|
|
HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { |
|
|
|
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, |
|
|
|
unit, workQueue); |
|
|
|
// return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
|
|
|
|
|
|
|
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); |
|
|
|
|
|
|
|
final int dynamicCoreSize = corePoolSize.get(); |
|
|
|
final int dynamicMaximumSize = maximumPoolSize.get(); |
|
|
|
|
|
|
|
if (dynamicCoreSize > dynamicMaximumSize) { |
|
|
|
log.error("Epmet Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + |
|
|
|
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + |
|
|
|
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); |
|
|
|
return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); |
|
|
|
} else { |
|
|
|
return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, |
|
|
|
HystrixThreadPoolProperties threadPoolProperties) { |
|
|
|
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties); |
|
|
|
// return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
|
|
|
|
|
|
|
|
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); |
|
|
|
|
|
|
|
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); |
|
|
|
final int dynamicCoreSize = threadPoolProperties.coreSize().get(); |
|
|
|
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); |
|
|
|
final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); |
|
|
|
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize); |
|
|
|
|
|
|
|
if (allowMaximumSizeToDivergeFromCoreSize) { |
|
|
|
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get(); |
|
|
|
if (dynamicCoreSize > dynamicMaximumSize) { |
|
|
|
log.error("Epmet Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + |
|
|
|
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + |
|
|
|
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); |
|
|
|
return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); |
|
|
|
} else { |
|
|
|
return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); |
|
|
|
} |
|
|
|
} else { |
|
|
|
return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
@ -106,21 +137,41 @@ public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy |
|
|
|
return this.delegate.getRequestVariable(rv); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 从父类复制过来的 |
|
|
|
* @param threadPoolKey |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
private ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) { |
|
|
|
if (!PlatformSpecific.isAppEngineStandardEnvironment()) { |
|
|
|
return new ThreadFactory() { |
|
|
|
private final AtomicInteger threadNumber = new AtomicInteger(0); |
|
|
|
|
|
|
|
@Override |
|
|
|
public Thread newThread(Runnable r) { |
|
|
|
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); |
|
|
|
thread.setDaemon(true); |
|
|
|
return thread; |
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
} else { |
|
|
|
return PlatformSpecific.getAppEngineThreadFactory(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static class WrappedCallable<T> implements Callable<T> { |
|
|
|
private final Callable<T> target; |
|
|
|
private final RequestAttributes requestAttributes; |
|
|
|
private final Map<String, String> inheritableAdditionalHeaders; |
|
|
|
|
|
|
|
public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes, Map<String, String> inheritableAdditionalHeaders) { |
|
|
|
public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) { |
|
|
|
this.target = target; |
|
|
|
this.requestAttributes = requestAttributes; |
|
|
|
this.inheritableAdditionalHeaders = inheritableAdditionalHeaders; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public T call() throws Exception { |
|
|
|
try { |
|
|
|
hystrixAdditionHeaders.set(inheritableAdditionalHeaders); |
|
|
|
RequestContextHolder.setRequestAttributes(requestAttributes); |
|
|
|
return target.call(); |
|
|
|
} finally { |
|
|
@ -128,4 +179,25 @@ public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 自定义线程池类,使用Ttl包装过,解决异步环境下线程上下文透传问题 |
|
|
|
*/ |
|
|
|
static class WrappedThreadPoolExecutor extends ThreadPoolExecutor { |
|
|
|
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); |
|
|
|
|
|
|
|
public WrappedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, |
|
|
|
BlockingQueue<Runnable> workQueue) { |
|
|
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); |
|
|
|
} |
|
|
|
public WrappedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, |
|
|
|
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { |
|
|
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void execute(Runnable command) { |
|
|
|
super.execute(TtlRunnable.get(command)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|