Java NIO 从入门到精通

1.概览

JAVA NIO 是 JAVA IO 和 JAVA 网络编程 的一个升级版。与传统IO相比,NIO提供了一个不同的IO编程模型。注意,有时候 NIO 又被称作Non-blocking IO。然而这并不是 NIO 原始的含义(JAVA NIO部分api实际上是阻塞的,比如file API)。所以说,Non-blocking是有一点误解在里面。

在JAVA NIO中,最重要的3个组件是Channels、Buffers、Selectors。一个selector可以绑定多个channel(注册感兴趣事件),每个channel可以使用buffer进行读写。

JAVA NIO模型

2. Channel

JAVA NIO Channel和传统的io流相似,不过有一些不同之处:

  • 对于一个Channel,你既可以读,也可以写。而传统的io流都是单向的(要么只读,要么只写)。
  • Channel的读写都是异步的。
  • 对Channel读写总是要经过Buffer中转。

就和上说的一样,你可以从Channel中读数据到Buffer,也可以把数据从buffer写到Channel。

channel的读写

下面有4个重要的Channel实现类

  • FileChannel:读写本地文件。
  • DatagramChannel:通过UDP协议发送或接受数据。
  • SocketChannel:通过TCP协议发送或接受数据。
  • ServerSocketChannel:允许你监听一个端口,等待TCP连接的接入。一旦TCP连接接入成功,就会创建一个SocketChannel。

3. Buffer

JAVA NIO 的Buffer是用于和Channel交互的媒介。对Channel进行数据的读写都必须经过Buffer。

Buffer本质上是内存的一块空间,你可以往里读写内容。Buffer只是对这片内存空间的一个封装,提供了一系列API让你更加方便的操作里面的数据。

3.1 Buffer的基本用法

使用Buffer进行读写基本分为这4步。

  1. 写数据到Buffer。
  2. 调用buffer.flip(),将写模式切换为读模式。
  3. 读取Buffer中的数据。
  4. 调用buffer.clear()或者buffer.compact()方法。

当你往Buffer中写数据时,Buffer会知道你写了多少数据。一旦你需要读这些数据时,你需要使用flip()将Buffer从写模式切换到读模式。在读模式下,你可以把刚刚写入的数据全部读取出来。

一旦你把所有的数据都读完了,你需要再次将Buffer置为写模式来确保Buffer再次可写。有两个方法可以做到这一点:clear()或compact()。clear()将清空整个Buffer。而compact()仅仅清空你已经读过的部分(意味着你只读了一部分数据的情况)。所有未读部分的数据将会移动到Buffer的头部,新写入的数据会紧挨着未读部分的末尾进行追加。这里一个简单的例子演示Buffer的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//创建一个文件
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
//获得这个文件的channel
FileChannel inChannel = aFile.getChannel();

//创建一个48字节大小的ByteBuffer
ByteBuffer buf = ByteBuffer.allocate(48);

//将数据从channel 读进 buffer
int bytesRead = inChannel.read(buf);

while (bytesRead != -1) {
//确保buffer可读
buf.flip();
//如果buffer还有内容,就一直读,直到buffer中的内容全部读取完毕
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // 每次读取一个字节
}
//将buffer清空,置为可写模式
buf.clear();
//再重新将数据从channel 读进 buffer
bytesRead = inChannel.read(buf);
}
//关闭文件
aFile.close();

3.2 Buffer的3个重要属性

Buffer有3个你必须知道的重要属性:

  • capacity:容量
  • position:当前指针位置
  • limit :当前的限制位置

上面的解释比较笼统,具体来讲position和limit的具体含义取决于他当前是处于读模式还是写模式。至于capacity,就是Buffer的总容量的意思,和当前是什么模式无关。

Capacity

作为一个内存块,Buffer必须要有一个固定的大小,也叫capatity。一旦Buffer满了,还想写新数据,你必须得先清空其中的数据。

Position

写模式:你如果想往Buffer中写数据,那么你肯定得基于一个Buffer中一个特定的位置(position)开始写。初始情况下,这个位置是0。当你往Buffer中写完数据后,position就会往前移动到当前数据位置长度+1。因此position的最大值是capacity - 1。

