0%

netty示例

netty示例

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Server {
public static void main(String[] args) throws InterruptedException {
// bossGroup用于接收Client端连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// workerGroup用于实际业务处理的
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
// 辅助类 对server进行一系列的配置
ServerBootstrap bootstrap = new ServerBootstrap();
// 将两个工作线程组加入进来
bootstrap.group(bossGroup,workerGroup)
// 指定NioServerSocketChannel类型的通道
.channel(NioServerSocketChannel.class)
// 使用childHandler绑定具体的事件处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 管道中加入处理器
// pipeline是一个双向链表 包含 AbstractChannelHandlerContext head; 和AbstractChannelHandlerContext tail;
socketChannel.pipeline().addLast(new ServerHandler());
}
})
// tcp缓冲区,线程队列等待连接的个数
.option(ChannelOption.SO_BACKLOG,128)
// 保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE,true);

System.out.println("服务器启动");
// 绑定端口并同步 ChannelFuture继承了Future,异步结果
ChannelFuture future = bootstrap.bind(8765).sync();
// 阻塞 防止关闭 只有关闭事件触发才会执行
future.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}


}
}

处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 继承ChannelInboundHandlerAdapter
public class ServerHandler extends ChannelInboundHandlerAdapter {

// 读取客户端发送的数据
// ChannelHandlerContext 上下文对象,含有管道pipeline,通道channel,地址
// Object 消息数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String request = new String(data, StandardCharsets.UTF_8);
System.out.println("接收到的消息为:->"+request);

// 回复客户端
// ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()))
// // 数据发送完之后主动断开连接
// .addListener(ChannelFutureListener.CLOSE);
} finally {
// 释放缓冲
ReferenceCountUtil.release(msg);
}
}

// 处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭
ctx.close();
}

// 数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 回复客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes()));

}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Client {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) // 线程组
.channel(NioSocketChannel.class) // 客户端NioSocketChannel通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 客户端处理器
socketChannel.pipeline().addLast(new ClientHandler());
}
});
System.out.println("客户端启动");
ChannelFuture future = bootstrap.connect("127.0.0.1",8765).sync();

// 阻塞 防止关闭 只有关闭事件触发才会执行
future.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
group.shutdownGracefully();
}


}
}

处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ClientHandler extends ChannelInboundHandlerAdapter {

// 通道准备就绪
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 写数据
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀".getBytes()));
}

// 读取服务器的消息数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String request = new String(data, StandardCharsets.UTF_8);
System.out.println("接收到的回复为:->"+request);

} finally {
// 释放缓冲
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}