0%

NIO基本操作

NIO

NIO早在JDK1.4中就已经提出来了(JSR51),在JDK1.7中对NIO进行了补充类库NIO.2(JSR 203)

同步非阻塞

阻塞与非阻塞的区别:

  • 阻塞时,在调用结果返回时,当前线程会被挂起,并在得到结果之后返回

    传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write() 时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降

  • 非阻塞时,如不能立即得到结果,该调用不会阻塞当前线程,调用者需要定时轮询查看处理状态

    Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端

Channel(通道)和Buffer(缓冲区)

与普通IO的不同和关系

  • NIO以块(缓冲区)的方式处理数据,但是IO是以最基础的字节流的形式去写入和读出的

  • NIO不再是和IO一样用OutputStream和InputStream输入流的形式来进行处理数据的,但是又是基于这种流的方式,采用了通道和缓冲区的形式进行处理

  • NIO的通道是可以双向的,IO的流只能是单向的

  • NIO的缓冲区(字节数组)还可以进行分片,可以建立只读缓冲区、直接缓冲区和间接缓冲区,只读缓冲区就是只可以读,直接缓冲区是为了加快I/O速度,以一种特殊的方式分配其内存的缓冲区

  • NIO采用的是多路复用的IO模型,BIO用的是阻塞的IO模型

通道Channel负责传输,缓冲区Buffer负责存储

核心组件

缓冲区Buffer

  • Buffer是一个对象,它包含一些要写入或者刚读出的数据。在NIO中加入Buffer对象,在流式IO中,将数据直接写入或者读到Stream对象中

  • 在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。在写入数据时,它是写入到缓冲区的。任何时候访问NIO中的数据,都需要将它放到缓冲区中

  • 缓冲区实质上是一个数组。通常它是一个字节数组,但是也可以使用其他种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程

1
2
3
4
5
6
7
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
重要变量及方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//标记位置,记录标记时的position位置,可以使用reset()恢复到mark标记的位置 mark<=position
private int mark = -1;
//当前进行读写操作的数据元素的位置position<=limit
private int position = 0;
//缓冲区数组中进行读写操作的最大允许位置,limit<=capacity,limit之后的数据不能进行读写
private int limit;
//缓冲区数组的总长度,创建时指定的,不可以改变
private int capacity;

public final int capacity() {
return capacity;
}

public final int position() {
return position;
}

public final Buffer position(int newPosition) {
if ((newPosition > limit) || (newPosition < 0))
throw new IllegalArgumentException();
position = newPosition;
if (mark > position) mark = -1;
return this;
}

public final int limit() {
return limit;
}

public final Buffer limit(int newLimit) {
if ((newLimit > capacity) || (newLimit < 0))
throw new IllegalArgumentException();
limit = newLimit;
if (position > newLimit) position = newLimit;
if (mark > newLimit) mark = -1;
return this;
}

// 将当前位置进行标记
public final Buffer mark() {
mark = position;
return this;
}
// 使用reset方法可以将读写位置回到mark的位置上
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}

// 清空缓冲区,但是缓存区的数据还在
//将读写位置置为0
// 读写限制为容量
// 标记恢复-1
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

// 读写模式切换
// 将读写限制设为当前位置
// 读写位置设为0
// 标记恢复-1
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

// 可重复读
// 读写位置设为0
// 标记恢复-1
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// 分配一个指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 0
System.out.println(buf.position());
// 1024
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());

buf.put("hello".getBytes());
System.out.println("写操作后");
// 5
System.out.println(buf.position());
// 1024
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());

buf.flip();
System.out.println("切换为读模式");
// 0
System.out.println(buf.position());
// 5
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());

byte[] read = new byte[buf.limit()];
buf.get(read);
System.out.println(new String(read,0,read.length));

System.out.println("数据读取之后");
// 5
System.out.println(buf.position());
// 5
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());


buf.rewind();
System.out.println("重新读取");
// 0
System.out.println(buf.position());
// 5
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());

byte[] two = new byte[2];
buf.get(two);
System.out.println(new String(two,0,two.length));
System.out.println("读取2个字节");
// 2
System.out.println(buf.position());
// 5
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());
// 标记当前位置
buf.mark();
buf.get(two);
System.out.println(new String(two,0,two.length));
System.out.println("再次读取2个字节");
// 4
System.out.println(buf.position());
// 5
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());

