NIO多路复用之Selector的使用

news/2025/1/12 1:05:04/

Selector的使用

文章目录

  • Selector的使用
    • 一、阻塞 & 非阻塞
      • 1. 阻塞
      • 2. 非阻塞
    • 二、selector 介绍及常用API
      • 1. 多路复用
      • 2. 常用API
    • 三、处理 accept 事件
    • 四、处理 read 事件
      • 1. 为什么事件必须删除
      • 2. 处理客户端断开问题
        • 2.1 客户端强制断开
        • 2.2 客户端正常断开
      • 3. 处理消息边界问题
    • 五、selector 何时不阻塞
    • 六、使用多线程优化

一、阻塞 & 非阻塞

通过多个客户端向服务器发送数据的例子理解阻塞和非阻塞。

1. 阻塞

服务器端:

// 1. 定义ByteBuffer,存放客户端发来的数据
ByteBuffer buffer = ByteBuffer.allocate(16);// 2. 创建服务器channel
ServerSocketChannel ssc = ServerSocketChannel.open();// 3. 服务器绑定监听端口
ssc.bind(new InetSocketAddress(8080));// 4. 客户端channel的连接集合
List<SocketChannel> channels = new ArrayList<>();//服务器不停的接收客户端的请求
while (true) {// 5. 服务器调用accept建立与客户端的连接// 如果客户端发起了连接请求,且服务器端调用了accept方法,则双方会通过三次握手建立连接// SocketChannel用来与客户端之间通信SocketChannel sc = ssc.accept(); // 阻塞方法,如果没有客户端发来请求,线程停止运行,客户端连接建立之后,继续运行channels.add(sc); //将客户端channel添加到channel集合中for (SocketChannel channel : channels) {// 6. 接收客户端发送的数据//读取的数据存放在bytebuffer中channel.read(buffer); // 阻塞方法,如果客户端仅连接,但是没有发送数据,则阻塞,客户端发送数据之后,线程继续运行}
}

客户端:

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080)); //建立连接
sc.write(StandardCharsets.UTF_8.encode("你好");) //发送数据

如果某个客户端成功向服务器发送了信息,这个客户端再次发送数据会失败,因为服务器会等待客户端连接,连接之后的客户端再次发送数据并没有建立新连接,所以发送失败。也就是说,如果想要接收新的数据,必须建立新的连接。

缺点:

单线程模式下,一个方法的执行会影响另外一个方法,比如上述,一个服务器线程处理多个客户端连接,等待连接时无法读取另一个客户端的数据,读取数据时无法连接另一个客户端,一个客户端处理完毕后才可以处理另一个客户端。

2. 非阻塞

服务器端:

所有的 channel 都要设置为非阻塞模式。

ByteBuffer buffer = ByteBuffer.allocate(16);ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 切换成非阻塞模式,默认为阻塞模式ssc.bind(new InetSocketAddress(8080));List<SocketChannel> channels = new ArrayList<>();//持续等待客户端的连接
while (true) {SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,则sc是null(如果没有连接,会不断的生成null)//客户端有连接if (sc != null) {sc.configureBlocking(false); // 设置为非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {int read = channel.read(buffer);// 非阻塞,如果没有读到数据,线程仍然会继续运行,read值为0}
}

即使连接成功一个客户端,服务器还是会不停的尝试连接(生成null),连接好的客户端可以执行读取数据的操作,也就是说,读取和连接两个操作互不影响。

缺点:

即使没有连接建立和可读数据,线程仍然在不断运行,白白浪费了 cpu。

二、selector 介绍及常用API

1. 多路复用

image-20210625103135577

一个线程配合 selector 就可以监控多个 channel 的事件,事件发生时线程才去处理,事件没有发生则线程处于阻塞状态(多路复用),避免非阻塞模式下CPU空转的问题。

好处:

  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换的次数

2. 常用API

创建selector

//调用静态方法
Selector selector = Selector.open();

绑定channel事件

建立 selector 与 channel 之间的联系,也称之为注册事件,只有绑定之后的事件 selector 才会关心 。

