文章目录
- 1. 前言
- 2. 网络编程(多线程)
- 1. 多线程优化(单个worker)
- 2. 解决多线程的问题
- 1. Run内部执行
- 2. 更简便的方法
- 3. 多线程优化(多个worker)
- 4. CPU个数获取
- 3. NIO vs BIO
- 1. stream vs channel
- 2. IO模型
- 3. 异步IO零拷贝问题
- 1. 传统IO问题
- 2. 内部工作流程
- 3. NIO优化
- 4. 进一步优化(linux2.1)
- 5. 进一步优化(linux2.4)
- 6. 零拷贝
- 4. AIO(异步IO)
1. 前言
笔记基于黑马的Netty教学,视频地址:黑马Netty
2. 网络编程(多线程)
1. 多线程优化(单个worker)
对于单线程下的任务,我们不得不考虑当一个事件耗费的事件长的时候,其实对于其他事件的处理事件也要加长,耗费的时间多。
对于CPU,我们自然要考虑如何使用多线程来优化,前面的只有一个选择器,没有充分利用CPU,如何改进?
- 单线程配一个选择器,专门处理accept事件
- 当客户端发出读写事件的时候,有其他线程专门负责处理
代码实现如下:
@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {//boss线程处理accept事件,主线程Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));//1. 创建固定的worker并初始化worker worker = new worker("worker-8");worker.register();while(true){boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();iter.remove();//accept事件if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());//2. 关联selectorlog.debug("before register...{}", sc.getRemoteAddress());sc.register(worker.selector, SelectionKey.OP_READ, null);log.debug("after register...{}", sc.getRemoteAddress());}}}}static class worker implements Runnable{private Thread thread;private Selector selector;private String name;private volatile boolean start = false; //一开始线程还没初始化public worker(String name) {this.name = name;}//初始化线程的selectorpublic void register() throws IOException {if(!start){//先创建selector,防止空指针selector = Selector.open();thread = new Thread(this, name);thread.start();start = true;}}@Overridepublic void run() {while(true){try{selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel)key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}}catch (Exception e){e.printStackTrace();}}}}
}
public class TestThreadClint {public static void main(String[] args) throws IOException {SocketChannel ssc = SocketChannel.open();ssc.connect(new InetSocketAddress("localhost", 8080));ssc.write(Charset.defaultCharset().encode("123456789abcdef"));System.in.read();}
}
输出结果:
DEBUG [boss] (21:51:49,144) (MultiThreadServer.java:45) - connected.../127.0.0.1:52972
DEBUG [boss] (21:51:49,146) (MultiThreadServer.java:47) - before register.../127.0.0.1:52972
情况:这时候就有问题了,客户端明明发送了数据给服务端,但是worker线程的select方法却没执行,而worker线程是已经启动的了,这是为什么?
观察上面的代码,我们预先设想是我们先创建好像线程worker,调用register方法的时候worker就会进入while循环,selector.select()等待数据的输入,但是此时selector.select()方法阻塞住了并影响了下面的sc.register()方法,无法给客户端关联通道,所以数据无法接收到。
我们发现下面两行代码只要调换顺序就可以了,因为先注册好事件类型再阻塞就可以接受到数据了。但是这时候又有一个问题,当又有一个客户端连接上来发送信息的时候,由于此时worker线程是阻塞在register()方法里面的,而新的线程需要重新进行sc.register,这时候下面两行代码的顺序又不好说了,治标不治本。
sc.register(worker.selector, SelectionKey.OP_READ, null); //boss
worker.register(); //worker
那么有没有什么办法能解决的?
2. 解决多线程的问题
1. Run内部执行
我们有一个想法,就是把sc.register(worker.selector, SelectionKey.OP_READ, null); 这行代码放进worker线程之中,在同一个线程中运行,这样就能保证顺序了。
我们用一个Queue先存储好任务,在run里面执行,流程是:
先是worker的start()方法,再把注册的任务存入队列里面,最后唤醒阻塞的selector来进行注册,第二次进入也是一样的流程。
@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {//boss线程处理accept事件,主线程Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));//1. 创建固定的worker并初始化worker worker = new worker("worker-8");while(true){boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();iter.remove();//accept事件if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());//2. 关联selectorlog.debug("before register...{}", sc.getRemoteAddress());worker.register(sc); //worker//sc.register(worker.selector, SelectionKey.OP_READ, null); //bosslog.debug("after register...{}", sc.getRemoteAddress());}}}}static class worker implements Runnable{private Thread thread;private Selector selector;private String name;private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();private volatile boolean start = false; //一开始线程还没初始化public worker(String name) {this.name = name;}//初始化线程的selectorpublic void register(SocketChannel sc) throws IOException {if(!start){//先创建selector,防止空指针selector = Selector.open();thread = new Thread(this, name);thread.start();start = true;}//向队列中添加了任务,但这个任务没有执行queue.add(()->{try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});//唤醒selectorselector.wakeup();}@Overridepublic void run() {while(true){try{selector.select(); //注册之后才阻塞Runnable task = queue.poll();if(task != null){task.run(); //执行了注册}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel)key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}}catch (Exception e){e.printStackTrace();}}}}
}
2. 更简便的方法
static class worker implements Runnable{private Thread thread;private Selector selector;private String name;private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();private volatile boolean start = false; //一开始线程还没初始化public worker(String name) {this.name = name;}//初始化线程的selectorpublic void register(SocketChannel sc) throws IOException {if(!start){//先创建selector,防止空指针selector = Selector.open();thread = new Thread(this, name);thread.start();start = true;}selector.wakeup(); //boss线程,因为是boss线程调用的这个方法sc.register(selector, SelectionKey.OP_READ, null); //boss线程}@Overridepublic void run() {while(true){try{selector.select(); //注册之后才阻塞Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel)key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}}catch (Exception e){e.printStackTrace();}}}}
我们可以分析这样写有没有问题,下面是三个的线程执行顺序,有3种情况:
- 第一种情况
selector.wakeup(); //boss线程sc.register(selector, SelectionKey.OP_READ, null); //boss线程selector.select(); //worker线程
- 第二种情况
selector.select(); //worker线程selector.wakeup(); //boss线程sc.register(selector, SelectionKey.OP_READ, null); //boss线程
- 第三种情况
selector.wakeup(); //boss线程selector.select(); //worker线程sc.register(selector, SelectionKey.OP_READ, null); //boss线程
第一种情况:wakeup这个方法有点特殊,相当于发一张票,类似park和unpark,然后运行到select方法的时候发现有这么一张票,就不会阻塞,继续向下运行。
第二种情况:先阻塞了再被唤醒,也没问题
第三种情况:和第一种一样了,都是预先发票的情况。
3. 多线程优化(多个worker)
我们在当个worker的基础上定义一个worker数组,多线程的个数(worker数组的长度)看自己的CPU是多少核的,设置多少个线程就行。但是为了效率,其实也有一个专门算CPU核数和线程数之间关系的算法,百度可以搜到。其次为了达到轮循的效果,我们定义一个AtomicInteger,相当于计数器,调用index.getAndIncrement() % workers.length求下标,达到一个轮循的效果。
@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {//boss线程处理accept事件,主线程Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));//1. 创建固定的worker并初始化//创建一个worker数组//Runtime.getRuntime().availableProcessors()获取CPU的核数worker[] workers = new worker[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < workers.length; i++) {workers[i] = new worker("worker-" + i);}//定义一个计数器AtomicInteger index = new AtomicInteger();while(true){boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while(iter.hasNext()){SelectionKey key = iter.next();iter.remove();//accept事件if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());//2. 关联selectorlog.debug("before register...{}", sc.getRemoteAddress());//使用一个轮循算法,平均分配workers[index.getAndIncrement() % workers.length].register(sc); //sc.register(worker.selector, SelectionKey.OP_READ, null); //bosslog.debug("after register...{}", sc.getRemoteAddress());}}}}static class worker implements Runnable{private Thread thread;private Selector selector;private String name;private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();private volatile boolean start = false; //一开始线程还没初始化public worker(String name) {this.name = name;}//初始化线程的selectorpublic void register(SocketChannel sc) throws IOException {if(!start){//先创建selector,防止空指针selector = Selector.open();thread = new Thread(this, name);thread.start();start = true;}selector.wakeup(); //boss线程sc.register(selector, SelectionKey.OP_READ, null); //boss线程}@Overridepublic void run() {while(true){try{selector.select(); //注册之后才阻塞Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel)key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}}catch (Exception e){e.printStackTrace();}}}}
}
4. CPU个数获取
- Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理CPU的个数而不是容器申请时的个数
- 这个问题直到JDK10才解决,使用jvm参数UseContainerSupport配置,默认开启
- 如果IO用的多,CPU用的少,可以根据阿姆达尔定理计算,会考虑到IO计算比例的
3. NIO vs BIO
1. stream vs channel
- stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
- stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel (文件channel不可以)可配合 selector 实现多路复用
- 二者均为全双工,即读写可以同时进行,Stream是单向的,但是也支持,只要忙的过来
2. IO模型
同步阻塞、同步非阻塞、多路复用、异步阻塞、异步非阻塞
当调用一次channel.read 或者 stream.read后,会切换到操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:
- 等待数据阶段
- 复制数据阶段
左边是用户发起了一个read方法,但是读取的方法java干不了,必须要调用操作系统的方法才可以完成读取。牵扯到等待数据,有数据了就读取数据读到内存,这个阶段叫做复制数据。复制数据完成之后,再由操作系统切换回用户这边进行读取。
模型(在百度上找了几张图片):
-
阻塞IO模型:用户发起read请求之后就阻塞了,等数据拷贝完成之后就开始读取。
-
非阻塞IO模型:相当于一个while循环不断发送请求,发送read请求后不会阻塞等待,只要没读到就直接返回失败,再次发起请求。但是和第一种比起来因为多此进行操作系统和用户区的切换,导致性能耗费是很高的。
-
多路复用模型:其实就是前面的selector选择器进行select方法的调用。
对比前面的2种优势在哪? -
对比阻塞IO,最大的优势就是可以同时处理多个请求,如果是阻塞IO处理下面的请求,当处理到accept的时候,必须等待有客户端连接上来了才可以继续处理下面的read请求,这时候就很麻烦了。
-
对比非阻塞IO,主要是不用无限循环和在用户区和操作系统之间来回切换,当然上面这点在非阻塞IO下也适合,因为都是单个线程。
-
多路复用一个selector可以监听多个事件的发生,这样好处就是可以selector可以把所有的事件都获取到,一次性全部获取再一并处理(多线程下通过不同的线程),根据key(事件)的不同类型分别做不同的处理。多少个事件就处理多少次。不用等待连接,等待数据了。
-
信号驱动模型(不常用)
-
同步IO模型:线程自己去获取结果(阻塞IO,非阻塞IO,多路复用)
-
异步IO模型:线程不是自己去获取结果,而是由其他线程送结果(至少两个线程),发起请求的线程发送过去就回调方法了。线程之间的数据是通过回调方法来进行数据返回的。异步是非阻塞的,异步阻塞纯属胡扯。
-
多路复用本质上也是同步,因为是自己发送的请求,也是自己接收的信息
3. 异步IO零拷贝问题
1. 传统IO问题
传统IO将一个文件通过socket写出
File f = new File("helloworld/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");byte[] buf = new byte[(int)f.length()];
file.read(buf); //开始读取文件到Byte数组Socket socket = ...;
//socket写出,从Byte数组写出到网络
socket.getOutputStream().write(buf);
2. 内部工作流程
短短几行代码内部其实是很复杂的,内部流程如下:
-
Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java
程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用。
DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU。DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
-
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA。
-
调用write方法,这时将数据从内核缓冲区(byte[] buf)写入socket缓冲区(操作系统中), CPU会参与拷贝。
-
接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu。
-
总结上面的,一小段程序,进行了四次复制,3次切换。第一次调用file.read(buf)的时候切换到操作系统进行读取,读取完成read结束,又切换回用户缓冲区,下一次在socket.getOutputStream().write(buf)写出的时候又切换到操作系统进行写出。
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次(开始从磁盘读取到操作系统,再从操作系统读取到byte[] buf,然后又从byte[] buf读取到socket缓冲区,最后读到网卡)
3. NIO优化
通过 DirectByteBuf
- ByteBuffer.allocate(10),使用HeapByteBuffer,使用的还是java内存
- ByteBuffer.allocateDirect(10),使用DIrectByteBuffer,使用的是操作系统内存,但是有个特点就是java可以访问,操作系统也可以访问。
- DIrectByteBuffer这时候的内核缓冲区和用户缓冲区其实可以当作同一块了,这样变相减少一次数据的拷贝了
大部分步骤与优化前相同,唯有一点:Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用
- 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
- Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
1、DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
2、通过专门线程访问引用队列,根据虚引用释放堆外内存 - 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
4. 进一步优化(linux2.1)
底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
- Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
- 数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU
可以看到
- 只发生了一次用户态和内核态的切换
- 数据拷贝了3次
5. 进一步优化(linux2.4)
- Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
- 只会将一些 offset(偏移量) 和 length(文件长度) 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU
整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次。
6. 零拷贝
所谓的零拷贝,并不是真正无拷贝,而是在不会拷贝重复数据到JVM内存,零拷贝优点:
- 更少的用户态和内核的切换
- 不利用CPU计算,减少CPU缓存伪共享
- 零拷贝适合小文件传输。大文件其实只读了一次,缓冲区的优势没发挥出来,而且也会影响其他操作的效率
注意:操作系统从磁盘读到缓冲区或读到网络设备可以调用一个DMA硬件来完成,不用利用CPU来完成,节约CPU的时间和空间。
4. AIO(异步IO)
AIO 用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows 系统通过 IOCP 实现了真正的异步 IO
- Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势
**代码:**
@Slf4j
public class AioFileChannel {public static void main(String[] args) {try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {//参数1 ByteBuffer//参数2 从哪开始读 position//参数3 附件 万一一次读不完还是需要一个ByteBuffer//参数4 回调对象,另外的线程异步回调数据ByteBuffer buffer = ByteBuffer.allocate(16);log.debug("read begin...");//守护线程来执行回调函数channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Override //成功调用这个//result:读取的字节数 attachment:一次读不完再次读的ByteBuffer//这里因为我们传入的参数1和参数3都是同一个,就无所谓了public void completed(Integer result, ByteBuffer attachment) {log.debug("read completed...");attachment.flip();debugAll(attachment);}@Override //失败调用这个public void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});log.debug("read end...");//主线程等等,等守护线程返回结果System.in.read();} catch (IOException e) {}}
}