buf.reset();
System.out.println("重置");
// 2
System.out.println(buf.position());
// 5
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());

buf.clear();
System.out.println("清空缓冲区");
// 0
System.out.println(buf.position());
// 1024
System.out.println(buf.limit());
// 1024
System.out.println(buf.capacity());

byte[] clearData = new byte[5];
buf.get(clearData);
// 清空缓冲区之后数据还在,只是position和limit恢复到了原本的状态
// hello
System.out.println(new String(clearData,0,clearData.length));

通道Channel

通道是对原I/O包中的流的模拟,表示打开到IO设备的连接,本身不存储数据。到任何目的地的所有数据都必须通过一个Channel对象(通道)。一个Buffer实质上就是一个容器对象。发送给一个通道的所有对象都必须首先放到缓冲区中;从通道中读取的任何数据都要读到缓冲区中,Channel只能与Buffer进行交互

有两个比较重要的通道,文件通道和套接字通道

文件通道
FileChannel的创建

主要有两个方式

第一种方式

使用FileChannel.open()方法创建

1
2
public static FileChannel open(Path path, OpenOption... options)
throws IOException
1
2
3
// 第一个参数为文件路径
// 余下的参数为打开文件的选项 是一个可变参数,可以传多个模式
FileChannel.open(Paths.get(FILE), StandardOpenOption.READ,StandardOpenOption.CREATE,StandardOpenOption.WRITE);

第二种方式

使用文件流来创建

1
2
3
4
5
FileChannel fileChannel = new FileOutputStream(FILE).getChannel()

FileChannel fileChannel = new RandomAccessFile(FILE,"rw").getChannel()

FileChannel fileChannel = new FileInputStream(FILE).getChannel()
读写示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
//生成FileChannel文件通道 FileChannel的操作--> 操作ByteBuffer用于读写,并独占式访问和锁定文件区域


// 写入文件
try(FileChannel fileChannel = new FileOutputStream(FILE).getChannel()){
fileChannel.write(ByteBuffer.wrap("test".getBytes()));
} catch (IOException e){
throw new RuntimeException("写入文件失败",e);
}
// 在文件结尾写入
try(FileChannel fileChannel = new RandomAccessFile(FILE,"rw").getChannel()){
fileChannel.position(fileChannel.size());//移至文件结尾
fileChannel.write(ByteBuffer.wrap("some".getBytes()));
} catch (IOException e){
throw new RuntimeException("写入文件结尾失败",e);
}

try(FileChannel fileChannel = new FileInputStream(FILE).getChannel();
FileChannel out = new FileOutputStream("C:\\Users\\Desktop\\copy.txt").getChannel()
){
// 读取操作,需要调用allocate显示分配ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// read之后将数据放入缓冲区
while (fileChannel.read(byteBuffer) != -1){
byteBuffer.flip(); // 准备写入
out.write(byteBuffer);
byteBuffer.clear(); // 清空缓存区
}
} catch (IOException e){
throw new RuntimeException("读取文件失败",e);
}
}
文件传输

可以直接使用transferFrom、transferTo传输方法来快速的传输数据

1
2
3
4
5
6
7
8
// 从src源通道中的数据写入当前文件通道
public abstract long transferFrom(ReadableByteChannel src,
long position, long count)
throws IOException;
// 从当前文件通道中的数据写入target通道
public abstract long transferTo(long position, long count,
WritableByteChannel target)
throws IOException;
套接字通道

在之前使用套接字时通常使用Socket和ServerSocket来进行网络编程,ServerSocket使用accept进行端口监听,但是使用accept时会处于阻塞状态,一直等待客户端程序的连接请求,严重影响到系统的性能和吞吐量

在NIO中提供了NetworkChannel接口来进行套接字通道

客户端、服务端分别使用SocketChannel和ServerSocketChannel来创建通道,其提高性能和吞吐量的核心在于多路复用器,也就是Selector

选择器Selector

Selector是多路复用器,用于同时监测多个SelectableChannel 通道的事件以实现异步I/O。

通过一个选择器来同时对多个套接字通道进行监听,当套接字通道有可用的事件的时候,通道改为可用状态,选择器就可以实现可用的状态。

