操作系统之 高性能IO浅析

Posted by Jason Lee on 2019-07-10

IO模型分类

  • (1)同步阻塞IO(Blocking IO):即传统的IO模型。
  • (2)同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。注意这里所说的NIO并非Java的NIO(New IO)库。
  • (3)IO多路复用(IO Multiplexing):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。 有兴趣可以看一下Epoll的原理
  • (4)异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO。

同步和异步

同步和异步的概念描述的是用户线程与内核的交互方式:

  • 同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;
  • 而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

阻塞和非阻塞

阻塞和非阻塞的概念描述的是用户线程调用内核IO操作的方式:

  • 阻塞是指IO操作需要彻底完成后才返回到用户空间;
  • 而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

同步阻塞IO

同步阻塞IO模型是最简单的IO模型,用户线程在内核进行IO操作时被阻塞。

如图1所示,用户线程通过系统调用read发起IO读操作,由用户空间转到内核空间。内核等到数据包到达后,然后将接收的数据拷贝到用户空间,完成read操作。
用户线程使用同步阻塞IO模型的伪代码描述为:

1
2
3
4
{
read(socket, buffer);
process(buffer);
}

即用户需要等待read将socket中的数据读取到buffer后,才继续处理接收的数据。整个IO请求的过程中,用户线程是被阻塞的,这导致用户在发起IO请求时,不能做任何事情,对CPU的资源利用率不够。

同步非阻塞IO

同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK。这样做用户线程可以在发起IO请求后可以立即返回。

如图2所示,由于socket是非阻塞的方式,因此用户线程发起IO请求时立即返回。但并未读取到任何数据,用户线程需要不断地发起IO请求,直到数据到达后,才真正读取到数据,继续执行。
用户线程使用同步非阻塞IO模型的伪代码描述为:

1
2
3
4
{
while(read(socket, buffer) != SUCCESS);
process(buffer);
}

即用户需要不断地调用read,尝试读取socket中的数据,直到读取成功后,才继续处理接收的数据。整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他IO模型中使用非阻塞IO这一特性。

IO多路复用

什么是IO多路复用

关于I/O多路复用(又被称为“事件驱动”),首先要理解的是,操作系统为你提供了一个功能,当你的某个socket可读或者可写的时候,它可以给你一个通知。这样当配合非阻塞的socket使用时,只有当系统通知我哪个描述符可读了,我才去执行read操作,可以保证每次read都能读到有效数据而不做纯返回-1和EAGAIN的无用功。写操作类似。

操作系统的这个功能通过select/poll/epoll/kqueue之类的系统调用函数来使用,这些函数都可以同时监视多个描述符的读写就绪状况,这样,多个描述符的I/O操作都能在一个线程内并发交替地顺序完成,这就叫I/O多路复用,这里的“复用”指的是复用同一个线程。

select 函数

IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。

如图3所示,用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。
当数据到达时,socket被激活(用户线程被唤起),select函数返回。用户线程正式发起read请求,读取数据并继续执行。

从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。

用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

1
2
3
4
5
6
7
8
9
10
11
12
{
select(socket);
while(1) {
sockets = select();
for(socket in sockets) {
if(can_read(socket)) {
read(socket, buffer);
process(buffer);
}
}
}
}

其中while循环前将socket添加到select监视中,然后在while内一直调用select获取被激活的socket,一旦socket可读,便调用read函数将socket中的数据读取出来。

select 函数的缺点:

  1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  2. 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  3. select支持的文件描述符数量太小了,默认是1024

poll

  • poll 的方式和select 很大,但是没有1024的限制,我们可以简单的理解为select 向内核态传递数据的时候是通过数组的方式,这样一来就有限制,而poll 则是使用了 链表的方式,如此就可以避免了突破了1024的限制

epoll

  • epoll的原理我已经在Epoll的原理这篇文章分析过了,这里组要讲一下epoll是如何解决select的三个问题的。

epoll优点

  • epoll既然是对 selectpoll 的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?
    • 我们先看一下 epollselect 和poll的调用接口上的不同,selectpoll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctl和epoll_waitepoll_create是创建一个 epoll 句柄;epoll_ctl 是注册要监听的事件类型;epoll_wait则是等待事件的产生。

    • 对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。

    • 对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait 的工作实际上就是在这个就绪链表中查看有没有就绪的fd。

    • 对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

IO多路复用模型使用了Reactor设计模式实现了这一机制。

Reactor设计模式

Rector解决了什么

它要解决什么问题呢?线程在真正处理请求之前首先需要从 socket 中读取网络请求,而在读取完成之前,线程本身被阻塞,不能做任何事,这就导致线程资源被占用,而线程资源本身是很珍贵的,尤其是在处理高并发请求时。