channel.configureBlocking(false); //设置channel为非阻塞模式//将channel注册到selector上,并且关注某一事件类型
SelectionKey key = channel.register(selector, 绑定事件类型);
//返回值表示事件发生后,通过SelectionKey可以得知是什么事件,并且得知是哪个channel发生的事件
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用,一般用于网络编程,而不是文件编程
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 客户端发起连接请求时触发
    • read - 客户端数据可读入时触发,存在因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出至客户端时触发,存在因为发送能力弱,数据暂不能写出的情况

监听channel事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件:

方法1,阻塞直到绑定事件发生

int count = selector.select();
//没有事件发生,就会让线程阻塞,事件发生了才会让线程继续向下运行
//解决了cpu空转的问题

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件发生

int count = selector.selectNow();

三、处理 accept 事件

客户端代码(对以下各种事件都通用):

public class Client {public static void main(String[] args) {try (Socket socket = new Socket("localhost", 8080)) {//客户端发送的数据socket.getOutputStream().write("world".getBytes()); } catch (IOException e) {e.printStackTrace();}}
}

服务器端代码:

@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));//创建SelectorSelector selector = Selector.open();channel.configureBlocking(false);//注册事件SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {//阻塞直到事件发生int count = selector.select();// 获取所有可用的事件,set集合Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理(使用迭代器)Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {//得到某一个SelectionKeySelectionKey key = iter.next();// 判断事件类型if (key.isAcceptable()) {//获取发生这个事件的channel(强转,默认的返回类型为SelectableChannel)ServerSocketChannel c = (ServerSocketChannel) key.channel();// 与客户端建立连接SocketChannel sc = c.accept();// 获取事件以后必须处理,如果没有处理,selector认为有事件没有处理,就不会阻塞,cpu持续被占用//如果不想处理这个事件,可以调用key.cancel方法,取消事件后,会自动的将事件移除,selector依然会阻塞// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}}
}

四、处理 read 事件

服务器端代码:

@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));Selector selector = Selector.open();channel.configureBlocking(false);SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count = selector.select();// 获取所有事件Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判断事件类型//accept事件if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();sc.configureBlocking(false); //连接建立之后,必须设置为非阻塞模式//将所要管理的channel注册到selector上SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);//可读事件} else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();//读取channel中的数据,写入到buffer中ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);//如果客户端正常断开,或者读取完毕,将key删除if(read == -1) {key.cancel();} else {buffer.flip();//读取buffer中的数据}}// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}

1. 为什么事件必须删除

因为 selector 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要自己编码删除。比如上述代码:

  • 第一次触发了 accept 事件,假如没有移除事件,还存在于 selectedKeys 集合
  • 第二次触发了 read 事件,但这时 selectedKeys 中还有上次的 accept 事件 ,在遍历时,首次遍历到的元素依然是上次的 accept 事件,进入 if 分支,但此时已经建立了连接,所以 accept 方法返回的是 null,继续向下执行会出现空指针异常

2. 处理客户端断开问题

2.1 客户端强制断开

客户端强制断开后,如果服务器正在从中读取数据,服务器会抛出 IOException,如果不处理异常,会导致服务器停止运行,此时应该捕获异常,处理异常时将此 key 删除:

image-20210625104029236

2.2 客户端正常断开

连接通道调用 close() 方法,会让客户端正常断开,调用 read() 读取数据时,返回值为-1,所以应该判断,如果值为 -1,将 key 删除。

3. 处理消息边界问题

如果给buffer申请的空间较小,比如4个字节,客户端发送 “中国” 两个汉字,共占用6个字节(utf-8),需要读取两次才可将消息读入,第一次读取4个字节,获取的是 “中” 以及 “国” 的前三分之一,第二次获取 “国” 的后三分之二,“国” 字会被截断(黏包、半包问题),出现乱码,如下:

image-20210625104154120

三种情况:

image-20210625104355082
  • 时刻1表示消息的长度大于buffer的大小,buffer需扩容
  • 时刻2对应半包现象
  • 时刻3对应黏包现象

三种处理方式:

  • 一种思路是客户端和服务器约定消息长度,所有数据包大小一样,服务器按预定长度读取,客户端的消息也按照预定长度发送,缺点是浪费带宽(不够的长度会填充至约定的长度)。
  • 另一种思路是按分隔符拆分,服务器根据分隔符来确定一条完整的消息,缺点是效率低,在服务器端需要一个字节一个字节对比。
  • 第三种思路是数据使用 LTV 格式,即 Length 长度、Type 类型、Value 数据。发送的数据分成三部分,第一部分表示此数据的长度,第二部分是数据的类型,第三部分是实际的数据,服务器根据第一部分在长度已知的情况下,就可以很方便的确定消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量。
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

