关于 Java NIO 的 Selector 的事儿,这篇文章里面全都有

news/2025/2/14 6:17:31/

前面 4 篇文章深入分析了 NIO 三大组件中的两个:Buffer 和 Channel:

  • 【死磕 NIO】— 深入分析Buffer
  • 【死磕 NIO】— 深入分析Channel和FileChannel
  • 【死磕NIO】— 跨进程文件锁:FileLock
  • 【死磕NIO】— 探索 SocketChannel 的核心原理

这篇文章则介绍第三个组件:Selector。

相比 Buffer 和 Channel 而言,Selector 对于 NIO 来说显得更加重要,因为它是 NIO 实现多路复用的核心,它的使命就是完成 IO 的多路复用。

Selector 简介

在前一篇文章:【死磕 NIO】— ServerSocketChannel 的应用实例 ,大明哥分析了 ServerSocketChannel 两种模式的缺点

  • 阻塞模式:所有阻塞方法都会引起线程的暂停,根本无法应用到业务中来
  • 非阻塞模式:CPU 一直在空转,浪费资源

所以,如果是我们服务端单独使用 ServerSocketChannel 确实是很麻烦,典型的吃力不讨好。故而我们希望有一个组件能够统一管理我们的 Channel,这个组件就是选择器 Selector。

Selector(选择器)是 Channel 的多路复用器,它可以同时监控多个 Channel 的 IO 状况,允许单个线程来操作多个 Channel。如下:

Selector 的作用是什么?

Selector 提供选择执行已经就绪的任务的能力。从底层来看,Selector 提供了询问 Channel 是否已经准备好执行每个 I/O 操作的能力。Selector 允许单线程处理多个 Channel。仅用单个线程来处理多个 Channels 的好处是,只需要更少的线程来处理 Channel 。事实上,可以只用一个线程处理所有的通道,这样会大量的减少线程之间上下文切换的开销。

Selector 的使用

使用 Selector 的主要流程如下:

  1. 打开 Selector
  2. 将 Channel 注册到 Selector 中,并设置要监听的事件
  3. 轮询处理 IO 操作

打开 Selector

和 SocketChannel 相似,调用 Selector.open() 就可以打开一个选择器实例。

Selector selector = Selector.open();

注册 Selector

为了将 Channel 和 Selector 配合使用,我们需要将 Channel 注册到对应的 Selector 上,调用 SelectableChannel.register() 方法来实现。

channel.configureBlocking(false);
SelectionKey key = channel.register(selector,Selectionkey.OP_ACCEPT);

这里有一个要注意的地方,所有注册到 Selector 中的 Channel 都必须是非阻塞的。怎么判断 Channel 是否可以设置为非阻塞呢?判断它是否继承了SelectableChannel,SelectableChannel 是一个抽象类,它提供了实现 Channel 的可选择性所需要的公共方法。而 FileChannel 没有继承 SelectableChannel ,所以它不能使用 Selector。

register() 提供了两个参数,一个是要注册的 Selector 是谁,第二个参数是对什么事件感兴趣。事件类型有四种:

  • 连接 : SelectionKey.OP_CONNECT
  • 接收 : SelectionKey.OP_ACCEPT
  • 可读 : SelectionKey.OP_READ
  • 可写 : SelectionKey.OP_WRITE

如果感兴趣的事件不止一个,则可以使用“位运算 | ” 来组合多个事件,如: SelectionKey.OP_CONNECT | SelectionKey.OP_ACCEPT

需要提醒的是,Selector 关注的不是 Channel 的操作,而是 Channel的某个操作的一种就绪状态。一旦 Channel 具备完成某个操作的条件,表示该 Channel 的某个操作已经就绪,就可以被 Selector 查询到,程序可以对该 Channel 进行对应的操作。比如说,某个 SocketChannel 可以连接到一个服务器,则处于“连接就绪”(OP_CONNECT)。某给 ServerSocketChannel 可以接收新的连接,则处理“接收就绪”(SelectionKey.OP_ACCEPT)。