读模式:当你想从Buffer中读取数据,同样的你也要基于一个特定的位置(position)开始读。当Buffer从写模式切换到读模式后,position的位置会置为0。随着你读取Buffer中的数据,position也会往前移动到你读取的位置。

Limit

在写模式下,limit表示你最多可以往buffer中写入多少数据。因此他总是等于Buffer的capacity。

当切换到读模式下,limit表示你最多可以从Buffer中读取多少数据。因此当Buffer切换到读模式时,limit会设置为写模式的position。换句话说,之前写多少数据,你现在就可以读多少数据。

3.3 Buffer的类型

JAVA NIO带来了很多不同的Buffer类型:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

正如你所看到的,不同的Bufffer代表了不同的数据类型。

3.4 创建Buffer

为了创建一个Buffer对象你必须得先分配空间。每一个Buffer类都会有一个allocate()的静态方法。下面是一个创建ByteBuffer分配48字节的例子:

1
ByteBuffer buf = ByteBuffer.allocate(48);

下面是一个分配1024字符的CharBuffer的例子

1
CharBuffer buf = CharBuffer.allocate(1024);

3.5 往Buffer中写数据

有两个方式可以往Buffer中写数据:

  1. Channel往Buffer中写数据。

    1
    int bytesRead = inChannel.read(buf); //read into buffer.
  2. 调用put()方法,你自己往Buffer中写数据。

    1
    buf.put(127); 

还有很多其他版本的put()方法允许你往Buffer中以各种方式写入数据。比如在特定的位置写数据,或者通过byte数组写数据。具体可以查看java doc搜索很多用法。

3.6 flip()

flip()方法将Buffer从写模式切换到读模式。调用flip()方法将会把limit设置为写模式下的postion,再把position设置为0。

1
2
3
//flip()从写模式切换到读模式相当于下面两行代码。
limit = position
position = 0

3.7 从Buffer中读取数据

有两个方式可以从Buffer中读数据:

  • 从Buffer读进Channel。

    1
    2
    //read from buffer into channel.
    int bytesWritten = inChannel.write(buf);
  • 自己调用get()方法从Buffer中读取。

    1
    byte aByte = buf.get();    

get()方法也有很多个不同的版本,允许你使用各种方式从buffer中读取数据。比如读取特定位置的数据,或者将数据从buffer读进一个byte数组。

3.8 rewind()

rewind()方法会将postion设置为0,因此你可以重读buffer中所有的数据。limit会保持不变,还是标记最多可以读多少数据。

3.9 clear() 和 compact()

一旦你读完了buffer中的所有数据,你得让buffer变成再次可写。你可以通过调用clear()或者compact()来达到目的。

如果你调用clear(),position将会被设置为0,limit设置为capacity。换句话说,指针重置了,但是buffer中的数据并未清除。仅仅只是指针表示你可以从开头覆写数据了。

如果你有未读完的数据在Buffer中,你却调用了clear(),那么这些未读数据将会被抛弃。也就是说,指针已经没有办法去分辨哪些数据已读,哪些数据未读了。

如果你有未读完的数据在Buffer中,你现在又不想读,想先写数据,等晚点再读。那么你可以调用compact()方法而不是clear()。

compact()会将所有的未读数据移动到buffer的头部,然后将position设置为最后一个未读数据的位置。limit依然是capacity。现在bufffer就是可写状态了,但是并不会覆写或丢弃未读消息。

3.10 Scatter(分散读) 和 Gather(聚集写)

分散读是指从一个channel读进一个或多个buffer中。聚集写是指一个或多个buffer写入到一个channel中。

分散读和聚集写在需要将传输的数据进行分块的时候有用。比如消息由head和body两部分组成。你可以将keep和body分别存到不同的buffer。

4. Selector

JAVA NIO 的 selector 可以检查多个 channel 确定是否处于可读或者可写状态。这样依赖单线程也可以管理多个channel,从而管理多个网络连接。