1
2
3
4
5
6
7
8
// 读操作
public static final int OP_READ = 1 << 0;
// 写操作
public static final int OP_WRITE = 1 << 2;
// 连接已经建立
public static final int OP_CONNECT = 1 << 3;
// 接收
public static final int OP_ACCEPT = 1 << 4;

工作原理

客户端—–》Channel—–》Selector——》keys–状态改变—》server

Buffer 缓冲区
Channel 通道
Selector 选择器

Server端创建ServerSocketChannel 有一个Selector多路复用器 轮询所有注册的通道,根据通道状态,执行相关操作

  • Connect 连接状态
  • Accept 阻塞状态
  • Read 可读状态
  • Write 可写状态

Client端创建SocketChannel 注册到Server端的Selector

NIO网络编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 绑定8080端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 注册监听的事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.readyOps() == SelectionKey.OP_ACCEPT){
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
// 接收到服务端的请求
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// SocketChannel注册到selector,关注OP_READ事件
sc.register(selector,SelectionKey.OP_READ);
// 移除当前的SelectionKey,防止重复操作
iterator.remove();
} else if(selectionKey.readyOps() == SelectionKey.OP_READ){
SocketChannel sc = (SocketChannel) selectionKey.channel();
byteBuffer.clear();
int n = sc.read(byteBuffer);
if(n > 0){
byteBuffer.flip();
Charset charset = StandardCharsets.UTF_8;
String message = String.valueOf(charset.decode(byteBuffer).array());
System.out.println(message);
}
sc.register(selector,SelectionKey.OP_WRITE);
iterator.remove();
} else if(selectionKey.readyOps() == SelectionKey.OP_WRITE){
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("已接收到消息".getBytes());
buffer.flip();
sc.write(buffer);
iterator.remove();
}
}
}

内存映射文件

内存映射文件可以创建和修改那些因为太大而无法放入内存的文件。

1
2
3
4
5
6
RandomAccessFile tdat = new RandomAccessFile("test.dat", "rw");
MappedByteBuffer out = tdat.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, length);

或者
FileChannel fc = new FileInputStream(new File("temp.tmp")).getChannel();
IntBuffer ib = fc.map(FileChannel.MapMode.READ_ONLY,0, fc.size()).asIntBuffer();

映射文件访问比标准IO性能高很多

文件锁定

文件锁定可同步访问,文件锁对其他操作系统进程可见,因为java文件锁直接映射到本机操作系统锁定工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class FileLockTest {
private static final String FILE = "C:\\Users\\sinosoft\\Desktop\\剩余工作副本.txt";
public static void main(String[] args) throws IOException, InterruptedException {
FileChannel fileChannel = new FileOutputStream(FILE).getChannel();

// 文件锁
FileLock fileLock = fileChannel.tryLock();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
FileChannel fileChannel = null;
try {
fileChannel = new FileOutputStream(FILE).getChannel();
} catch (FileNotFoundException e) {
e.printStackTrace();
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("aqws".getBytes());
try {
System.out.println("线程准备写");
fileChannel.write(byteBuffer);
System.out.println("线程写完");
} catch (IOException e) {
e.printStackTrace();
}
}
});
thread.start();
if(fileLock != null){
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("aqwqdqdhwfwihfejfhi".getBytes());
System.out.println("主线程睡眠");
Thread.sleep(10000);
// 会报错 java.nio.channels.NonWritableChannelException
// fileChannel.read(byteBuffer);
System.out.println("主线程准备写");
fileChannel.write(byteBuffer);
fileLock.release();
}
}
}



主线程睡眠
线程准备写
java.io.IOException: 另一个程序已锁定文件的一部分,进程无法访问。
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:75)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
at com.zhanghe.study.io.nio.FileLockTest$1.run(FileLockTest.java:35)
at java.lang.Thread.run(Thread.java:745)
主线程准备写


通过调用FileChannel上的tryLock或lock,可以获得整个文件的FileLock(SocketChannel、DatagramChannel和ServerSocketChannel不需要锁定,因为本质上就是单线程实体)

tryLock()是非阻塞的,试图获取锁,若不能获取,只是从方法调用返回

lock()会阻塞,直到获得锁,或者调用lock()的线程中断,或者调用lock()方法的通道关闭。

使用FileLock.release()释放锁

1
2
// 锁定文件的一部分,锁住size-position区域。第三个参数指定是否共享此锁
tryLock(long position, long size, boolean shared)