目录
阻塞IO
非阻塞IO
IO多路复用
基于epoll的IO多路复用
信号驱动
异步IO
传统的-阻塞式代码
基础的reactor模型-单reactor单线程处理 代码
多线程处理-单reactor-多线程处理
IO是输入输出(Input/Output)的缩写,是指计算机与外部世界进行数据交换的过程。
在Java中,IO流是一种用来读写数据的机制。Java的IO流分为两大类:字节流和字符流。
阻塞IO
定义:进程在进行 IO 操作时会挂起,会一直阻塞到内核缓冲区数据准备好并复制到用户缓冲区之后。
例子:老王去钓鱼,把带有鱼饵的钓竿放进水里后就做在河边一直盯着,啥也不干,等到鱼上钩才把鱼 钓上来放进桶里。期间什么也干不了。
流程解释:
1. 用户进程需要进行 IO 操作时,会进行一次系统调用,进入到内核态,此时用户进程被挂起。处于阻
塞状态。此时进程不会再占用 cpu 资源。
2. 内核进行数据的准备,把需要的数据填充到内核缓冲区。
3. 内核缓冲区数据填充完毕,把数据从内核缓冲区复制到用户缓冲区。
4. 数据复制完毕,返回,从内核态从新切换到用户态,进程进入就绪状态等待 cpu 执行
非阻塞IO
定义:用户进程在进行 IO 操作后,不会被挂起,还是会继续执行逻辑。
例子:老王去钓鱼,把带有鱼饵的钓竿放进水里后,期间可以玩手机,看微信,只是要不断地把眼睛看
向合理,看鱼有没有上钩,没有上钩就继续玩手机,有就收杆,把鱼放进桶里。
流程说明:
1. 用户进程需要进行 IO操作时,会进行一次系统调用,进入到内核态,如果数据没有准备好,立即返 回 EWOULDBLOCK 。此时不会造成进程阻塞。进程还可以继续处理其他事情。
2. 进程会轮询查看内核数据是否准备好,如果没有准备好,就继续立即返回 EWOULDBLOCK,不阻 塞进程。
3. 轮询到数据准备好了后,进行数据复制,从内核缓冲区复制到用户缓冲区。在此期间进程会挂起, 处于阻塞状态,知道数据复制完成。
4. 数据复制完毕后返回,进程转为就绪态,等待 cpu 调度。
IO多路复用
IO 多路复用有基于 select/poll 的对路复用,也有基于 epoll 的多路复用。
基于 select/poll 的多路复用:
多个网络 IO 连接可以注册到一个复路器select上,然后由一个进程或者线程调用该复路器,调用该复路 器会使得进程或者线程挂起,处于阻塞状态,内核会轮询监视该复路器上的每一个连接,一旦有一个连 接的数据准备好了,该 select会返回,然后进程或者线程退出阻塞状态,然后该进程或者线程会进行系 统调用,把内核缓冲区的数据复制到用户缓冲区。
例子:老王去河边钓鱼,不过他有点贪心,一次性使用多个鱼竿钓鱼(假设10个),然后把十个鱼竿放 进合理,然后就眼睛不断从左往右循环看每一根鱼竿是否有鱼上钩,其中一根鱼竿上钩了,就把鱼钓起 放进桶里。
流程说明:
1. 连接到服务进程上的多个套接字连接会注册到复路器 select 上。
2. 进程调用 select 系统调用。进入内核态,应用进程挂起,内核会对 select上的连接进行监视轮询, 监视连接的数据是否准备好。此间进程会处于阻塞状态。
3. 一旦 select 复用器上的一个或者多个连接数据准备好了, select会返回,然后进程会取消阻塞状态 并再发起系统调用 recvfrom把内核缓冲区的数据复制到用户缓冲区,此时进程又会被挂起,此时的 系统调用 recvfrom 时内核缓冲区数据必定是准备好的。
4. 数据复制完成返回。
基于epoll的IO多路复用
回顾基于 select/poll 的 IO 多路复用缺点:
select 的模型默认能同时接收的连接数是 1024 个,因为一个进程默认最多打开 1024 个 fd 文件描述
符。而基于 poll 模型的 IO 多路复用就没有限制。
对复路器上的连接的监视轮询是线性时间复杂度 O(n),也就是说随着连接数的增加,对复路器的监视 效率会降低。
基于 epoll 的 IO 多路复用的改进:
一个进程打开的 fd 连接文件描述符没有限制。会限制于内存大小, 1GB 内存大概可以打开 10w 个。
利用每个文件描述符 fd 上的 callback 函数来实现异步回调,省略了对复路器上的连接监视轮询的开
销。时间复杂度 O(1) ,就不会随着连接数的增多而降低。
例子:老王去河边钓鱼,不过他有点贪心,一次性使用多个鱼竿钓鱼(假设10个)(不过老王这从使用 的是升级版的鱼竿,每根鱼竿上绑着一个小铃铛,当有鱼上钩时铃铛会响),然后把十个鱼竿放进河 里,因为使用了升级版鱼竿,使用老王不用从左到右盯着鱼竿,但是也什么也干不了,只能那发呆。等 到某一根鱼竿有鱼上钩,铃铛就会响,然后老王就把那根鱼竿收杆,把鱼放进桶里。
流程:
1. 连接到服务进程上的多个套接字连接会注册到复路器上。
2. 进程调用 epoll 系统调用。进入内核态,应用进程挂起。
3. 一旦复用器上的某个连接数据准备好了,就会通过该连接套接字描述符 fd 上的回调函数通知应用进
程并,然后进程会取消阻塞状态并再发起系统调用 recvfrom 把内核缓冲区的数据复制到用户缓冲
区,此时进程又会被挂起,此时的系统调用 recvfrom 时内核缓冲区数据必定是准备好的。
4. 数据复制完成返回。
信号驱动
老王去河边钓鱼,用的升级版鱼竿,放下鱼竿后他可以干其他事,玩游戏,刷微博等,等听到铃铛响了 之后,就把鱼放到桶里。
流程说明:
1. 在 Socket 连接上安装一个信号处理函数,然后进程调用 sigaction系统调用,但是立即返回,进程不 用阻塞。继续执行。
2. 当某个 Socket 的数据准备好了之后,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用
recvfrom 进行数据的复制。复制过程中进程阻塞。
3. 复制完成返回。
异步IO
异步 IO 是最快的。
例子:老王去河边钓鱼,用的终极版鱼竿(自动放进桶里并播放响铃),放下鱼竿后他可以干其他事, 玩游戏,刷微博等,鱼上钩了以后,终极版鱼竿会自动收杆并把鱼放进桶里后,响铃会响,通知老王把 鱼煮了。
流程说明:
1. 1. 应用进程接收到 IO 连接,发起一次 aio_read 系统调用,然后立即返回,进程不阻塞,可以继
续执行。
2. 等待数据准备好。
3. 数据准备好了以后,内核直接把数据从内核缓冲区复制到用户缓冲区,而无需用户进程发起系
统调用再进行复制。
4. 数据复制完以后返回指定信号给用户进程,此时数据已经在用户的缓冲区了,所以用户进程可
以直接在用户缓冲区拿数据处理。
传统的-阻塞式代码
public class BioSocketServer {public static void main(String[] args) throws IOException {//建立socket连接,并监听端口ServerSocket serverSocket = new ServerSocket(8888);while (true) {//这里阻塞等待连接Socket client = serverSocket.accept();//当有连接建立时线程处理 只是演示 真实环境所有线程都应该通过线程池创建new Thread(new Runnable() {@Overridepublic void run() {try {//处理器handler(client);} catch (IOException e) {e.printStackTrace();}}}).start();}
}
/**
* 处理器
*/
private static void handler(Socket client) throws IOException {byte[] bytes = new byte[1024];// 第二个阻塞,获取客户端发来的数据,假如客户端没有发数据,则会一直阻塞int read = clientSocket.getInputStream().read(bytes);if (read != -1) {System.out.println( new String(bytes, 0, read));}}
}
基础的reactor模型-单reactor单线程处理 代码
public class NioSelectorSignThreadServer {public static void main(String[] args) throws IOException {// 创建Nio连接通道ServerSocketChannel serverSocket = ServerSocketChannel.open();//绑定监听端口serverSocket.socket().bind(new InetSocketAddress(8888));// 设置ServerSocketChannel为非阻塞serverSocket.configureBlocking(false);// 打开Selector处理Channel,即创建epoll 这里底层调用了系统的epoll_create方法,创建了epoll对象Selector selector = Selector.open();// 把ServerSocketChannel注册到selector上,且绑定OP_ACCEPT事件 这里底层调用了epoll_ctl将事件加入到epoll中serverSocket.register(selector, SelectionKey.OP_ACCEPT);//标注1System.out.println("nio服务启动成功");while (true) {// 这里是阻塞的 nio并非是异步非阻塞 而是同步阻塞// 阻塞等待需要处理的事件发生 这里调用了系统的epoll_wait方法,轮训就绪队列rdlist// 返回的select是此次获取的已就绪事件个数int select = selector.select();if(select <= 0){continue;}// 获取selector中注册的全部事件的就绪事件 SelectionKey 实例Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();// 遍历SelectionKey对事件进行处理while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();// OP_ACCEPT事件(相对于server端,accept事件就是客户端注册事件,在客户端就是OP_CONNECT事件)// 开始进行连接处理,和读事件注册if (selectionKey.isAcceptable()) {// 注意这里的channel类型,因为OP_ACCEPT事件是ServerSocketChannel注册的(标注1),所以OP_ACCEPT事件返回的对象也是ServerSocketChannelServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();SocketChannel socketChannel = server.accept();// 这里要注意将客户端连接注册成了异步,select模式属于异步,必须要注册成异步才支持select模式socketChannel.configureBlocking(false);// 注册需要的事件 读(OP_READ)事件socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);System.out.println("客户端连接成功");// 读事件,处理客户端传过来的数据} else if (selectionKey.isReadable()) {SocketChannel socketChannel = (SocketChannel)selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);int len = socketChannel.read(byteBuffer);// 如果有数据,把数据打印出来if (len > 0) {System.out.println("接收到消息:" + newString(byteBuffer.array()));selectionKey.interestOps(SelectionKey.OP_READ);// 如果客户端断开连接,关闭Socket} else if (len == -1) {System.out.println("客户端断开连接");socketChannel.close();}//写事件 水平触发 需要移除该事件 需要时再注册} else if(selectionKey.isWritable()){SocketChannel socketChannel = (SocketChannel)selectionKey.channel();System.out.println("write事件");// NIO事件触发是水平触发selectionKey.interestOps(SelectionKey.OP_READ);}//将已处理的事件移除,防止二次处理iterator.remove();}}}
}
多线程处理-单reactor-多线程处理
将上面的处理 select 事件部分改为多线程即可
多个反应器的多线程模式-多reactor多线程处理
比如netty的parentGroup、childGroup 属于主从reactor模式
public class NettyServer {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup parentGroup = new NioEventLoopGroup();NioEventLoopGroup childGroup = new NioEventLoopGroup();try{ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(parentGroup,childGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throwsException {ChannelPipeline pipeline = ch.pipeline();
// pipeline.addLast(new FixedLengthFrameDecoder(6));pipeline.addLast(newDelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer("####".getBytes())));pipeline.addLast(newStringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new ServerHandler3());
// pipeline.addLast(new ChatSocketSer)}});ChannelFuture future = bootstrap.bind(8888).sync();System.out.println("启动成功");future.channel().closeFuture().sync();}finally {parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}
}