而 Reactor 模式指出,在等待 IO 时,线程可以先退出,这样就不会因为有线程在等待 IO 而占用资源。但是这样原先的执行流程就没法还原了,因此,我们可以利用事件驱动的方式,要求线程在退出之前向 event loop 注册回调函数,这样 IO 完成时 event loop 就可以调用回调函数完成剩余的操作。

所以说,Reactor 模式通过减少服务器的资源消耗,提高了并发的能力。

Rector 实现

Rector 设计理念

EventHandler抽象类表示IO事件处理器,它拥有IO文件句柄Handle,我们可以简单理解为拥有 socket 这个对象

继承于EventHandler的子类可以对事件处理器的行为进行定制就是我们的具体的hander。

Reactor类用于管理EventHandler(注册、删除等),并使用handle_events实现事件循环,不断调用同步事件多路分离器(一般是内核)的多路分离函数select,只要某个文件句柄被激活(可读/写等),select就返回(阻塞),handle_events就会调用与文件句柄关联的事件处理器的handle_event进行相关操作。 有些简单的世界中 Rector 处理时间循环的可以具体为一个Acceptor 类,用于accept接受 socket 链接。 而Rector 可以负责分发 具体相当于一个 dispatch 类似的组件。

由于select函数是阻塞的,因此多路IO复用模型也被称为异步阻塞IO模型。注意,这里的所说的阻塞是指select函数执行时线程被阻塞,而不是指socket。
一般在使用IO多路复用模型时,socket都是设置为NONBLOCK的,不过这并不会产生影响,因为用户发起IO请求时,数据已经到达了,用户线程一定不会被阻塞。

事件循环不断地调用select获取被激活的socket,然后根据获取socket对应的EventHandler,执行器handle_event函数即可。

单线程 Reactor 模式

在该种模式下我们使用的是一个单线程的 Reactor,acceptor() 处理器注册了 ACCEPT 事件,即连接事件,当有连接请求时 Reactor 会将其分发给 acceptor() 处理。

但在这种模式下,Reactor 线程不但要处理 accept()、read()、send(),连非IO业务也要处理,如果业务逻辑复杂,这可能会使 Reactor 线程无法处理其它事件的响应。

为了避免这种事情发生,我们需要把非IO业务逻辑处理交给子线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class Reactor {
private int port;
private Thread app;
public Reactor(int port) throws IOException {
this.port = port;
}

class NIOServer implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
NIOServer() throws Exception {
System.out.println("init");
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
// 非阻塞
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
//阻塞等待事件
int select = selector.select();
// 事件列表
if (select > 0) {
Set selected = selector.selectedKeys();
System.out.println("select invoke:" + select);
Iterator it = selected.iterator();
while (it.hasNext()) {
//分发事件
dispatch((SelectionKey) (it.next()));
it.remove();
}
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
private void dispatch(SelectionKey key) throws Exception {
System.out.println("dispatch");
if (key.isAcceptable()) {
//新链接建立,注册
register(key);
} else if (key.isReadable()) {
//读事件处理
read(key);
} else if (key.isWritable()) {
//写事件处理
wirte(key);
}
}

private void read(SelectionKey key) throws IOException {
System.out.println("read");
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 面向 Buffer
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println("data:"+ Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());
key.interestOps(SelectionKey.OP_READ);

}

private void wirte(SelectionKey key) {

}
private void register(SelectionKey key) throws Exception {
System.out.println("register");
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 获得和客户端连接的通道
SocketChannel channel = server.accept();
channel.configureBlocking(false);
//客户端通道注册到selector 上
channel.register(this.selector, SelectionKey.OP_READ);
}
}

public void start() throws Exception {
app = new Thread(new NIOServer());
app.start();
}
}

单线程 Reactor 模式 + 线程池

通过加入工作线程池,把具体的逻辑操作交由子线程,提高了 Reactor 线程的IO响应时间,但是这样的模式还是存在着缺陷。

就是Reactor 线程要处理包括I/O的accept()、read()、write()以及connect()操作,当同时有大量的连接建立时,单线程的 Reactor性能会下降,然后可能会使大量客户端连接超时,最终使大量消息积压和连接超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
public class Reactor {
private int port;
private Thread app;

public Reactor(int port) throws IOException {
this.port = port;
}

class NIOServer implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;

NIOServer() throws Exception {
System.out.println("init");
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
// 非阻塞
serverSocket.configureBlocking(false);
SelectionKey register = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
register.attach(new Acceptor(register, selector));
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
int select = selector.select();
if (select > 0) {
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next()));
it.remove();
}
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}

private void dispatch(SelectionKey key) throws Exception {
Runnable runnable = (Runnable) (key.attachment());
if (runnable != null) {
runnable.run();
}
}
}