第二种思路的使用:

假设服务器设置的buffer大小为16个字节,客户端发送的数据为20个字节,则第一次读事件只能传输16个字节,所以需要对buffer进行扩容,扩容之后,第二次读事件把剩余的消息读入,生成一个完整的消息,如下图:

image-20210625104601041

服务器端代码:

private static void split(ByteBuffer source) {//切换到读模式source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一条完整消息if (source.get(i) == '\n') {int length = i + 1 - source.position();// 把这条完整消息存入新的ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从source读,向target写for (int j = 0; j < length; j++) {target.put(source.get());}}}//只有找到分隔符,才会读取,如果没有找到分割符,则不会读取,也就是压缩之后,position位于limit处,都是16source.compact(); 
}public static void main(String[] args) throws IOException {Selector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//关注accept事件SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while (true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 如果是accept事件if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); //将channel数据通道注册到selector上//第三个参数表示将一个 byteBuffer 作为附件关联到 selectionKey 上//表示这个buffer只能由这个channel使用,此buffer与SelectionKey的生命周期一致SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ, buffer);// 如果是read事件} else if (key.isReadable()) { try {// 拿到触发事件的channelSocketChannel channel = (SocketChannel) key.channel(); // 获取 selectionKey 上关联的附件(buffer)ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常断开,read方法返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 如果position和limit相等,则表示不是一条完整的消息,需要扩容if (buffer.position() == buffer.limit()) {//设定扩容机制为2倍ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); //将旧buffer内容写入到新扩容后的buffer中buffer.flip();newBuffer.put(buffer); //将新的buffer作为selectionKey的附加条件,替换旧的bufferkey.attach(newBuffer);}}} catch (IOException e) {e.printStackTrace();key.cancel();  }}//移除事件iter.remove();}}
}

五、selector 何时不阻塞

  • 事件发生时

    • 客户端发起连接请求时,会触发 accept 事件
    • 客户端发送数据、客户端正常 / 异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次 read 事件
    • channel 可写时,会触发 write 事件
    • 在 Linux 下发生 nio bug
  • 调用 selector.wakeup()

  • 调用 selector.close()

  • selector 所在线程 interrupt

六、使用多线程优化

上述过程只有一个 selector,没有充分利用多核 cpu,现利用多线程进行优化。

分两组 selector:

  • 单线程配一个 selector,专门处理 accept 事件(起名为boss)
  • 创建 CPU 核心数的线程,每个线程配一个 selector,轮流处理 read/write 事件(起名为worker)

服务器端代码:

/*** 流程:* 1. 客户端发起连接,触发accept事件,在accept事件中给worker绑定read事件* 2. 需要保证绑定read事件在worker调用select方法之前,阻塞之后必须得发生一个别的事件才能绑定,耗费时间
*/public class MultiThreadServer {//worker用来处理读写事件static class Worker implements Runnable{private Thread thread; // 不同的worker有不同的线程private Selector selector; // 不同的worker有不同的selectorprivate String name; // 不同的worker有不同的名字private volatile boolean start = false; // worker对应的thread和selector是否已经初始化//队列用来保证绑定read事件在worker调用select方法之前//并发队列用来在多个线程之间传递数据private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();// 构造器,定义worker的名字public Worker(String name) {this.name = name;}// 初始化Thread和selectorpublic void register(SocketChannel sc) throws IOException {//如果已经初始化过了,就不需要再初始化//要保证worker对应一个thread、一个selector,不能每次调用register方法都创建新的if(!start) {selector = Selector.open();thread = new Thread(this, name); // 一个线程对应一个workerthread.start();start = true;}//将绑定操作添加到队列中,此时还没有执行queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null); //绑定read事件} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup();//main线程唤醒selector,防止worker线程中selector阻塞导致read事件无法绑定}//worker的职责,监听读写事件@Overridepublic void run() {while(true) {try {selector.select();//从队列中取出绑定操作并执行Runnable task = queue.poll();if (task != null) {task.run(); //执行队列中的任务,即绑定操作}/*** 这样做的原因:* main线程调用worker的register方法绑定read事件,* 但是select方法的执行不在main线程中,而在worker自己的线程中,* 由于是两个线程,所以并不能保证先后顺序,* 所以利用队列,将绑定read事件和worker调用select方法放在一个线程中,保证先后顺序,*/Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();channel.read(buffer);}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}//main线程当作仅处理accept事件的线程public static void main(String[] args) throws IOException {//boss线程(main线程)专门用来监听accept事件Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();//绑定accept事件SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));//创建cpu核心数的worker并初始化Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}//用来保证多个客户端平均的连接到worker上AtomicInteger index = new AtomicInteger();while(true) {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);//在accept事件中给worker绑定read事件,调用worker的register方法绑定//保证平均的访问到workerworkers[index.getAndIncrement() % workers.length].register(sc);}}}}
}