4.1 为什么要使用selector?

使用selector的一大好处就是你可以使用很少的线程管理多个channel。实际上,使用单线程你就可以管理所有的channel。线程间的切换对操作系统来说需要花费昂贵的代价,并且每个线程都需要消耗系统的资源。因此用的线程越少越好。

不过要记住,现代操作系统和cpu对多任务的支持越来越好了。随着时间的推移,多线程的开销变得越来越小。实际上如果cpu是多核,而你没有多任务同时运行,那么你可能在浪费cpu的性能。总之,针对这块设计的讨论是另外的事情了。这里只想说,通过使用selector你可以仅靠一个线程操纵多个channel。

4.2 创建selector

你可以通过selector.open()来创建一个selector。

1
Selector selector = Selector.open();

4.3 把channel注册到selector

为了channel和selector配套使用,你必须把channel注册到selector上。这个注册动作由SelectableChannel.register()方法完成。就像这样:

1
2
3
4
//将channel设为非阻塞
channel.configureBlocking(false);
//将channel注册到selector上。
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

channel必须是非阻塞模式才能注册到selector。这就意味着你不能把FileChannel注册到selector中,因为FileChannel是不可以被设置为非阻塞的。不过对于Socket channel来说是没问题的。

注意register函数的第二个参数。这是一个兴趣集,通过selector,你可以监听channel上感兴趣的事件。一共有4种事件你可以去监听:

  1. Connect
  2. Accept
  3. Read
  4. Write

一个channel触发某个一个事件,也可以被称作他对那个事件“就绪(ready)”。因此当一个channel成功连接到另外一台服务器,就是“连接就绪(connect ready)”。一个server socket的channel接收了一个连接就是“accept ready”。一个channel有数据可供读取,就是“read ready”。一个channel可以写数据了,就是“write ready”。

这四个事件分别代表SelectionKey的4个常量值:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

如果你对多个事件感兴趣,你可以使用使用OR操作符。就像这样:

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;    

4.4 SelectionKey

正如前面所看到的,当你把channel注册到selector上时,他会返回一个SelectionKey对象。SelectionKey对象包含几个有趣的属性:

  • The interest set:你当前感兴趣的事件,使用interestOps()获取。

  • The ready set:就绪集合,包含一些列就绪事件。一般在select之后,你会使用readyOps()方法访问这个就绪集合。你也可以使用其他方法来检测你感兴趣的事件是否已经就绪。你可以使用一下4个方法,它们会返回一个布尔值。

    1
    2
    3
    4
    selectionKey.isAcceptable();
    selectionKey.isConnectable();
    selectionKey.isReadable();
    selectionKey.isWritable();
  • The Channel

  • The Selector

  • An attached object (optional):你可以给SelectionKey附加一个对象用来识别这个Channel或者附加一个与这个Channel配套的Buffer。

    1
    2
    selectionKey.attach(theObject);
    Object attachedObj = selectionKey.attachment();

4.5 通过selector 来 select 一个channel。

一旦你注册了一个或多个channel到selector,你就可以调用select()方法了。

这里select()有多个版本:

  • int select():阻塞,直到至少有一个channel有你感兴趣的事件就绪了。
  • int select(long timeout):除了设置一个最大阻塞时间,其他和select()一样。
  • int selectNow():不阻塞,直接返回就绪的channel的数量。

select()的int返回值告诉你有多少channel已经处于就绪状态(connect, accept, read or write)。也就是,自从上次调用select()后有多少channel变为就绪了。再进一步解释:如果你调用select()。他会返回1,因为有一个channel变成就绪了。紧接着你又调用了一次select(),又有一个channel就绪了,它还是会返回1。如果你对第一个就绪的channel没做任何操作,那么你现在有2个就绪的channel。但是在两次select()调用之间,只会返回1个channel。

4.6 selectedKeys()

一旦你调用了select()方法,它返回的值标表示一个或多个channel就绪了。你就可以通过调用selectKeys()方法得到就绪的select key集合,从而访问就绪channel。