class Acceptor implements Runnable {
SelectionKey key;
Selector selector;
public Acceptor(SelectionKey key, Selector selector) {
this.key = key;
this.selector = selector;
}

@Override
public void run() {
try {
System.out.println("register");
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 获得和客户端连接的通道
SocketChannel c = server.accept();
if (c != null) {
//
new MultiThreadHandler(c, selector);
//new Handler(c, selector);
}
}catch (Exception e) {
System.out.println("run errro");
System.out.println(e.toString());
}
}
}

/**
* 处理读写业务逻辑
*/
class Handler implements Runnable {
static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;
public Handler(SocketChannel socket, Selector selector) throws Exception {
this.state = READING;
this.socket = socket;
socket.configureBlocking(false);
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);

}

@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
} catch (IOException e) {
e.printStackTrace();
}
}

private void read() throws IOException {
process();
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}

private void write() throws IOException {
process();
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}

/**
* task 业务处理
*/
public void process() throws IOException {
System.out.println("read");

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 面向 Buffer
int read = socket.read(byteBuffer);
byteBuffer.flip();
if (read > 0) {
System.out.println("data:" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());

}
sk.interestOps(SelectionKey.OP_READ);
}
}

class MultiThreadHandler extends Handler {
ExecutorService executorService;

public MultiThreadHandler(SocketChannel socket, Selector selector) throws Exception {
super(socket, selector);
//多线程处理业务逻辑
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

}
@Override
public void process() {
executorService.submit(() -> {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 面向 Buffer
int read = socket.read(byteBuffer);
byteBuffer.flip();
if (read > 0) {
System.out.println("data:" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());

}
sk.interestOps(SelectionKey.OP_READ);
} catch (Exception e) {

}
});
}
}


public void start() throws Exception {
app = new Thread(new NIOServer());
app.start();
}

}

多线程的 Reactor 模式

在多线程的 Reactor 模式中,我们分为 mainReactor 和 subReactor,每一个 Reactor 线程都会有自己的 Selector 与不同的事件循环逻辑。

其中 mainReactor 主要负责接受客户的连接请求,然后将建立的 ScoketChannel 传递给 subReactor,由 subReactor 来完成和客户端的通信。

而 subReactor 一般会有多个,这可以很好的解决单线程 Reactor 模式下的瓶颈。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
public class NIORector {
private int port;
private Thread app;

public NIORector(int port) throws IOException {
this.port = port;
}

class NIOServer implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;

NIOServer() throws Exception {
System.out.println("init");
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
// 非阻塞
serverSocket.configureBlocking(false);
SelectionKey register = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
register.attach(new MultiWorkThreadAcceptor(serverSocket));
}

@Override
public void run() {
while (!Thread.interrupted()) {
try {
int select = selector.select();
if (select > 0) {
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next()));
it.remove();

}
}
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
//这个是事件分发器,相当于Reactor 里的分发器
private void dispatch(SelectionKey key) throws Exception {
Runnable runnable = (Runnable) (key.attachment());
if (runnable != null) {
runnable.run();
}
}
}
// 这里的Acceptor 相当于时间 接收器
class MultiWorkThreadAcceptor implements Runnable {
private ServerSocketChannel server;
// cpu线程数相同多work线程
int workCount = Runtime.getRuntime().availableProcessors();
SubReactor[] workThreadHandlers = new SubReactor[workCount];
volatile int nextHandler = 0;

public MultiWorkThreadAcceptor(ServerSocketChannel serverSocket) {
this.server = serverSocket;
nextHandler = 0;
for (int i = 0; i < workThreadHandlers.length; i++) {
try {
workThreadHandlers[i] = new SubReactor();
new Thread(workThreadHandlers[i]).start();
} catch (Exception e) {
System.out.println("MultiWorkThreadAcceptor error" + e);
}

}
}

@Override
public void run() {
try {
System.out.println("register");
// 有socket 链接上来
SocketChannel c = server.accept();
// 注册读写
if (c != null) {
synchronized (c) {
// 顺序获取SubReactor,然后注册channel
SubReactor work = workThreadHandlers[nextHandler];
// 这里注册
work.registerChannel(c);
nextHandler++;
if (nextHandler >= workThreadHandlers.length) {
nextHandler = 0;
}
}
}
} catch (Exception e) {
System.out.println("run errro");
System.out.println(e.toString());
}
}
}

