0%

Hystrix线程隔离导致的问题

Hystrix线程隔离

在微服务框架中,可能一个服务需要调用多个微服务,在tomcat中运行时,tomcat只是分配了100个线程,由于多个服务之间调用的时间消耗过长,可能会导致线程耗尽,而在Hystrix中存在线程隔离,对于每个微服务分配一个线程池,访问某个微服务时就从对应的线程池中取线程,如果对应线程池中的线程都用光了,那么就认为该服务不可用了,如果在需要请求该微服务,则直接返回

那么这个线程池是存在于哪里呢?

那么既然Hystrix的Command都是在线程池中执行的,那么就会遇到当前的RequestContextHolder获取不到RequestAttributes,没办法,跨线程了呀(RequestContextHolder中使用了ThreadLocal),这怎么解决呢?Hystrix中提供了一个HystrixConcurrencyStrategy类,HystrixConcurrencyStrategy提供了一套默认的并发策略实现。我们可以根据我们自己不同需求通过装饰去扩展它。如每次执行HystrixCommand的时候都会去调用wrapCallable(Callable) 方法,这里我们就可以通过装饰Callable使它提供一些额外的功能(如ThreadLocal上下文传递)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class FeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy
{
private static final Logger log;
private HystrixConcurrencyStrategy delegate;

public FeignHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof FeignHystrixConcurrencyStrategy) {
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
HystrixPlugins.reset();
// 注册并发策略
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
}
catch (Exception e) {
log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}

// 这个时候还在主线程了,所以通过RequestContextHolder.getRequestAttributes()是能拿到上下文的拿到后hold住,等到run执行的时候再绑定即可
public <T> Callable<T> wrapCallable( Callable<T> callable) {
// 获取当前请求的requestAttributes
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return new WrappedCallable<T>(callable, requestAttributes);
}

public ThreadPoolExecutor getThreadPool( HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public ThreadPoolExecutor getThreadPool( HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}

public BlockingQueue<Runnable> getBlockingQueue( int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}

public <T> HystrixRequestVariable<T> getRequestVariable( HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}

class WrappedCallable<T> implements Callable<T>
{
private final Callable<T> target;
private final RequestAttributes requestAttributes;

public WrappedCallable( Callable<T> target, RequestAttributes requestAttributes) {
this.target = target;
this.requestAttributes = requestAttributes;
}

@Override
public T call() throws Exception {
try {
// 执行之前绑定上下文,执行完成后释放
RequestContextHolder.setRequestAttributes(this.requestAttributes);
return this.target.call();
}
finally {
RequestContextHolder.resetRequestAttributes();
}
}
}
}

这时候大家就奇怪了,为什么在wrapCallable方法中可以获取到当前请求呢,来看源码是怎么调用HystrixConcurrencyStrategy的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class HystrixContextRunnable implements Runnable {

private final Callable<Void> actual;
// 父线程的上下文
private final HystrixRequestContext parentThreadState;

public HystrixContextRunnable(Runnable actual) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
}

public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
// 实例化HystrixContextRunnable的时候去调用的concurrencyStrategy.wrapCallable,此时还没有切换线程呢
this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {

@Override
public Void call() throws Exception {
actual.run();
return null;
}

});
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
}

@Override
public void run() {
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Callable with the state of the parent
try {
actual.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
} finally {
// restore this thread back to its original state
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}

}

欢迎关注我的其它发布渠道