1
Set<SelectionKey> selectedKeys = selector.selectedKeys();  

当你channel使用Channel.register()注册到selector时,会返回一个SelectionKey对象。这个对象代表channal和selector之间的绑定关系。你可以迭代selectedKeys去访问就绪的channel,就像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

注意每次循环之后会调用 keyIterator.remove() 。Selector并不会帮你从selectedKeys中删除SelectKey实例。当你完成这个channel的处理之后你必须手动删除。当下一次这个channel再次变得继续,selector会再次将他添加到selectedKeys中。

调用SelectionKey.channel()获取到的channel应该转换为原始类型,比如ServerSocketChannel 或者 SocketChannel 再进行操作。

4.7 wakeUp()

一个线程调用select()的时候会被阻塞,但它也可以在没有任何channel就绪的时候跳出阻塞。跳出阻塞可以通过其他线程调用selector.wakeup()方法来达成,selector对象要和上面调用select()的是同一个对象。

如果一个线程调用wakeup()时,并没有其他线程阻塞在select()上。那么下次调用select()时会直接跳出阻塞。

4.8 close()

当你使用完selector之后,你可以调用close()方法。他会关闭selector,并且会时所有与之绑定的SelectionKey失效。但是channel本身并不会因此而关闭。

下面是一个使用selector的一个完整的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

while(true) {
int readyChannels = selector.selectNow();
if(readyChannels == 0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}

5. SocketChannel

JAVA NIO的SocketChannel是一个链接TCP套接字的channel。有两种创建SocketChannel的方法。

  1. 你可以创建一个SocketChannel去连接互联网上某个地方的一台服务器。(客户端)
  2. 当ServerSocketChannel接受一个连接时,也会创建一个SocketChannel。(服务端)

5.1 创建SocketChannel

下面代码演示创建一个channel:

1
2
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));

5.2 关闭SocketChannel

你可以通过调用close方法来关闭一个SocketChannel。就像这样:

1
socketChannel.close();    

5.3 从SocketChannel中读取数据

要冲SocketChannel中读取数据,你可以调用read()方法。和下面一样:

1
2
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

首先创建一个ByteBuffer

然后调用SocketChannel.read,把数据从SocketChannel读进Buffer。read()方法返回的int值标识有多少字节被写进buffer了。如果返回-1,则表示读完了(连接已经关闭)。

5.4 向SocketChannel中写数据

调用SocketChannel的write方法通过Buffer作为参数往SocketChannel中写数据。就和下面一样

1
2
3
4
5
6
7
8
9
10
11
String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
//切换为读模式,因为对于channel来说,需要读buffer的数据。
buf.flip();
//hasRemaining(): Tells whether there are any elements between the current position and the limit.
while(buf.hasRemaining()) {
channel.write(buf);
}

注意SocketChannel.write()是怎么在循环内被调用的。每次调用write()方法并不保证往SocketChannel中写多少字节。因此我们需要不断调用write(),直到Buffer中没有可以读的数据。

5.5 非阻塞模式

你可以将SocketChannel设置为非阻塞。当你这样做了之后,你可以异步的调用connect()、read()和write()方法。

connect()

当SocketChannel处于非阻塞模式时,你调用connect()方法会立即返回(可能在建立之前)。为了确定连接是否已经建立成功,你可以调用finishConnection()进行判断。就像这样:

1
2
3
4
5
6
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));

while(! socketChannel.finishConnect() ){
//wait, or do something else...
}

write()

在非阻塞模式下,write()可能任何数据都没写成功,就返回了。因此必须在循环中调用write()。

read()

在非阻塞模型下,read()可能在没读取到任何信息的情况下,就返回了。因此需要注意他的返回值int,他告诉我们读取了多少字节。当返回-1时,表示已经读完了。

5.6 ServerSocketChannel

ServerSocketChannel是一种可以监听TCP连接的Channel。

1
2
3
4
5
6
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
//do something with socketChannel...
}

5.1 新建ServerSocketChannel

你可以通过调用open()方法新建一个ServerSocketChannel。

1
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