class SubReactor implements Runnable {
final Selector mySelector;
volatile boolean isStop = false;
int workCount = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(workCount);

public SubReactor() throws Exception {
// 每个SubReactor 一个selector
this.mySelector = SelectorProvider.provider().openSelector();
}

public void registerChannel(SocketChannel c) throws Exception {
registerChannel(c, new MultiThreadHandler());
}

public void registerChannel(SocketChannel c, Handler hander) throws Exception {
c.configureBlocking(false);
SelectionKey register = c.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
register.attach(hander);
System.out.println("registerChannel success!!");
}


@Override
public void run() {
while (!isStop) {
try {
//每个SubReactor 自己做事件分派处理读写事件
int select = mySelector.selectNow();
if (select == 0) {
continue;
}
Set<SelectionKey> keys = mySelector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
MultiThreadHandler handler = (MultiThreadHandler) key.attachment();
iterator.remove();
handler.hander(key);
}
} catch (Exception e) {
System.out.println("sub running error:" + e.getMessage());
}
}

}
}

public interface Handler {
void hander(SelectionKey key) throws IOException;
}

class MultiThreadHandler implements Handler {
ExecutorService executorService;

public MultiThreadHandler() throws Exception {
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
@Override
public void hander(SelectionKey key) throws IOException {
if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}

public void read(SelectionKey key) throws IOException {
//下一步处理写事件
System.out.println("read");
SocketChannel socket = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 面向 Buffer
int read = socket.read(byteBuffer);
byteBuffer.flip();
if (read > 0) {
System.out.println("data:" + Charset.defaultCharset().newDecoder().decode(byteBuffer).toString());

}
key.interestOps(SelectionKey.OP_READ);
}

public void write(SelectionKey key) {
}
}

public void start() throws Exception {
app = new Thread(new NIOServer());
app.start();
}
}

总结

IO多路复用是最常使用的IO模型,但是其异步程度还不够“彻底”,因为它使用了会阻塞线程的select系统调用。因此IO多路复用只能称为异步阻塞IO,而非真正的异步IO。

异步IO

“真正”的异步IO需要操作系统更强的支持。在IO多路复用模型中,事件循环将文件句柄的状态事件通知给用户线程,由用户线程自行读取数据、处理数据。

而在异步IO模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在IO完成后通知用户线程直接使用即可。
异步IO模型使用了Proactor设计模式实现了这一机制。

如图6,Proactor模式和Reactor模式在结构上比较相似,不过在用户(Client)使用方式上差别较大。Reactor模式中,用户线程通过向Reactor对象注册感兴趣的事件监听,然后事件触发时调用事件处理函数。而Proactor模式中,用户线程将AsynchronousOperation(读/写等)、Proactor以及操作完成时的CompletionHandler注册到AsynchronousOperationProcessor。

AsynchronousOperationProcessor使用Facade模式提供了一组异步操作API(读/写等)供用户使用,当用户线程调用异步API后,便继续执行自己的任务。AsynchronousOperationProcessor 会开启独立的内核线程执行异步操作,实现真正的异步。当异步IO操作完成时,AsynchronousOperationProcessor将用户线程与AsynchronousOperation一起注册的Proactor和CompletionHandler取出,然后将CompletionHandler与IO操作的结果数据一起转发给Proactor,Proactor负责回调每一个异步操作的事件完成处理函数handle_event。

虽然Proactor模式中每个异步操作都可以绑定一个Proactor对象,但是一般在操作系统中,Proactor被实现为Singleton模式,以便于集中化分发操作完成事件。

如图7所示,异步IO模型中,用户线程直接使用内核提供的异步IO API发起read请求,且发起后立即返回,继续执行用户线程代码。不过此时用户线程已经将调用的AsynchronousOperation和CompletionHandler注册到内核,然后操作系统开启独立的内核线程去处理IO操作。当read请求的数据到达时,由内核负责读取socket中的数据,并写入用户指定的缓冲区中。最后内核将read的数据和用户线程注册的CompletionHandler分发给内部Proactor,Proactor将IO完成的信息通知给用户线程(一般通过调用用户线程注册的完成事件处理函数),完成异步IO。
用户线程使用异步IO模型的伪代码描述为:

1
2
3
4
5
6
void UserCompletionHandler::handle_event(buffer) {
process(buffer);
}
{
aio_read(socket, new UserCompletionHandler);
}

用户需要重写CompletionHandler的handle_event函数进行处理数据的工作,参数buffer表示Proactor已经准备好的数据,用户线程直接调用内核提供的异步IO API,并将重写的CompletionHandler注册即可。

相比于IO多路复用模型,异步IO并不十分常用,不少高性能并发服务程序使用IO多路复用模型+多线程任务处理的架构基本可以满足需求。况且目前操作系统对异步IO的支持并非特别完善,更多的是采用IO多路复用模型模拟异步IO的方式(IO事件触发时不直接通知用户线程,而是将数据读写完毕后放到用户指定的缓冲区中)。Java7之后已经支持了异步IO,感兴趣的读者可以尝试使用。

参考



支付宝打赏 微信打赏

赞赏一下