http://www.ppmy.cn/news/246490.html

相关文章

NIO网络编程

一、阻塞模式 服务端 public class server {public static void main(String[] args) throws IOException {//用 nio 来理解阻塞模式单线程//ByteBufferByteBuffer buffer ByteBuffer.allocate(16);//1.创建服务器ServerSocketChannel ssc ServerSocketChannel.open();//2.…

NIO的理解和使用

一、概述 NIO是 non-blocking-io的简称&#xff0c;非阻塞IO&#xff0c;由于它是后续出来的IO模型&#xff0c;有时也叫做 new-IO。NIO是后续比如React等的多路复用的基础模型。它是UNIX的五种IO模型中的一种。 NIO有三大组件&#xff1a;buffer、channel和selector&#xff…

NIO基础笔记

Netty深入浅出笔记 1. NIO基础1.1 三大组件1.1.1 Channel & Buffer1.1.2 Selector 1.2 ByteBuffer1.2.1 ByteBuffer 正确使用姿势1.2.2 ByteBuffer结构1.2.3 ByteBuffer核心属性1.2.4 ByteBuffer常见方法1.2.5 字符串与 ByteBuffer 互转1.2.6 Scattering Reads(分散读取)1.…

Netty学习(一)-- Netty 底层 Java NIO

视频地址&#xff0c;建议观看&#xff0c;老师由浅入深&#xff0c;循序渐进&#xff1a; https://www.bilibili.com/video/BV1py4y1E7oA 前面的学习&#xff1a;https://blog.csdn.net/weixin_43989102/article/details/126078132 目录 1、NIO1.1、Selector1&#xff09;多线…

Netty01——NIO 基础

目录 1.三大组件1.1.Channel & Buffer1.2.Selector1.2.1.多线程版设计1.2.2.线程池版设计1.2.3.selector 版设计 2.ByteBuffer2.1.ByteBuffer 使用方式2.2.ByteBuffer 结构2.2.1.ByteBuffer 的属性2.2.2.调试工具类2.2.3.使用调试工具类 2.3.ByteBuffer 常见方法2.3.1.分配…

NIO基础

NIO基础 1.三大组件 1.1 Channel & Buffer Channel有一点类似于stream&#xff0c;它就是读写数据的双向通道&#xff0c;可以从channel将数据读入buffer&#xff0c;也可以将buffer的数据写入channel&#xff0c;而之前的stream要么是输入&#xff0c;要么是输出&#…

JavaNIO——多线程以及IO模型(笔记)

文章目录 一、多线程优化1.1 Boss与Worker1.2 初步实现1Boss&#xff0c;1Worker1.3 线程数目配置 二、UDP三、IO模型3.1 stream&#xff08;BIO&#xff09; vs channel&#xff08;NIO&#xff09;3.2 IO模型3.2.1 阻塞IO3.2.2 非阻塞IO3.3.3 多路复用3.3.4 信号驱动3.3.5 异…

【Netty】第二章 网络编程和 IO 概念剖析

【Netty】第二章 网络编程 文章目录 【Netty】第二章 网络编程一、网络编程1.模拟阻塞模式下服务器单线程处理请求2.模拟非阻塞模式下服务器单线程处理请求3.使用 Selector 改进4.可写事件5.小结6.利用多线程进行优化 二、IO 概念剖析1.Stream 对比 Channel2.IO 模型3.零拷贝 一…