|  |  | @ -1,6 +1,6 @@ | 
			
		
	
		
			
				
					|  |  |  | package com.epmet.commons.tools.feign; | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | import com.epmet.commons.tools.constant.ThreadLocalConstant; | 
			
		
	
		
			
				
					|  |  |  | import com.alibaba.ttl.TtlRunnable; | 
			
		
	
		
			
				
					|  |  |  | import com.netflix.hystrix.HystrixThreadPoolKey; | 
			
		
	
		
			
				
					|  |  |  | import com.netflix.hystrix.HystrixThreadPoolProperties; | 
			
		
	
		
			
				
					|  |  |  | import com.netflix.hystrix.strategy.HystrixPlugins; | 
			
		
	
	
		
			
				
					|  |  | @ -12,17 +12,15 @@ import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; | 
			
		
	
		
			
				
					|  |  |  | import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; | 
			
		
	
		
			
				
					|  |  |  | import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; | 
			
		
	
		
			
				
					|  |  |  | import com.netflix.hystrix.strategy.properties.HystrixProperty; | 
			
		
	
		
			
				
					|  |  |  | import com.netflix.hystrix.util.PlatformSpecific; | 
			
		
	
		
			
				
					|  |  |  | import org.slf4j.Logger; | 
			
		
	
		
			
				
					|  |  |  | import org.slf4j.LoggerFactory; | 
			
		
	
		
			
				
					|  |  |  | import org.springframework.stereotype.Component; | 
			
		
	
		
			
				
					|  |  |  | import org.springframework.web.context.request.RequestAttributes; | 
			
		
	
		
			
				
					|  |  |  | import org.springframework.web.context.request.RequestContextHolder; | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | import java.util.Map; | 
			
		
	
		
			
				
					|  |  |  | import java.util.concurrent.BlockingQueue; | 
			
		
	
		
			
				
					|  |  |  | import java.util.concurrent.Callable; | 
			
		
	
		
			
				
					|  |  |  | import java.util.concurrent.ThreadPoolExecutor; | 
			
		
	
		
			
				
					|  |  |  | import java.util.concurrent.TimeUnit; | 
			
		
	
		
			
				
					|  |  |  | import java.util.concurrent.*; | 
			
		
	
		
			
				
					|  |  |  | import java.util.concurrent.atomic.AtomicInteger; | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  | /** | 
			
		
	
		
			
				
					|  |  |  |  * 自定义Feign的隔离策略 | 
			
		
	
	
		
			
				
					|  |  | @ -37,7 +35,6 @@ import java.util.concurrent.TimeUnit; | 
			
		
	
		
			
				
					|  |  |  | @Component | 
			
		
	
		
			
				
					|  |  |  | public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     public static final ThreadLocal<Map<String, String>> hystrixAdditionHeaders = new ThreadLocal<>(); | 
			
		
	
		
			
				
					|  |  |  |     private static final Logger log = LoggerFactory.getLogger(FeignHystrixConcurrencyStrategy.class); | 
			
		
	
		
			
				
					|  |  |  |     private HystrixConcurrencyStrategy delegate; | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  | @ -77,23 +74,57 @@ public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     @Override | 
			
		
	
		
			
				
					|  |  |  |     public <T> Callable<T> wrapCallable(Callable<T> callable) { | 
			
		
	
		
			
				
					|  |  |  |         Map<String, String> inheritableAdditionalHeaders = ThreadLocalConstant.inheritableAdditionalHeaders.get(); | 
			
		
	
		
			
				
					|  |  |  |         RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); | 
			
		
	
		
			
				
					|  |  |  |         return new WrappedCallable<>(callable, requestAttributes, inheritableAdditionalHeaders); | 
			
		
	
		
			
				
					|  |  |  |         return new WrappedCallable<>(callable, requestAttributes); | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     @Override | 
			
		
	
		
			
				
					|  |  |  |     public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, | 
			
		
	
		
			
				
					|  |  |  |                                             HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, | 
			
		
	
		
			
				
					|  |  |  |                                             HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { | 
			
		
	
		
			
				
					|  |  |  |         return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, | 
			
		
	
		
			
				
					|  |  |  |                 unit, workQueue); | 
			
		
	
		
			
				
					|  |  |  | //        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         final int dynamicCoreSize = corePoolSize.get(); | 
			
		
	
		
			
				
					|  |  |  |         final int dynamicMaximumSize = maximumPoolSize.get(); | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         if (dynamicCoreSize > dynamicMaximumSize) { | 
			
		
	
		
			
				
					|  |  |  |             log.error("Epmet Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + | 
			
		
	
		
			
				
					|  |  |  |                     dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " + | 
			
		
	
		
			
				
					|  |  |  |                     dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); | 
			
		
	
		
			
				
					|  |  |  |             return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); | 
			
		
	
		
			
				
					|  |  |  |         } else { | 
			
		
	
		
			
				
					|  |  |  |             return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     @Override | 
			
		
	
		
			
				
					|  |  |  |     public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, | 
			
		
	
		
			
				
					|  |  |  |                                             HystrixThreadPoolProperties threadPoolProperties) { | 
			
		
	
		
			
				
					|  |  |  |         return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties); | 
			
		
	
		
			
				
					|  |  |  | //        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
 | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); | 
			
		
	
		
			
				
					|  |  |  |         final int dynamicCoreSize = threadPoolProperties.coreSize().get(); | 
			
		
	
		
			
				
					|  |  |  |         final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); | 
			
		
	
		
			
				
					|  |  |  |         final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); | 
			
		
	
		
			
				
					|  |  |  |         final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize); | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         if (allowMaximumSizeToDivergeFromCoreSize) { | 
			
		
	
		
			
				
					|  |  |  |             final int dynamicMaximumSize = threadPoolProperties.maximumSize().get(); | 
			
		
	
		
			
				
					|  |  |  |             if (dynamicCoreSize > dynamicMaximumSize) { | 
			
		
	
		
			
				
					|  |  |  |                 log.error("Epmet Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + | 
			
		
	
		
			
				
					|  |  |  |                         dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " + | 
			
		
	
		
			
				
					|  |  |  |                         dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); | 
			
		
	
		
			
				
					|  |  |  |                 return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); | 
			
		
	
		
			
				
					|  |  |  |             } else { | 
			
		
	
		
			
				
					|  |  |  |                 return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); | 
			
		
	
		
			
				
					|  |  |  |             } | 
			
		
	
		
			
				
					|  |  |  |         } else { | 
			
		
	
		
			
				
					|  |  |  |             return new WrappedThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     @Override | 
			
		
	
	
		
			
				
					|  |  | @ -106,21 +137,41 @@ public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy | 
			
		
	
		
			
				
					|  |  |  |         return this.delegate.getRequestVariable(rv); | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     /** | 
			
		
	
		
			
				
					|  |  |  |      * 从父类复制过来的 | 
			
		
	
		
			
				
					|  |  |  |      * @param threadPoolKey | 
			
		
	
		
			
				
					|  |  |  |      * @return | 
			
		
	
		
			
				
					|  |  |  |      */ | 
			
		
	
		
			
				
					|  |  |  |     private ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) { | 
			
		
	
		
			
				
					|  |  |  |         if (!PlatformSpecific.isAppEngineStandardEnvironment()) { | 
			
		
	
		
			
				
					|  |  |  |             return new ThreadFactory() { | 
			
		
	
		
			
				
					|  |  |  |                 private final AtomicInteger threadNumber = new AtomicInteger(0); | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |                 @Override | 
			
		
	
		
			
				
					|  |  |  |                 public Thread newThread(Runnable r) { | 
			
		
	
		
			
				
					|  |  |  |                     Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); | 
			
		
	
		
			
				
					|  |  |  |                     thread.setDaemon(true); | 
			
		
	
		
			
				
					|  |  |  |                     return thread; | 
			
		
	
		
			
				
					|  |  |  |                 } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |             }; | 
			
		
	
		
			
				
					|  |  |  |         } else { | 
			
		
	
		
			
				
					|  |  |  |             return PlatformSpecific.getAppEngineThreadFactory(); | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     static class WrappedCallable<T> implements Callable<T> { | 
			
		
	
		
			
				
					|  |  |  |         private final Callable<T> target; | 
			
		
	
		
			
				
					|  |  |  |         private final RequestAttributes requestAttributes; | 
			
		
	
		
			
				
					|  |  |  |         private final Map<String, String> inheritableAdditionalHeaders; | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes, Map<String, String> inheritableAdditionalHeaders) { | 
			
		
	
		
			
				
					|  |  |  |         public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) { | 
			
		
	
		
			
				
					|  |  |  |             this.target = target; | 
			
		
	
		
			
				
					|  |  |  |             this.requestAttributes = requestAttributes; | 
			
		
	
		
			
				
					|  |  |  |             this.inheritableAdditionalHeaders = inheritableAdditionalHeaders; | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         @Override | 
			
		
	
		
			
				
					|  |  |  |         public T call() throws Exception { | 
			
		
	
		
			
				
					|  |  |  |             try { | 
			
		
	
		
			
				
					|  |  |  |                 hystrixAdditionHeaders.set(inheritableAdditionalHeaders); | 
			
		
	
		
			
				
					|  |  |  |                 RequestContextHolder.setRequestAttributes(requestAttributes); | 
			
		
	
		
			
				
					|  |  |  |                 return target.call(); | 
			
		
	
		
			
				
					|  |  |  |             } finally { | 
			
		
	
	
		
			
				
					|  |  | @ -128,4 +179,25 @@ public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy | 
			
		
	
		
			
				
					|  |  |  |             } | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |     /** | 
			
		
	
		
			
				
					|  |  |  |      * 自定义线程池类,使用Ttl包装过,解决异步环境下线程上下文透传问题 | 
			
		
	
		
			
				
					|  |  |  |      */ | 
			
		
	
		
			
				
					|  |  |  |     static class WrappedThreadPoolExecutor extends ThreadPoolExecutor { | 
			
		
	
		
			
				
					|  |  |  |         private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         public WrappedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, | 
			
		
	
		
			
				
					|  |  |  |                                              BlockingQueue<Runnable> workQueue) { | 
			
		
	
		
			
				
					|  |  |  |             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |         public WrappedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, | 
			
		
	
		
			
				
					|  |  |  |                                              BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { | 
			
		
	
		
			
				
					|  |  |  |             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |         @Override | 
			
		
	
		
			
				
					|  |  |  |         public void execute(Runnable command) { | 
			
		
	
		
			
				
					|  |  |  |             super.execute(TtlRunnable.get(command)); | 
			
		
	
		
			
				
					|  |  |  |         } | 
			
		
	
		
			
				
					|  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  | } | 
			
		
	
	
		
			
				
					|  |  | 
 |