5.2 关闭ServerSocketChannel

也可以通过close()方法,来关闭一个ServerSocketChannel。

5.3 监听连接

可以通过accept()方法来监听一个连接。当accept()方法返回时,会创建一个SocketChannel。accept()默认情况下是阻塞的。

如果你要监听多个连接,可以使用一个把accept()方法放到循环中,不断的接收新的连接。

1
2
3
4
5
while(true){
SocketChannel socketChannel =
serverSocketChannel.accept();
//do something with socketChannel...
}

5.4 非阻塞模式

ServerSocketChannel也可以切换为非阻塞模式。在非阻塞模式下调用accept()方法会立即返回,如果没有新连接到来,返回值就是null。

1
2
3
4
5
6
7
8
9
10
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel != null){
//do something with socketChannel...
}
}

6. 非阻塞的服务器

即使你理解JAVA NIO非阻塞的工作特性(selector,channel,buffer等),要设计一个非阻塞的服务器还是挺难的。非阻塞IO比阻塞IO会碰到更多挑战。下面我们将讨论非阻塞模型服务器面临的主要挑战,并描述对应的解决方案。

很难找到设计非阻塞服务器的高质量文章。因此下面提供的解决方案都是基于我们自己的工作实践和理解。如果有更好的方案,我请告诉我。

6.1 非阻塞服务器

这里有一个代码的实现,放在这里:https://github.com/jjenkov/java-nio-server

6.2 非阻塞io管道

非阻塞io管道是指将各个组件串成一个链。在这个链中,包括非阻塞的读写操作。就像这样。

管道流

Component会监听可以读的channel,然后从channel中读取内容。Component会基于读到的内容进行输出,把要输出的内容写到之前的channel中。当然这个模型也不是一定的,可以只做读取操作,也可以只做写入操作。

上面的图例中只有一个component,而实际上不同非阻塞io可能存在多个component。整个链的长度取决于你的要做的事情。

非阻塞io管道可能同时从多个不同的channel中读取数据,例如从多个SocketChannel中读取数据。

控制流在上面的图例中被简化了,是Component主动通过selector问channel要数据,而不是channel直接把数据推给selector,再推给Component。

6.3 阻塞io和非阻塞io的比较

阻塞io和非阻塞io最大的不同是数据如何从底层的channle(或socket或file)中被读取出来的。

io 管道流一般是从流(socket或file)中读取数据,然后将他们分割成一个一个有顺序的消息。这就和使用分词器将一个字符串拆分成多个短语类似。不过,你还可以把流中的数据聚合成一个大的消息实体。我可以使用Message Reader把流中的数据分解为多个消息。下面是一个简单的图例。

Message Reader

画外音:什么是消息?可以简单的理解为一个消息就是一次http请求。因为tcp是一个持久连接,那么一个tcp请求上可以发送多个http请求,但是数据实际上是粘连在一起的。我们需要MessageReader把数据一个一个拆成单独的http请求(也就是消息)。

阻塞io可以使用类似InputStream接口的方式一个字节一个字节的从底层的Channel中读取数据。而且读取方法会被阻塞在那个地方,直到有数据可以读。这就是阻塞io模型下Message Reader的内部实现。

使用阻塞io模型去实现Message Reader会简单很多。一个阻塞io模型下的Message Reader永远都不必处理流中没有数据可读的情况,或者只读取了部分的消息,又或者整个消息的解析需要延期执行的情况。

画外音:整个消息的解析需要延期执行,是因为只读取了部分消息。需要等到完整的消息读取到了,才能进行这个消息的解析。

同样的,一个阻塞io模型下的Message Writer(一个component往流中写数据)永远都不必处理只写进了一部分数据的情况,或者是写数据需要延期执行的情况。

6.4 阻塞io的缺点

虽然阻塞io非常容易实现,但是有一个不幸的缺点。我们每一个io流都需要一个单独的线程去维护,将流中的数据解析成一个一个的消息。这样做的理由是因为每一个io流都会被阻塞,直到有数据可以读取为止。这意味着单线程情况下,如果一个流中没数据可读,它就卡死了。不能再去服务其他的io流。

