Selector的使用
文章目录
- Selector的使用
- 一、阻塞 & 非阻塞
- 1. 阻塞
- 2. 非阻塞
- 二、selector 介绍及常用API
- 1. 多路复用
- 2. 常用API
- 三、处理 accept 事件
- 四、处理 read 事件
- 1. 为什么事件必须删除
- 2. 处理客户端断开问题
- 2.1 客户端强制断开
- 2.2 客户端正常断开
- 3. 处理消息边界问题
- 五、selector 何时不阻塞
- 六、使用多线程优化
一、阻塞 & 非阻塞
通过多个客户端向服务器发送数据的例子理解阻塞和非阻塞。
1. 阻塞
服务器端:
// 1. 定义ByteBuffer,存放客户端发来的数据
ByteBuffer buffer = ByteBuffer.allocate(16);// 2. 创建服务器channel
ServerSocketChannel ssc = ServerSocketChannel.open();// 3. 服务器绑定监听端口
ssc.bind(new InetSocketAddress(8080));// 4. 客户端channel的连接集合
List<SocketChannel> channels = new ArrayList<>();//服务器不停的接收客户端的请求
while (true) {// 5. 服务器调用accept建立与客户端的连接// 如果客户端发起了连接请求,且服务器端调用了accept方法,则双方会通过三次握手建立连接// SocketChannel用来与客户端之间通信SocketChannel sc = ssc.accept(); // 阻塞方法,如果没有客户端发来请求,线程停止运行,客户端连接建立之后,继续运行channels.add(sc); //将客户端channel添加到channel集合中for (SocketChannel channel : channels) {// 6. 接收客户端发送的数据//读取的数据存放在bytebuffer中channel.read(buffer); // 阻塞方法,如果客户端仅连接,但是没有发送数据,则阻塞,客户端发送数据之后,线程继续运行}
}
客户端:
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080)); //建立连接
sc.write(StandardCharsets.UTF_8.encode("你好");) //发送数据
如果某个客户端成功向服务器发送了信息,这个客户端再次发送数据会失败,因为服务器会等待客户端连接,连接之后的客户端再次发送数据并没有建立新连接,所以发送失败。也就是说,如果想要接收新的数据,必须建立新的连接。
缺点:
单线程模式下,一个方法的执行会影响另外一个方法,比如上述,一个服务器线程处理多个客户端连接,等待连接时无法读取另一个客户端的数据,读取数据时无法连接另一个客户端,一个客户端处理完毕后才可以处理另一个客户端。
2. 非阻塞
服务器端:
所有的 channel 都要设置为非阻塞模式。
ByteBuffer buffer = ByteBuffer.allocate(16);ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 切换成非阻塞模式,默认为阻塞模式ssc.bind(new InetSocketAddress(8080));List<SocketChannel> channels = new ArrayList<>();//持续等待客户端的连接
while (true) {SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,则sc是null(如果没有连接,会不断的生成null)//客户端有连接if (sc != null) {sc.configureBlocking(false); // 设置为非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {int read = channel.read(buffer);// 非阻塞,如果没有读到数据,线程仍然会继续运行,read值为0}
}
即使连接成功一个客户端,服务器还是会不停的尝试连接(生成null),连接好的客户端可以执行读取数据的操作,也就是说,读取和连接两个操作互不影响。
缺点:
即使没有连接建立和可读数据,线程仍然在不断运行,白白浪费了 cpu。
二、selector 介绍及常用API
1. 多路复用
一个线程配合 selector 就可以监控多个 channel 的事件,事件发生时线程才去处理,事件没有发生则线程处于阻塞状态(多路复用),避免非阻塞模式下CPU空转的问题。
好处:
- 让这个线程能够被充分利用
- 节约了线程的数量
- 减少了线程上下文切换的次数
2. 常用API
创建selector
//调用静态方法
Selector selector = Selector.open();
绑定channel事件
建立 selector 与 channel 之间的联系,也称之为注册事件,只有绑定之后的事件 selector 才会关心 。
channel.configureBlocking(false); //设置channel为非阻塞模式//将channel注册到selector上,并且关注某一事件类型
SelectionKey key = channel.register(selector, 绑定事件类型);
//返回值表示事件发生后,通过SelectionKey可以得知是什么事件,并且得知是哪个channel发生的事件
- channel 必须工作在非阻塞模式
- FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用,一般用于网络编程,而不是文件编程
- 绑定的事件类型可以有
- connect - 客户端连接成功时触发
- accept - 客户端发起连接请求时触发
- read - 客户端数据可读入时触发,存在因为接收能力弱,数据暂不能读入的情况
- write - 数据可写出至客户端时触发,存在因为发送能力弱,数据暂不能写出的情况
监听channel事件
可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件:
方法1,阻塞直到绑定事件发生
int count = selector.select();
//没有事件发生,就会让线程阻塞,事件发生了才会让线程继续向下运行
//解决了cpu空转的问题
方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件发生
int count = selector.selectNow();
三、处理 accept 事件
客户端代码(对以下各种事件都通用):
public class Client {public static void main(String[] args) {try (Socket socket = new Socket("localhost", 8080)) {//客户端发送的数据socket.getOutputStream().write("world".getBytes()); } catch (IOException e) {e.printStackTrace();}}
}
服务器端代码:
@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));//创建SelectorSelector selector = Selector.open();channel.configureBlocking(false);//注册事件SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {//阻塞直到事件发生int count = selector.select();// 获取所有可用的事件,set集合Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理(使用迭代器)Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {//得到某一个SelectionKeySelectionKey key = iter.next();// 判断事件类型if (key.isAcceptable()) {//获取发生这个事件的channel(强转,默认的返回类型为SelectableChannel)ServerSocketChannel c = (ServerSocketChannel) key.channel();// 与客户端建立连接SocketChannel sc = c.accept();// 获取事件以后必须处理,如果没有处理,selector认为有事件没有处理,就不会阻塞,cpu持续被占用//如果不想处理这个事件,可以调用key.cancel方法,取消事件后,会自动的将事件移除,selector依然会阻塞// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}}
}
四、处理 read 事件
服务器端代码:
@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));Selector selector = Selector.open();channel.configureBlocking(false);SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count = selector.select();// 获取所有事件Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判断事件类型//accept事件if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();sc.configureBlocking(false); //连接建立之后,必须设置为非阻塞模式//将所要管理的channel注册到selector上SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);//可读事件} else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();//读取channel中的数据,写入到buffer中ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);//如果客户端正常断开,或者读取完毕,将key删除if(read == -1) {key.cancel();} else {buffer.flip();//读取buffer中的数据}}// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}
1. 为什么事件必须删除
因为 selector 在事件发生后,就会将相关的 key 放入 selectedKeys
集合,但不会在处理完后从 selectedKeys
集合中移除,需要自己编码删除。比如上述代码:
- 第一次触发了 accept 事件,假如没有移除事件,还存在于
selectedKeys
集合 - 第二次触发了 read 事件,但这时
selectedKeys
中还有上次的 accept 事件 ,在遍历时,首次遍历到的元素依然是上次的 accept 事件,进入 if 分支,但此时已经建立了连接,所以 accept 方法返回的是 null,继续向下执行会出现空指针异常
2. 处理客户端断开问题
2.1 客户端强制断开
客户端强制断开后,如果服务器正在从中读取数据,服务器会抛出 IOException
,如果不处理异常,会导致服务器停止运行,此时应该捕获异常,处理异常时将此 key 删除:
2.2 客户端正常断开
连接通道调用 close()
方法,会让客户端正常断开,调用 read()
读取数据时,返回值为-1,所以应该判断,如果值为 -1,将 key 删除。
3. 处理消息边界问题
如果给buffer申请的空间较小,比如4个字节,客户端发送 “中国” 两个汉字,共占用6个字节(utf-8),需要读取两次才可将消息读入,第一次读取4个字节,获取的是 “中” 以及 “国” 的前三分之一,第二次获取 “国” 的后三分之二,“国” 字会被截断(黏包、半包问题),出现乱码,如下:
三种情况:
- 时刻1表示消息的长度大于buffer的大小,buffer需扩容
- 时刻2对应半包现象
- 时刻3对应黏包现象
三种处理方式:
- 一种思路是客户端和服务器约定消息长度,所有数据包大小一样,服务器按预定长度读取,客户端的消息也按照预定长度发送,缺点是浪费带宽(不够的长度会填充至约定的长度)。
- 另一种思路是按分隔符拆分,服务器根据分隔符来确定一条完整的消息,缺点是效率低,在服务器端需要一个字节一个字节对比。
- 第三种思路是数据使用 LTV 格式,即 Length 长度、Type 类型、Value 数据。发送的数据分成三部分,第一部分表示此数据的长度,第二部分是数据的类型,第三部分是实际的数据,服务器根据第一部分在长度已知的情况下,就可以很方便的确定消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量。
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
第二种思路的使用:
假设服务器设置的buffer大小为16个字节,客户端发送的数据为20个字节,则第一次读事件只能传输16个字节,所以需要对buffer进行扩容,扩容之后,第二次读事件把剩余的消息读入,生成一个完整的消息,如下图:
服务器端代码:
private static void split(ByteBuffer source) {//切换到读模式source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一条完整消息if (source.get(i) == '\n') {int length = i + 1 - source.position();// 把这条完整消息存入新的ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从source读,向target写for (int j = 0; j < length; j++) {target.put(source.get());}}}//只有找到分隔符,才会读取,如果没有找到分割符,则不会读取,也就是压缩之后,position位于limit处,都是16source.compact();
}public static void main(String[] args) throws IOException {Selector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//关注accept事件SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while (true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 如果是accept事件if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); //将channel数据通道注册到selector上//第三个参数表示将一个 byteBuffer 作为附件关联到 selectionKey 上//表示这个buffer只能由这个channel使用,此buffer与SelectionKey的生命周期一致SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ, buffer);// 如果是read事件} else if (key.isReadable()) { try {// 拿到触发事件的channelSocketChannel channel = (SocketChannel) key.channel(); // 获取 selectionKey 上关联的附件(buffer)ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常断开,read方法返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 如果position和limit相等,则表示不是一条完整的消息,需要扩容if (buffer.position() == buffer.limit()) {//设定扩容机制为2倍ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); //将旧buffer内容写入到新扩容后的buffer中buffer.flip();newBuffer.put(buffer); //将新的buffer作为selectionKey的附加条件,替换旧的bufferkey.attach(newBuffer);}}} catch (IOException e) {e.printStackTrace();key.cancel(); }}//移除事件iter.remove();}}
}
五、selector 何时不阻塞
-
事件发生时
- 客户端发起连接请求时,会触发 accept 事件
- 客户端发送数据、客户端正常 / 异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次 read 事件
- channel 可写时,会触发 write 事件
- 在 Linux 下发生
nio bug
时
-
调用
selector.wakeup()
-
调用
selector.close()
-
selector 所在线程 interrupt
六、使用多线程优化
上述过程只有一个 selector,没有充分利用多核 cpu,现利用多线程进行优化。
分两组 selector:
- 单线程配一个 selector,专门处理 accept 事件(起名为boss)
- 创建 CPU 核心数的线程,每个线程配一个 selector,轮流处理 read/write 事件(起名为worker)
服务器端代码:
/*** 流程:* 1. 客户端发起连接,触发accept事件,在accept事件中给worker绑定read事件* 2. 需要保证绑定read事件在worker调用select方法之前,阻塞之后必须得发生一个别的事件才能绑定,耗费时间
*/public class MultiThreadServer {//worker用来处理读写事件static class Worker implements Runnable{private Thread thread; // 不同的worker有不同的线程private Selector selector; // 不同的worker有不同的selectorprivate String name; // 不同的worker有不同的名字private volatile boolean start = false; // worker对应的thread和selector是否已经初始化//队列用来保证绑定read事件在worker调用select方法之前//并发队列用来在多个线程之间传递数据private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();// 构造器,定义worker的名字public Worker(String name) {this.name = name;}// 初始化Thread和selectorpublic void register(SocketChannel sc) throws IOException {//如果已经初始化过了,就不需要再初始化//要保证worker对应一个thread、一个selector,不能每次调用register方法都创建新的if(!start) {selector = Selector.open();thread = new Thread(this, name); // 一个线程对应一个workerthread.start();start = true;}//将绑定操作添加到队列中,此时还没有执行queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null); //绑定read事件} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup();//main线程唤醒selector,防止worker线程中selector阻塞导致read事件无法绑定}//worker的职责,监听读写事件@Overridepublic void run() {while(true) {try {selector.select();//从队列中取出绑定操作并执行Runnable task = queue.poll();if (task != null) {task.run(); //执行队列中的任务,即绑定操作}/*** 这样做的原因:* main线程调用worker的register方法绑定read事件,* 但是select方法的执行不在main线程中,而在worker自己的线程中,* 由于是两个线程,所以并不能保证先后顺序,* 所以利用队列,将绑定read事件和worker调用select方法放在一个线程中,保证先后顺序,*/Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();channel.read(buffer);}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}//main线程当作仅处理accept事件的线程public static void main(String[] args) throws IOException {//boss线程(main线程)专门用来监听accept事件Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();//绑定accept事件SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));//创建cpu核心数的worker并初始化Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}//用来保证多个客户端平均的连接到worker上AtomicInteger index = new AtomicInteger();while(true) {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);//在accept事件中给worker绑定read事件,调用worker的register方法绑定//保证平均的访问到workerworkers[index.getAndIncrement() % workers.length].register(sc);}}}}
}