0%

netty示例

netty示例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Server {
public static void main(String[] args) throws InterruptedException {
// NioEventLoopGroup是用来处理IO操作的多线程事件循环器
// 这里使用了两个
// 一个bossGroup用于接收Client端连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 一个workerGroup用于处理被接收的数据,一旦bossGroup接收到连接,就会把连接信息注册到worker上
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
// 辅助启动类 对server进行一系列的配置
ServerBootstrap bootstrap = new ServerBootstrap();
// 将两个工作线程组加入进来
bootstrap.group(bossGroup,workerGroup)
// 指定NioServerSocketChannel类型的通道
.channel(NioServerSocketChannel.class)
// ChannelInitializer是一个特殊的处理类,目的是帮助使用者配置一个新的Channel
// 使用childHandler绑定具体的事件处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 管道中加入处理器
// pipeline本质是一个处理网络事件的职责链,负责管理和执行ChannelHandler
// pipeline是一个双向链表 包含 AbstractChannelHandlerContext head; 和AbstractChannelHandlerContext tail;
socketChannel.pipeline().addLast(new ServerHandler());
}
})
// tcp缓冲区,线程队列等待连接的个数
// option是提供给NioServerSocketChannel用于接收进来的连接
.option(ChannelOption.SO_BACKLOG,128)
// 保持活动连接状态
// childOption提供给由父管道ServerChannel接收到的连接
.childOption(ChannelOption.SO_KEEPALIVE,true);

System.out.println("服务器启动");
// 绑定端口并同步 ChannelFuture继承了Future,异步结果
ChannelFuture future = bootstrap.bind(8765).sync();
// 阻塞 防止关闭 只有关闭事件触发才会执行
future.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}


}
}

处理器

继承ChannelInboundHandlerAdapter,其实现了ChannelInboundHandler接口,
继承ChannelInboundHandlerAdapter就不需要实现接口方法了,根据需要去重写ChannelInboundHandlerAdapter类中的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 继承ChannelInboundHandlerAdapter
public class ServerHandler extends ChannelInboundHandlerAdapter {

// 读取客户端发送的数据,每个信息入站都会调用
// ChannelHandlerContext 上下文对象,含有管道pipeline,通道channel,地址
// Object 消息数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String request = new String(data, StandardCharsets.UTF_8);
System.out.println("接收到的消息为:->"+request);

// 回复客户端
// ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()))
// // 数据发送完之后主动断开连接
// .addListener(ChannelFutureListener.CLOSE);
} finally {
// 释放缓冲
ReferenceCountUtil.release(msg);
}
}

// 处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭
ctx.close();
}

// 数据读取完毕,当channelRead读取到最后一条消息时调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 仅仅调用write不会将消息写入到SocketChannel上,而是把待发送的消息放到发送缓冲数组中,需要调用flush来将缓冲区中数据写到SocketChannel中
// 回复客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes()));

}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Client {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) // 线程组
.channel(NioSocketChannel.class) // 客户端NioSocketChannel通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 客户端处理器
socketChannel.pipeline().addLast(new ClientHandler());
}
});
System.out.println("客户端启动");
ChannelFuture future = bootstrap.connect("127.0.0.1",8765).sync();

// 阻塞 防止关闭 只有关闭事件触发才会执行
future.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
group.shutdownGracefully();
}


}
}

处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ClientHandler extends ChannelInboundHandlerAdapter {

// 通道准备就绪,在连接被建立且准备通信时被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 写数据
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀".getBytes()));
}

// 读取服务器的消息数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String request = new String(data, StandardCharsets.UTF_8);
System.out.println("接收到的回复为:->"+request);

} finally {
// 释放缓冲
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

欢迎关注我的其它发布渠道