如果服务器有很多的并发请求,那么必须每个请求都要配一个线程。如果只有几百个并发连接,这应该不是一个问题。但是当有数百万的并发连接数时,这个设计就不能很好的扩展。每个线程的虚拟机栈都可能消耗320K(32位JVM)到1024K(64位JVM)的内存空间。因此100W个线程会消耗掉1TB的内存。这还是服务器没处理请求时就需要的内存消耗(比如:处理请求时会创建新的对象)。

为了减少线程的数量,很多服务器会设计一个线程池,每个线程来处理一个请求。访问的请求会加入到队列中,线程会按照顺序(先进先出)来处理每一个请求。图例如下所示:

blocking-io-thread-model

然而这要求访问连接必须合理的发送数据。如果一个请求很久都处于非活动状态(既不发送数据,又不主动断开),那么这样的连接越多,阻塞的线程也会越多。这意味着服务器响应会变慢甚至是无响应。

有些设计想通过把线程池数设置为动态扩展的方式来缓解这个问题。比如说,线程池的连接数已经耗尽了,那么会创建额外的线程来服务新的请求。这个解决方案意味着你需要更多的非活跃连接,才能把系统给干死。但是要记住,这里还是有一个可运行的线程数的上限。当有100w个非活跃连接时,还是会无法应付。

6.5 最基本的IO模型设计

一个非阻塞的io可以使用单线程从多个流中读取消息。这要求流可以设置为非阻塞模式。当在非阻塞模式时,你从流中读取数据,可能会返回0,也可能返回大于0的整数。返回0时表示没有数据可以读,当返回大于0个字节时,表示有数据可读。

为了当流中没数据的时候,还进行频繁的检测,我们会使用selector。一个或多个SelectableChannel可以注册到同一个Selector中。当你再Selector上调用select()或者selectNow()时,它会返回有数据可读的SelectableChannel给你。整个模型的设计如图例所示:

非阻塞io模型

6.6 读取部分消息

当我们从SelectableChannel中读取数据时,我们不知道读取到数据是一个消息的部分(比如半条消息),还是超过一条消息(比如一条半消息或两条半消息)。就像下面图片展示的一样。

数据粘连

在处理部分消息时,会碰到两个问题:

  1. 检测数据块(每次从Channel中读取一次数据,算一个数据块)中是否有一个完整的消息。
  2. 在收到完整的数据之前,你怎么处理已经收到的部分消息。

检测是否有完整消息,需要Message Reader到数据块中查看数据是否包含至少1条消息。如果数据块中存在1条或多条完整的消息,那么这些完整消息可以发送到管道中进行进一步处理(例如查询数据库,删除记录之类的业务)。因为检测完整的消息这个步骤会反复的执行,因此速度必须做到尽量的快。

当数据块中存在部分消息(数据太少,不是一个完整消息),那么部分消息必须储存到一个地方,直到这个消息的剩余数据全部被服务器收到。

检测完整消息和储存部分消息都是Message Reader的职责。为了避免混淆不同Channel实例的消息数据,针对每一个Channel实例我们都要配一个Message Reader。就像这样:

Message Reader接收数据

当selector感知到某个Channel可读时,就会触发这个Channel关联的Message Reader进行数据的读取。这次数据读取完毕之后,会尝试解析数据块是否包含完整的消息。如果发现完整的消息,就会发送给管道下游对消息进行处理。

毫无疑问,Message Reader是有协议规范的。Message Reader必须直到发送过来的消息是以何种格式进行编码,才能解析数据。如果服务端想兼容多个协议,那么Message Reader需要被设计为插件形式。比如:使用参数配置使用哪个Message Reader.

6.7 储存部分消息

现在我们已经直到只收到部分消息的时候,保存这部分消息是Message Reader的责任。我们必须只要如何存储部分消息。

这里有两个设计思路,可以纳入考虑:

  1. 我们要尽量少的对消息进行拷贝,拷贝越频繁,效率越低。
  2. 我们想把完整的消息按的顺序保存(以字节的形式),这样我们解析的时候会更加方便。

