Hystrix线程隔离 在微服务框架中,可能一个服务需要调用多个微服务,在tomcat中运行时,tomcat只是分配了100个线程,由于多个服务之间调用的时间消耗过长,可能会导致线程耗尽,而在Hystrix中存在线程隔离,对于每个微服务分配一个线程池,访问某个微服务时就从对应的线程池中取线程,如果对应线程池中的线程都用光了,那么就认为该服务不可用了,如果在需要请求该微服务,则直接返回
那么这个线程池是存在于哪里呢?
既然Hystrix的Command都是在线程池中执行的,就会遇到当前的RequestContextHolder获取不到RequestAttributes,没办法,跨线程了呀(RequestContextHolder中使用了ThreadLocal),这怎么解决呢?Hystrix中提供了一个HystrixConcurrencyStrategy类,HystrixConcurrencyStrategy提供了一套默认的并发策略实现。我们可以根据我们自己不同需求通过装饰去扩展它。如每次执行HystrixCommand的时候都会去调用wrapCallable(Callable) 方法,这里我们就可以通过装饰Callable使它提供一些额外的功能(如ThreadLocal上下文传递)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { private static final Logger log; private HystrixConcurrencyStrategy delegate; public FeignHystrixConcurrencyStrategy () { try { this .delegate = HystrixPlugins.getInstance().getConcurrencyStrategy(); if (this .delegate instanceof FeignHystrixConcurrencyStrategy) { return ; } HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook(); HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher(); HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy(); HystrixPlugins.reset(); HystrixPlugins.getInstance().registerConcurrencyStrategy(this ); HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook); HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); } catch (Exception e) { log.error("Failed to register Sleuth Hystrix Concurrency Strategy" , e); } } public <T> Callable<T> wrapCallable ( Callable<T> callable) { RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); return new WrappedCallable<T>(callable, requestAttributes); } public ThreadPoolExecutor getThreadPool ( HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { return this .delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public ThreadPoolExecutor getThreadPool ( HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { return this .delegate.getThreadPool(threadPoolKey, threadPoolProperties); } public BlockingQueue<Runnable> getBlockingQueue ( int maxQueueSize) { return this .delegate.getBlockingQueue(maxQueueSize); } public <T> HystrixRequestVariable<T> getRequestVariable ( HystrixRequestVariableLifecycle<T> rv) { return this .delegate.getRequestVariable(rv); } class WrappedCallable <T > implements Callable <T > { private final Callable<T> target; private final RequestAttributes requestAttributes; public WrappedCallable ( Callable<T> target, RequestAttributes requestAttributes) { this .target = target; this .requestAttributes = requestAttributes; } @Override public T call () throws Exception { try { RequestContextHolder.setRequestAttributes(this .requestAttributes); return this .target.call(); } finally { RequestContextHolder.resetRequestAttributes(); } } } }
这时候大家就奇怪了,为什么在wrapCallable方法中可以获取到当前请求呢,来看源码是怎么调用HystrixConcurrencyStrategy的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class HystrixContextRunnable implements Runnable { private final Callable<Void> actual; private final HystrixRequestContext parentThreadState; public HystrixContextRunnable (Runnable actual) { this (HystrixPlugins.getInstance().getConcurrencyStrategy(), actual); } public HystrixContextRunnable (HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) { this .actual = concurrencyStrategy.wrapCallable(new Callable<Void>() { @Override public Void call () throws Exception { actual.run(); return null ; } }); this .parentThreadState = HystrixRequestContext.getContextForCurrentThread(); } @Override public void run () { HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread(); try { HystrixRequestContext.setContextOnCurrentThread(parentThreadState); try { actual.call(); } catch (Exception e) { throw new RuntimeException(e); } } finally { HystrixRequestContext.setContextOnCurrentThread(existingState); } } }