相关概念
-
同步:线程自己去获取结果(一个线程)
-
异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)
-
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞
IO模型 描述 特点 类比 同步非阻塞 线程发起IO请求后,可以继续执行其他任务,但需要定期轮询检查IO操作是否完成 避免阻塞,但频繁轮询消耗CPU资源 小学生一直闹着要东西 同步阻塞 线程发起IO请求后,必须等待IO操作完成才能继续执行其他任务 最简单但效率最低 大学生安静等东西,什么也不做 同步多路复用 使用一个线程管理多个IO操作,通过select/poll/epoll等机制同时监控多个IO事件 可用较少线程处理大量IO请求 员工把要的东西不断告诉经理,经理不断告诉哪些东西到了,员工自己拿走 异步非阻塞 线程发起IO请求后立即返回,由操作系统通过回调机制通知完成 完全不会阻塞线程,最灵活 员工把要的东西不断告诉经理,经理同意提供,并在准备好后直接送给员工 异步阻塞 不存在此组合 异步操作本身就意味着不会阻塞线程 -
fd:文件描述符,一个非负整数,用于标识打开的文件、套接字或其他 IO 资源。每个 fd 对应一个内核中的文件表项,包含文件的偏移量、状态等信息
-
select:一个系统调用。它允许一个线程监控多个 fd 的状态变化,并在任一 fd 就绪时返回给调用线程
工作流程
整体流程
- 请求数据:用户进程发起 read() 请求,请求网络中的某个数据
- 转交任务:用户进程将 read() 任务交给本地操作系统内核空间处理(进程无法自己处理)
- 网络请求:内核接收请求后,向服务器发出 read() 请求,等待数据回传
- 复制数据:内核接收回传数据后,将缓冲区内容复制到本地
- 返回数据:内核空间将本地数据返回给用户进程
操作系统内核态两阶段
-
等待数据阶段:等待外部数据到达内核缓冲区
-
复制数据阶段:将数据从内核缓冲区复制到用户进程空间
IO 模型
一、阻塞 IO
-
定义:应用进程在发起读取请求后会一直阻塞,直到整个IO操作(包括等待数据和复制数据两个阶段)完成。这是最简单但效率最低的IO模型。
-
工作流程
- 应用进程向内核发起 recfrom 读取数据
- 内核请求服务器,并请求远程数据
(应用进程阻塞)
- 数据报到达内核缓冲区,内核复制数据到应用空间
- 数据复制完成,内核向应用程序返回成功指令
- 应用进程读取应用空间数据报,并进行处理
-
流程图
二、非阻塞 IO
- 定义:应用进程不会在等待数据阶段被阻塞,但需要不断轮询检查数据状态,这会消耗CPU资源。数据复制阶段仍然是阻塞的,这是它与异步IO的主要区别。
- 工作流程
- 应用进程向内核发起 recfrom 读取数据
- 内核立即返回 EWOULDBLOCK 错误码,内核请求服务器,并请求远程数据报
(应用进程不阻塞)
- 应用进程不断向内核发起 recfrom 读取数据,直至数据准备好
- 数据报到达内核缓冲区,内核复制数据到应用空间
- 数据复制完成,内核向应用程序返回成功指令
- 应用进程读取应用空间数据报,并进行处理
- 流程图
三、⭐ 多路复用
-
核心思想:由一个专门的线程担任选择器(Selector,也叫多路复用器)同时监控多个网络连接(文件描述符fd),集中管理多个客户端连接的IO事件,实现一个线程处理多个连接的效果
-
优点:多路复用IO模型允许一个线程管理多个连接,大大提高了系统的并发处理能力,特别适合高并发场景。而阻塞IO模型中每个连接需要一个线程来处理,并发能力低
-
相关概念
- Socket 对象:一个可以 IO 的资源, 发送数据就是对 Socket 进行写操作, 接收数据就是读 Socket
- fd 文件描述符:一个非负整数,代表一个已经打开的文件、管道、网络套接字或其他 I/O 资源
- select:多路复用 I/O 模型的系统调用,可以同时监视多个文件描述符的状态变化。有容量限制(默认1024个fd)和性能瓶颈(每次都需要将所有fd从用户态拷贝到内核态)
- epoll:Linux 特有的高性能I/O多路复用机制,解决了select的主要缺点。它支持的fd数量受系统内存限制,使用事件通知机制避免了无效的轮询,并通过内存映射减少了数据拷贝
-
工作流程
- 应用进程向内核请求数据,将 fd 传递给 select(或者其他IO复用API)
- 线程阻塞在 select 操作上,由 select 侦测 fd 是否准备就绪
- 当存在准备就绪的 fd 时,select 返回数据可读状态
- 应用程序收到 select 的可读状态时,调用 recvfrom 读取数据(需要阻塞)
四、信号驱动 IO
-
适用场景:适合需要同时处理多个IO事件的场景,但在Linux系统中使用较少,因为多路复用IO通常能提供更好的性能和更简单的编程模型
-
核心思想:应用进程发出请求后等内核空间数据准备好了再通知应用进程
-
工作流程
- 应用进程向内核请求数据,并建立一个 SIGIO 信号联系(通过 sigaction 系统调用)
- 数据到达内核空间后,通过 SIGIO 信号通知 应用进程 数据已经准备好(可读状态)
- 线程收到可读状态信号,向内核发起 recvfrom 读取请求,阻塞并读取数据
-
流程图
五、异步非阻塞 IO
-
目标:解决数据复制阶段的阻塞问题
-
核心思想:应用进程发起一个 IO 操作后立即返回,由内核负责整个 IO 操作(包括数据准备和拷贝两段操作)
-
实现方案(依靠内核的异步支持)
- Windows 系统:通过 IOCP 实现了真正的异步 IO
- Linux 系统:异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势(最大劣势)
-
工作流程
- 应用进程请求数据,向内核发起系统调用,并注册回调函数
- 内核接收请求并立即返回,应用程序继续执行后续任务(无需阻塞)
- 内核发起请求,等待数据到达内核空间后,将数据直接拷贝到应用空间
- 内核通过回调函数告知应用进程数据已到达应用空间(IO 操作已完成)
- 应用进程直接处理数据
-
流程图
文件 AIO
-
定义:默认文件 AIO 使用的线程都是守护线程,所以最后要执行
System.in.read()
以避免守护线程意外结束 -
代码实现(AsynchronousFileChannel)
java">@Slf4j public class AioDemo1 {public static void main(String[] args) throws IOException {try{AsynchronousFileChannel s =AsynchronousFileChannel.open(Paths.get("1.txt"), StandardOpenOption.READ);ByteBuffer buffer = ByteBuffer.allocate(2);log.debug("begin...");s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer attachment) {log.debug("read completed...{}", result);buffer.flip();debug(buffer);}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {log.debug("read failed...");}});} catch (IOException e) {e.printStackTrace();}log.debug("do other things...");System.in.read();} }
-
输出
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin... 13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things... 13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 0d |a. | +--------+-------------------------------------------------+----------------+
-
网络 AIO
java">public class AioServer {public static void main(String[] args) throws IOException {AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.accept(null, new AcceptHandler(ssc));System.in.read();}private static void closeChannel(AsynchronousSocketChannel sc) {try {System.out.printf("[%s] %s close\\n", Thread.currentThread().getName(), sc.getRemoteAddress());sc.close();} catch (IOException e) {e.printStackTrace();}}private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;public ReadHandler(AsynchronousSocketChannel sc) {this.sc = sc;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {try {if (result == -1) {closeChannel(sc);return;}System.out.printf("[%s] %s read\\n", Thread.currentThread().getName(), sc.getRemoteAddress());attachment.flip();System.out.println(Charset.defaultCharset().decode(attachment));attachment.clear();// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件sc.read(attachment, attachment, this);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {closeChannel(sc);exc.printStackTrace();}}private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;private WriteHandler(AsynchronousSocketChannel sc) {this.sc = sc;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容if (attachment.hasRemaining()) {sc.write(attachment);}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();closeChannel(sc);}}private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {private final AsynchronousServerSocketChannel ssc;public AcceptHandler(AsynchronousServerSocketChannel ssc) {this.ssc = ssc;}@Overridepublic void completed(AsynchronousSocketChannel sc, Object attachment) {try {System.out.printf("[%s] %s connected\\n", Thread.currentThread().getName(), sc.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}ByteBuffer buffer = ByteBuffer.allocate(16);// 读事件由 ReadHandler 处理sc.read(buffer, buffer, new ReadHandler(sc));// 写事件由 WriteHandler 处理sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件ssc.accept(null, this);}@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}}
}
多路复用器 Selector
- 定义: Java NIO 中用于处理多个客户端连接的核心类之一,它使得单个线程能够高效地管理多个 I/O 通道(例如,多个客户端连接的 SocketChannel),避免了传统多线程处理每个连接的高开销
- select() 方法:Selector 的核心方法。它会阻塞,直到至少有一个注册的事件发生
- 监听事件类型
- OP_READ(可读事件)
- OP_WRITE(可写事件)
- OP_CONNECT(连接事件)
工作流程
- 注册 Channel:将多个 Channel(如客户端连接)注册到 Selector,并指定监听的事件类型
- 监听事件:调用 select() 方法,Selector 会阻塞,直到一个或多个被注册的 Channel 变为可读、可写或发生连接等事件
- 获取返回结果:select() 返回一个整数,表示已就绪的通道数量
- 获取 Channel 状态:通过 selectedKeys() 方法获取所有准备好的 SelectionKey 集合。每个 SelectionKey 对应一个注册的通道,它包含了通道的 I/O 状态、可用的事件等信息。
- 遍历 Channel 状态:遍历 SelectionKey 集合,通过SelectionKey 获取通道的状态(例如,是否可读、可写或已连接)
- 处理 Channel 事件:通过 SelectionKey 中包含的通道信息,处理相应的 I/O 操作
- 如果事件是
OP_READ
,你就可以从SocketChannel
中读取数据。 - 如果事件是
OP_WRITE
,你就可以往SocketChannel
写数据。 - 如果事件是
OP_ACCEPT
,你可以接受客户端的连接请求。
- 如果事件是
- 循环等待:
- 由于
select()
会在没有事件时阻塞,所以代码通常会放在一个循环中,让Selector
持续监视事件的发生。 - 每次
select()
阻塞后,处理完就绪的事件后,会继续等待新的事件。
- 由于
多路复用代码实现
-
目标:实现一个基于Java NIO的多路复用服务器,主要目标是通过单线程高效管理多个客户端连接
-
功能
- 创建一个Selector实例,作为多路复用器来监听多个通道的事件
- 注册5个SocketChannel到Selector上,并设置为非阻塞模式,监听可读事件
- 通过无限循环持续监听通道事件,使用select()方法阻塞等待通道就绪
- 当有通道就绪时,获取并遍历就绪的SelectionKey集合,处理每个就绪通道
- 根据通道的就绪事件类型(读/写)执行相应的数据处理逻辑
- 处理客户端断开连接的情况,并提供数据处理的框架
-
代码
java">public class SelectorTest {public static void main(String[] args) throws IOException {// 创建 SelectorSelector selector = null;selector = Selector.open(); // 显式创建 Selector// 注册 5 个 SocketChannel 到 Selectorfor(int i = 0; i < 5; i ++ ) {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false); // 设置非阻塞模式socketChannel.register(selector, SelectionKey.OP_READ); // 注册可读事件}while (true) {int readyChannels = selector.select(); // 等待有通道准备好if (readyChannels == 0) continue; // 没有通道就绪时继续循环// 获取就绪的 SelectionKeySet<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectedKeys.iterator();while (keyIterator.hasNext()) {SelectionKey key = keyIterator.next();SocketChannel client = (SocketChannel) key.channel(); // 获取通道handleClientData(client, key); // 处理通道中的就绪事件keyIterator.remove();}}}// 客户端数据处理方法private static void handleClientData(SocketChannel client, SelectionKey key) throws IOException {ByteBuffer buffer = ByteBuffer.allocate(1024); // 创建缓冲区if(key.isReadable()) {int bytesRead = client.read(buffer); // 从通道读取数据// 如果读取到 -1,表示客户端关闭连接,需关闭该通道, 取消该通道的注册if (bytesRead == -1) {client.close();key.cancel();} else {buffer.flip(); // 准备读取数据processData(buffer); // 这里可以将数据交给客户端自己的逻辑来处理}} else if (key.isWritable()) {client.write(buffer); // 从缓冲区中读取数据,并写入到通道中if (buffer.hasRemaining()) {// 如果缓冲区没有完全写入,可能需要将写操作标记为待处理key.interestOps(SelectionKey.OP_WRITE);}}}// TODO 在这里处理读取到的数据(比如解析数据、执行业务逻辑等)private static void processData(ByteBuffer buffer) {System.out.println("Processing data...");while (buffer.hasRemaining()) {System.out.print((char) buffer.get());}}}
注意:上述代码没有进行任何异常处理,是为了让示例核心逻辑更加清晰,实际使用应该添加处理逻辑
参考资料:浅谈5种IO模型——阻塞式IO、非阻塞式IO、信号驱动IO、多路复用IO及异步IO-CSDN博客