每个Message Reader分配一个固定大小的Buffer

毫无疑问,部分消息必须保存在某种Buffer之中。最简单直接的方法就是每个Message Reader都维护一个Buffer。然后,这个Buffer需要设置为多大呢?它需要大到能保存一个最大允许的完整消息。所以,如果最大允许的消息为1MB,那么每个Message Reader内部的Buffer需要至少1MB的空间。

当并发数大于100w时,每个连接需要1MB是不现实的。100W * 1MB 需要1TB空间,而且要是最大允许的消息大小是16MB 或者 128 MB呢?

每个Message Reader分配一个可动态扩容的Buffer

另外一个方法是去实现一个可动态扩容的Buffer。这个Buffer一开始很小,如果消息很大,Buffer就会自动扩容。这样的话,每个连接就不需要1MB的buffer了。每个连接的消息要多少内存,就占多少内存。

这里有几种方式可以实现动态扩容的Buffer,每种都有有点和缺点。

  • 通过复制来做动态扩容

    实现一个动态扩容的buffer,你一开始的空间肯定比较小(比如4KB)。如果后面发现消息太大放不下,可以再创建一个大一点的Buffer(比如8KB)。然后把4KB的buffer中的内容复制到8KB的buffer中去。

    通过复制来做动态扩容的好处就是消息的内容都按照顺序保存到一个数组中,我们可以很容易的进行解析。但不足就是对于大消息来说需要拷贝多次(扩容多次,因为我们一开始不知道消息本身的大小,无法预先分配空间)。

    为了减少复制的频次,你可以评估一下你系统中经常收到的消息大小。比如你可以观察到大部分消息都小于4kb,因此你可以把buffer的初始大小定义为4kb。然后你发现一旦消息大于4kb,一般都是请求一个网页,而这种情况下,消息通常小于128kb。那么buffer第二级扩容的大小可以设置为128kb。最后你发现一旦消息大于128kb之后,文件大小就无规律可循了,你可以将扩容的最大值限制为消息的最大大小。

    这3级buffer的大小是通过你系统的自身情况来评估的,能有效减少数据的拷贝次数。一个消息如果小于4kb,那么就不用拷贝。一个消息的大小在4kb到128kb之间仅仅只需要拷贝1次,把4kb的buffer中的数据拷贝128kb的buffer中。如果数据大于128kb,只需要拷贝2次。考虑到大于128kb的消息数量比较少,拷贝2次也是可以接受的。

    一旦一个消息被处理完毕了,这个buffer所占用的内存就可以被释放。这样的话,从这个连接收到的新消息又从最低一级的buffer开始接受数据。这样就保证了内存能被多个连接充分利用。理想情况下,不是所有的连接都同时需要最大级别的Buffer。这里有一个Resizable Arrays的实现。

    画外音:ArrayList就是一个可自动扩容的容器,底层是数组。但是你不能自定义它每次扩容的大小,默认情况下它每次扩容当前容量的1.5倍。

  • 通过追加来动态扩容

    另外一个方式实现自动扩容的buffer就是让buffer由多个数组构成。当你需要扩容这个buffer时,你只需要再新建一个字节数组,往新数组里面写数据就好了。

    具体来说由2个方法。一个是创建多个数组实例,然后把这些实例维护到一个list中。另外一个方法是创建一个大的共享数组,将数组分为多个分片,再用一个list维护这些分片的位置。个人认为分片的方法稍微好一些,但是他们的区别很小。

    通过追加的方式来库容的好处就是避免了拷贝。缺点就是数据保存在不同的数组或分片之中。这会让解析变得复杂,因为它需要查看所有的数组或分片中的数据。这个模型比较难以实现。

TLV 编码消息

有一些消息使用TLV的格式(Type,Length,Value)进行编码。这意味着当收到一个消息时,这个消息的总长度就保存在消息头部。这样一来你可以立即知道整个消息应该分配多少内存空间。

