0%

ChannelHandler组件分析

ChannelHandler主要是为了处理入站和出站数据的逻辑,netty中提供了多个接口

ChannelInboundHandler入站接口和ChannelOutboundHandler出站接口

入站和出站

  • ChannelInboundHandler入站表示数据是从远程主机到用户应用程序,处理进站数据和所有状态更改事件

  • ChannelOutboundHandler出站是指从用户应用程序到远程主机,处理出站数据,允许拦截各种操作

为了使数据从一端到达另一端,一个或多个ChannelHandler将以某种方式操作数据,这些Channel会添加到ChannelPipeline中,并按照被添加的顺序进行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
*                                                 I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+

ChannelInboundHandler接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public interface ChannelInboundHandler extends ChannelHandler {

/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;

/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/
// 连接被建立并准备进行通信时被调用
void channelActive(ChannelHandlerContext ctx) throws Exception;

/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;

/**
* Invoked when the current {@link Channel} has read a message from the peer.
*/
// 读取当前channel的消息
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

/**
* Gets called if a {@link Throwable} was thrown.
*/
// 当出现异常时调用该方法
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

原子操作

在java.util.concurrent.atomic包下提供了很多原子操作类,多个线程执行一个操作时,其中任何一个线程要么完全执行此操作,要么没有执行此操作的任何步骤,其内部使用的CAS操作 乐观锁

以AtomicInteger为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

// 获取当前值
public final int get() {
return value;
}

// 设置给定值
public final void set(int newValue) {
value = newValue;
}

// 延时设置
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}

// 以原子方式设置给定值,并返回旧值 线程安全版本的 tmp = oldValue; oldValue = newValue; return oldValue
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}

// 如果当前值等于预期值,则以原子方式设置为给定的更新值,成功则返回true,失败返回false
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// 如果当前值等于预期值,则以原子方式设置为给定的更新值,成功则返回true,失败返回false
// 与compareAndSet方法一样
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// 以原子方式加一 ,相当于线程安全版的i++
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

// 以原子方式减一 ,相当于线程安全版的i--
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}

// 以原子方式将给定值与当前值相加 线程安全版的i=+10 先获取再相加
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}

// 以原子方式加一 ,相当于线程安全版的++i
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// 以原子方式减一 ,相当于线程安全版的--i
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}

// 以原子方式将给定值与当前值相加 线程安全版的i+=10 先相加在获取
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

}

指令重排序

JVM会根据处理器的特性适当的重新排序机器指令,使机器指令更符合CPU的执行特点,最大限度的发挥机器的性能,但是会导致执行顺序可能会与代码顺序不一致

异步处理

在Servlet3.0版本中引入了异步处理的功能,使线程可以返回到容器,从而执行更多的任务

使用AysncContext来进行异步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public interface ServletRequest {

// 使用原始请求和响应对象用于异步处理
public AsyncContext startAsync() throws IllegalStateException;

// 将请求转换为异步模式,并使用给定的请求和响应对象初始化,可以使用ServletRequestWrapper和ServletResponseWrapper
public AsyncContext startAsync(ServletRequest servletRequest,
ServletResponse servletResponse)
throws IllegalStateException;

// 检测请求是否为异步模式,使用原始请求和响应对象进行处理
public boolean isAsyncStarted();

// 是否支持异步处理
public boolean isAsyncSupported();

// 返回由startAsync调用创建的AsyncContext
public AsyncContext getAsyncContext();

}



public interface AsyncContext {
// AsyncContext中的请求响应进行分派
void dispatch();

// 分派到指定资源
void dispatch(String path);

// 分派到指定资源
void dispatch(ServletContext context, String path);
// 完成异步操作,并结束与这个异步上下文的关联的响应,在异步上下文中写入响应对象之后调用该方法
void complete();
// 容器提供了一个不同的线程,在该线程中处理阻塞操作
void start(Runnable run);
// 注册监听器用于接收 onTimeout、onError(用于通知监听器在Servlet上启动的异步操作未能完成)、onComplete(用于通知监听器在Servlet上启动的异步操作完成了)、onStartAsync(用于通知监听器正在通过调用一个ServletRequest.startAsync方法启动一个新的异步周期)通知
void addListener(AsyncListener listener);

void addListener(AsyncListener listener, ServletRequest request,
ServletResponse response);
}

要在servlet上启用异步处理,需要配置asyncSupported为true

阅读全文 »

ConcurrentMap

ConcurrentMap接口是线程安全的Map接口,ConcurrentHashMap是HashMap的线程安全版本,ConcurrentSkipListMap是TreeMap的线程安全版本

ConcurrentHashMap

在JDK7的时候是将ConcurrentHashMap采用分段锁,将一个整体分为16段HashTable,每个段是一个Segment,提高了并发度;JDK8取消了Segment分段结构,改成了与HashMap一样的数组+链表+红黑树,实现对每一段数据进行加锁,也减少了并发冲突的概率

JDK7

Segment

在jdk8之前concurrentHashMap使用该对象进行分段加锁,降低了锁的粒度,使得并发效率提高,Segment本身也相当于一个HashMap,Segment包含一个HashEntry数组,数组中每个HashEntry既是一个键值对,又是一个链表的头结点

阅读全文 »

EventLoop组件分析

EventLoopGroup是一组EventLoop的抽象,含有多个EventLoop,可以注册channel,用于在事件循环中进行选择,netty为了更好地利用多核cpu资源,一般会有多个EventLoop同时工作,每个EventLoop维护一个Selector实例,EventLoopGroup提供了next方法,来从组内按照一定的规则获取EventLoop来处理任务

NioEventLoop
1
2
3
4
5
// bossGroup用于接收Client端连接,会将请求交给workerGroup
// NioEventLoopGroup中的子线程数默认是cpu核数*2,是一个NioEventLoop线程组,NioEventLoop实现了Executor接口和EventLoop接口,本质是一个Executor
EventLoopGroup bossGroup = new NioEventLoopGroup();
// workerGroup会获取到真正的连接,然后和连接进行通信,workerGroup用于实际业务处理的
EventLoopGroup workerGroup = new NioEventLoopGroup();
阅读全文 »