0%

EventLoop组件分析

EventLoop组件分析

EventLoopGroup是一组EventLoop的抽象,含有多个EventLoop,可以注册channel,用于在事件循环中进行选择,netty为了更好地利用多核cpu资源,一般会有多个EventLoop同时工作,每个EventLoop维护一个Selector实例,EventLoopGroup提供了next方法,来从组内按照一定的规则获取EventLoop来处理任务

NioEventLoop
1
2
3
4
5
// bossGroup用于接收Client端连接,会将请求交给workerGroup
// NioEventLoopGroup中的子线程数默认是cpu核数*2,是一个NioEventLoop线程组,NioEventLoop实现了Executor接口和EventLoop接口,本质是一个Executor
EventLoopGroup bossGroup = new NioEventLoopGroup();
// workerGroup会获取到真正的连接,然后和连接进行通信,workerGroup用于实际业务处理的
EventLoopGroup workerGroup = new NioEventLoopGroup();

在实例化NioEventLoopGroup时会根据线程数去创建EventExecutor数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// nThreads  使用的线程数,默认核心线程数*2
// executor 执行器,如果为null,则使用netty默认的ThreadPerTaskExecutor和默认的线程工厂newDefaultThreadFactory
// chooserFactory 单例DefaultEventExecutorChooserFactory.INSTANCE
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

// 如果为null,则使用netty默认的ThreadPerTaskExecutor和默认的线程工厂
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 根据线程数创建EventExecutor数组,EventExecutor真实类型为NioEventLoop
children = new EventExecutor[nThreads];

// 初始化线程组
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建NioEventLoop
// newChild 代码逻辑
// EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// return new NioEventLoop(this, executor, (SelectorProvider) args[0],
//((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果创建失败,则关闭该NioEventLoop
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 添加关闭监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
// 将所有的线程放入到childrenSet中
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

欢迎关注我的其它发布渠道