TLV编码让内存管理变得更加容易,你可以马上知道该分配多少内存给这个消息。没有任何内存空间被浪费在数组的末端。

TLV编码的一个缺点是在接受到所有数据之前,你必须给它分配足够大的Buffer。一些慢连接发送体积很大的消息会因此消耗掉你所有的内存,使你的服务无响应。

有一个变通的方法就是把一个消息的多个属性(field)单独拆分成多个TLV编码。这样内存每次只需要给一个属性分配内存,而不必给整个消息分配内存。但是当一个属性非常大的时候,还是会造成同样的影响。

另外一个方案就是给接收消息设置一个超时事件,比如10-15秒。这样可以使你的服务器能从接收偶然性的,同时多个大消息中恢复过来。不过会有短暂的无响应时间。另外蓄意的的Dos攻击还是会导致内存占满。

TLV编码格式存在几个不同的变种。有一些TLV编码会把length放到首位然后再是Type和Value。尽管他们的顺序不同,但他们依然还是TLV的变种。

事实上,因为TLV编码格式的内存管理简单导致HTTP1.1看起来是一个垃圾协议。HTTP2.0使用TLV编码格式传输数据解决了这个问题。这也就为什么我们设计自己的网络框架VStack.co project使用TLV编码格式的原因。

6.8 写入部分消息

在非阻塞IO模型下写数据也是一个挑战。当你在非阻塞io模式下的Channel上调用write(ByteBuffer)时,ByteBuffer中的有多少数据会写入Channel是不确定的。write(ByteBuffer)的返回值会告诉你写了多少字节数到Channel里。因此我们可以持续追踪到底写了多少数据。所以这个碰到的挑战是:持续追踪写了多少数据,直到所有的数据都已经写完。

为了管理部分消息的写入Channel,我们需要创建一个Message Writer。和Message Reader一样,每个channel都要配一个Message Writer。在每个Message Write里面我们保存了当且已经写了多少字节了。

为了能够应付一个Message Writer同时需要写多个message到channel的情况,Message Writer内部会维护一个队列保存要写的消息。然后Message Writer会按照队列先进先出的顺序依次尽快将消息写入Channel。下面有一个图例展示了这个模型:

Message Writer

Message Writer每次可能只能发送部分数据,因此需要反复调用写数据的方法。

如果你有很多个连接,那么你同时也会有很多Message Writer实例。查100W个连接是否可以写数据(write ready)是很慢的。首先很多Message Writer实例并不需要写数据,这些Message Writer实例必须要检查写就绪。其次,并不是所有的Channel都处于写就绪。我们不想浪费时间去尝试往那些写未就绪的Channel中写数据。

要检查一个Channel是否写就绪,可以将channel注册到Selector上。然后我们并不想把所有的channel都注册到selector上。想象一下如果你有100w个连接,其中大部分的连接都是空闲的,所有的100w的连接都注册到了selector上。然后调用select()之后,大部分连接都处于写就绪状态。你得检查这些连接的每个Message Writer看是否有数据可写。这是巨大的浪费,因为并没有很多Message Write要写数据。

为了避免检查那些没有数据要写的Message Write 实例和 Channel 实例,我们可以采取下面两个步骤。

  1. 当需要写数据的时候,Message writer把关联的channel注册到selector上(如果没注册)。
  2. 轮询selector,查看是否有写就绪的channel。对于每个写就绪的channel,拿到它的Message Writer,把数据往channel里面写。等全部数据都写完了,再把这个Channel从selector上取消注册。

上面两步可以保证只有要写数据的channel才被注册到selector上。

6.9 整合所有信息

总之非阻塞io服务需要3个组件反复执行:

  • 读组件,检查连接中是否有发送来的数据。
  • 业务处理组件,消费收到的完整消息。
  • 写组件,检查连接是否可以向外写数据。

nio server

6.10 服务器端线程模型

在github仓库中实现的非阻塞线程模型有两个线程。第一个线程从ServerSocketChannel接收新的请求建立连接。第二个线程从SocketChannel中读取消息、处理消息、写出消息。

非阻塞服务器线程模型