轮询处理 IO 操作

将 Channel 注册到 Selector 并关注相对应的时间后,就可以轮询处理 IO 事件了。

Selector 提供了方法 select(),该方法可以查询出已经就绪的 Channel操作,如果没有事件发生,则该方法会一直阻塞,直到有事件。select() 有三个重载方法:

  • select():阻塞到至少有一个通道在你注册的事件上就绪了。·
  • select(long timeout):和select()一样,但最长阻塞事件为timeout毫秒。
  • selectNow():非阻塞,只要有通道就绪就立刻返回。

select() 返回值为 int 类型,该值表示有多少 Channel 的操作已经就绪,更准确地说是上一次 select() 到这一次 select() 方法之间的时间段内,有多少 Channel 变成了就绪状态。

select() 返回后,如果返回值大于 0 ,则可以调用 selectedKeys() 方法,该方法返回一个 Set 集合,该集合是一个 SelectionKey 的集合,SelectionKey 表示的是可选择通道 SelectableChannel 和一个特定的 Selector之间的注册关系。

  • SelectionKey 是一个抽象类,表示 SelectableChannel 在 Selector 中注册的标识.每个 Channel 向 Selector 注册时,都将会创建一个selectionKey
  • SelectionKey 是 SelectableChannel 与 Selector 的建立关系,并维护了 Channel 事件
  • 可以通过 cancel() 方法取消 key,取消的 key 不会立即从 Selector 中移除,而是添加到 cancelledKeys 中,在下一次 select() 操作时移除它.所以在调用某个 key 时,需要使用 isValid 进行校验。

SelectionKey 提供了两个非常重要的 “Set”:interest setready set

  • interest set 表示感兴趣的事件,我们可以通过以下方式获取:
int interestSet = selectionKey.interestOps();boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;
  • ready set:代表了 Channel 所准备好了的操作。
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

以下代码是一个处理 IO 操作的完整代码:

while (true) {selector.select();Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> keyIterator = selectedKeys.iterator();while (iterator.hasNext()) {SelectionKey key = keyIterator.next();if(key.isAcceptable()) {// a connection was accepted by a ServerSocketChannel.} else if (key.isConnectable()) {// a connection was established with a remote server.} else if (key.isReadable()) {// a channel is ready for reading} else if (key.isWritable()) {// a channel is ready for writing}// 这段代码非常重要,后面演示key.remove();}
}

这里有一段非常重要的代码 key.remove(),这行代码表示,我已经在处理该 IO 事件了,需要删除。

实例

简单实例

下面大明哥用 Selector 实现一个完整的案例。

public static void main(String[] args) throws Exception {// 创建 ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 设置为非阻塞serverSocketChannel.configureBlocking(false);// 绑定 8081 端口serverSocketChannel.bind(new InetSocketAddress(8081));// 打开 SelectorSelector selector = Selector.open();// 将 SocketChannel 注册到  Selector// 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {// select 方法,一直阻塞直到有事件发生selector.select();// 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 获取一个 SelectionKey 后,我们要将其删除掉,表示我们已经处理了这个事件iterator.remove();if (key.isAcceptable()) {// 连接时间发生// 当客户端连接服务端的时候,我们需要服务单与之建立连接// 需要注意的是在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannelServerSocketChannel socketChannel = (ServerSocketChannel) key.channel();// 需要从 socketChannel 获取 SocketChanelSocketChannel clientChannel = socketChannel.accept();log.info("{} 建立连接",clientChannel);// 设置 clientChannel 为非阻塞clientChannel.configureBlocking(false);clientChannel.register(selector,SelectionKey.OP_READ);} else if (key.isReadable()) {// 获取的为 SocketChannelSocketChannel clientChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(64);int size = clientChannel.read(byteBuffer);if (size < 0) {// 小于 0 表示客户端断开连接,需要关闭该 SocketChannellog.info("{},断开了连接",clientChannel);clientChannel.close();} else {byteBuffer.flip();CharBuffer charBuffer = Charset.forName("utf-8").decode(byteBuffer);log.info("{},发来了消息,消息内容是:{}",clientChannel,charBuffer.toString());// 服务端接收消息后,给客户端发送给客户端Scanner scanner = new Scanner(System.in);String string = scanner.nextLine();ByteBuffer writeBuffer = Charset.forName("utf-8").encode(string);clientChannel.write(writeBuffer);if (writeBuffer.hasRemaining()) {// 如果不能一次性发完只需要触发 write 事件去发key.attach(writeBuffer);key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);}}} else if (key.isWritable() && key.isValid()) {ByteBuffer byteBuffer = (ByteBuffer) key.attachment();SocketChannel clientChannel = (SocketChannel) key.channel();byteBuffer.flip();clientChannel.write(byteBuffer);if (!byteBuffer.hasRemaining()) {// 如果已完,则只无须关注 write 事件key.attach(null);key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);}}}}
}

启动服务端,打开 iTerm,输入命令 telnet localhost 8081,连接服务端,这时服务端接收到客户端 client-01 的连接请求,进行建立连接。

建立连接后,客户端发送消息i am client_01,服务端收到消息,然后给客户端发送消息hi,client-01,i am server

  • 服务端

  • 客户端

分析为什么要:key.remove()

这里拿上面那个问题来说明,为什么要加这 key.remove() 代码呢?首先这段代码的意思是说获取一个 SelectionKey 后,我们需要将其删除,表示我们已经对该 IO 事件进行了处理,如果没有这样代码会有什么后果呢?报 NullPointerException

注释掉 key.remove() 这行代码,然后加一些日志,然后去掉服务端发送消息的代码,如下:

while (true) {// select 方法,一直阻塞直到有事件发生selector.select();// 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();log.info("key={}", key);// 获取一个 SelectionKey 后,我们要将其删除掉,表示我们已经处理了这个事件//iterator.remove();if (key.isAcceptable()) {// 连接时间发生// 当客户端连接服务端的时候,我们需要服务单与之建立连接// 需要注意的是在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannelServerSocketChannel socketChannel = (ServerSocketChannel) key.channel();// 需要从 socketChannel 获取 SocketChanelSocketChannel clientChannel = socketChannel.accept();log.info("{} 建立连接", clientChannel);// 设置 clientChannel 为非阻塞clientChannel.configureBlocking(false);clientChannel.register(selector, SelectionKey.OP_READ);} else if (key.isReadable()) {// 获取的为 SocketChannelSocketChannel clientChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(64);int size = clientChannel.read(byteBuffer);if (size < 0) {// 小于 0 表示客户端断开连接,需要关闭该 SocketChannellog.info("{},断开了连接", clientChannel);clientChannel.close();} else {byteBuffer.flip();CharBuffer charBuffer = Charset.forName("utf-8").decode(byteBuffer);log.info("{},发来了消息,消息内容是:{}", clientChannel, charBuffer.toString());}}}System.out.println("==============================我是分割线===================================");
}

启动服务端,然后客户端连接,发送消息,结果如下:

为什么会这样呢?这里我们来梳理整个流程。

  • 首先服务端创建一个 Selector,该 Selector 与 ServerSocketChannel 绑定,且关注 accept 事件。如下

  • 当客户端发起连接时,selector.selectedKeys() 会返回 Set 集合,该集合包含了已经准备就绪的 SelectionKey,这个时候只有连接事件,相对应的 SelectionKey 为 2b71fc7e

  • 当服务端与客户端建立连接后,绑定 Selector 并关注 read 事件。这里需要注意的是 Selector 并不会主动去删除 SelectionKey,它只会增加,所以这个时候 Selector 里面有两个 SelectionKey,一个是 2b71fc7e(accept),一个是 1de0aca6(read)。建立连接后,事件处理完成,会该事件与之对应的事件去掉,也就是 2b71fc7e 的 SelectionKey 绑定的 ServerSocketChannel ,但是 Selector 里面对应的 SelectionKey 还是存在的。

  • 当客户端给服务端发送消息时,服务端监测到有事件发生,会将发生时间的 SelectionKey@1de0aca6 加入到 selectedKey 中,如下:

在迭代过程第一次取的是 SelectionKey@1de0aca6,这个是读事件,可以正常读,打印客户端发送过来的,但是第二次读取的是 SelectionKey@2b71fc7e,但是这个 Key 与之相绑定的事件已经处理了,它为 null,那肯定会报 NullPointerException。所以在使用 NIO 时一定要主动删除已经处理过的 SelectionKey ,既主动调用 key.remove(),删除该 SelectionKey。


http://www.ppmy.cn/news/1217295.html

相关文章

【开源】基于Vue.js的校园失物招领管理系统的设计和实现

目录 一、摘要1.1 项目介绍1.2 项目详细录屏 二、研究内容2.1 招领管理模块2.2 寻物管理模块2.3 系统公告模块2.4 感谢留言模块 三、界面展示3.1 登录注册3.2 招领模块3.3 寻物模块3.4 公告模块3.5 感谢留言模块3.6 系统基础模块 四、免责说明 一、摘要 1.1 项目介绍 基于Vue…

蒙特卡洛树搜索(Monte Carlo Tree Search)揭秘

一. 什么是蒙特卡洛树搜索 蒙特卡洛树搜索(MCTS)是一种启发式搜索算法&#xff0c;一般用在棋牌游戏中&#xff0c;如围棋、西洋棋、象棋、黑白棋、德州扑克等。MCTS与人工神经网络结合&#xff0c;可发挥巨大的作用&#xff0c;典型的例子是2016年的AlphaGo&#xff0c;以4:1…

永磁材料测试系统参考标准

1. 概述 TY1000是一套专用于测量永磁材料磁性能的智能化系统&#xff0c;由励磁与测量主机、电磁铁、磁测量传感器、计算机及测量软件等组成。适用于测量各类型永磁材料的磁性能&#xff0c;并绘制相关磁特性曲线&#xff0c;具有操作便捷、测量快速、重复性好、可靠性高等特点…

Windows10下Docker安装Mysql5.7

文章目录 Windows10下Docker安装Mysql5.7环境说明打开命令工具搜索镜像拉取镜像查看所有镜像启动镜像查看容器查看所有容器查看运行中容器 进入容器进入容器命令输入账号命令输入密码 添加mysql的远程账号创建一个数据库 Windows10下Docker安装Mysql5.7 环境说明 docker&…

lineage编译

安装依赖 echo "Wang812330500" | sudo -S apt install -y bc bison build-essential \ ccache curl flex g-multilib gcc-multilib git git-lfs gnupg gperf imagemagick \ lib32ncurses5-dev lib32readline-dev lib32z1-dev libelf-dev liblz4-tool libncurses5 …

Python使用带账密的Socks5代理

测试代码如下 import requestsip 146.78.85.145 port 9527 username ********* password ********* proxies {http: fsocks5://{username}:{password}{ip}:{port},https: fsocks5://{username}:{password}{ip}:{port} } print(proxies) body requests.get("https:/…

Ubuntu安装mysql(解决ubuntu Access denied for user ‘root‘@‘localhost‘报错)

1、安装mysql sudo apt-get install mysql-server 上述命令会安装以下包&#xff1a; apparmor mysql-client-5.7 mysql-common mysql-server mysql-server-5.7 mysql-server-core-5.7 因此无需再安装mysql-client等。安装过程会提示设置mysql root用户的密码&#xff0c;设…

【管理运筹学】运筹学“背诵手册”(二) | 对偶理论与灵敏度分析

二、对偶理论与灵敏度分析 用矩阵形式表示原问题和对偶问题&#xff1a; max ⁡ z C X s . t . { A X ≤ b X ≥ 0 \max z\pmb{CX}\\ s.t.\begin{cases} \pmb{AX\leq b} \\ \pmb{X}\geq\pmb{0} \end{cases} maxzCXs.t.{AX≤bX≥0​ 其中 C ( c 1 , c 2 , ⋯ , c n ) , X (…