Netty由浅入深的学习指南(NIO基础)

news/2025/1/12 0:50:27/

本章节会重点讲解NIO的Selector、ByteBuffer和Channel三大组件。NIO即非阻塞IO。

1.1 三大组件

1.1.1 Channel(通道) & Buffer(缓冲区)

​ channel有一点类似于stream,他就是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel,而之前的stream要么是输入,要么是输出,channel比stream更为底层。

在这里插入图片描述

常见的Channel:

  • FileChannel :文件
  • DatagramChannel :UDP
  • SocketChannel :socket
  • ServerSocketChannel :socket

Buffer用来缓冲读写数据,常见的Buffer:

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

1.1.2 Selector(选择器)

服务器的三种设计模式

  • 多线程版设计

    在这里插入图片描述

    缺点:

    1)内存占用高

    2)线程上下文切换成本高

    3)只适合连接数少的场景

  • 线程池版设计

在这里插入图片描述

缺点:

1)阻塞模式下,线程仅能处理一个socket连接

2)仅适合短连接场景

  • selector(选择器)版设计

    selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上,适合连接数多,流量低的场景。

在这里插入图片描述

​ 调用selectorselect()会阻塞直到channel发生读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理。

1.2 ByteBuffer

1.2.1 基本使用

  • 创建测试文件test.txt

    123456789qazxsw
    
  • FileChannel 测试

    //获取FileChannel 1.输入输出流;2.RandomAccessFile JDK提供的文本编辑工具
    try (FileChannel channel = new FileInputStream("F:\\test.txt").getChannel()) {//建立缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);int len = 0;while (true){//从 channel 读取数据,即向 buffer 写入len = channel.read(buffer);if(len == -1){break;}//打印 buffer 内容buffer.flip(); //切换到读模式while (buffer.hasRemaining()){byte b = buffer.get();logger.info("获取的内容:{}",(char)b);}buffer.clear(); //切换到写模式}
    } catch (IOException e) {e.printStackTrace();
    }
    

1.2.2 内部结构

ByteBuffer有以下重要属性

  • capacity
  • position
  • limit

开始

在这里插入图片描述

写模式下,position是写入位置,limit等于容量,下图表示写入字节后的状态

在这里插入图片描述

flip动作发生后,position切换为读取位置,limit切换为读取限制

在这里插入图片描述

读取字节后,此时的状态为:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qDGPob7n-1620869743510)(C:\Users\dell\AppData\Roaming\Typora\typora-user-images\1618900195608.png)]

clear动作发生后,状态切换为写状态

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MMg3SX3B-1620869743511)(C:\Users\dell\AppData\Roaming\Typora\typora-user-images\1618900230057.png)]

compact动作是将未读完的数据向前压缩,然后切换至写状态

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WjaEuJ5Z-1620869743511)(C:\Users\dell\AppData\Roaming\Typora\typora-user-images\1618900307844.png)]

1.2.3 常见的方法

  • 分配空间

    ByteBuffer.allocate(1024); //使用java的堆内存
    ByteBuffer.allocateDirect(1024); //使用直接内存
    
  • 写入数据

    //调用channel的read方法
    channel.read(buffer);
    //调用bytebuffer的put方法
    buffer.put((byte) 0x61); //存入单独的字节
    buffer.put(new byte[]{0x62,0x63,0x64}); //存入字节数组
    ... ...
    
  • 读取数据

    //调用channel的write方法
    channel.write(buffer);
    //调用ByteBuffer的get方法
    System.out.println(buffer.get()); //获取当前指定字节
    System.out.println(buffer.get(2)); //获取指定index字节
    ... ...
    //使用rewind方法可以使positios重新定位到0,即达到重复读取数据的效果
    buffer.rewind(); //重新读取数据
    
  • 由写模式切换到读模式

    buffer.flip(); //切换到读模式
    
  • 由读模式切换到写模式

    buffer.clear(); //清空buffer,重新开始填入
    buffer.compact(); //将未读取的数据向前压缩,在其后填入
    
  • 标记与复位

    buffer.mark();  //标记当前position
    buffer.reset(); //复位到上次标记的position
    
  • 字符串与ByteBuffer的相互转换

    //字符串转ByteBuffer
    ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
    ByteBuffer buffer2 = Charset.forName("UTF-8").encode("你好");
    ByteBuffer buffer3 = ByteBuffer.allocate(1024);
    buffer3.put("你好".getBytes());
    ByteBuffer buffer4 = ByteBuffer.wrap("你好".getBytes());
    //ByteBuffer转字符串
    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer1);
    System.out.println(charBuffer.toString());
    

1.2.4 分散读(Scattering Reads)

  • 准备一个文本文件,其内容如下

    123456789
    
  • 分散读取到多个ByteBuffer

    public static void ScatteringReads(String filePath){try (RandomAccessFile file = new RandomAccessFile(filePath,"rw")){FileChannel channel = file.getChannel();//声明多个ByteBufferByteBuffer buffer1 = ByteBuffer.allocate(10);ByteBuffer buffer2 = ByteBuffer.allocate(10);ByteBuffer buffer3 = ByteBuffer.allocate(10);//读入数据channel.read(new ByteBuffer[]{buffer1,buffer2,buffer3});//切换到写模式buffer1.flip();buffer2.flip();buffer3.flip();//读取第一个字节System.out.println((char) buffer1.get());System.out.println((char) buffer2.get());System.out.println((char) buffer3.get());} catch (Exception e) {e.printStackTrace();}
    }
    

1.2.5 集中写(Gathering Writes)

  • 集中写入文件

    public static void GatheringReads(String filePath){try (FileChannel channel = new RandomAccessFile(filePath, "rw").getChannel()) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("你好");ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("李焕英");channel.write(new ByteBuffer[]{buffer,buffer1});} catch (IOException e) {e.printStackTrace();}
    }
    

1.2.6 粘包半包解析

  • 原始数据说明

    • 原始数据

      Hello,word \n

      I am fyy \n

      He is a apply

    • 粘包现象(多条数据拼接到一起)

      Hello,word\nI am fyy\nHe is

    • 半包现象(完整数据被截断)

      Hello,word\nI am fyy\nHe is

      a apply

  • 解析粘包半包

    public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.put("Hello,word\nI am fyy\nHe is".getBytes());split(buffer);buffer.put("a apply".getBytes());split(buffer);
    }public static void split(ByteBuffer buffer){//切换未读buffer.flip();for (int i = 0; i < buffer.limit(); i++) {if(buffer.get(i) == '\n'){int length = i + 1 - buffer.position();ByteBuffer target = ByteBuffer.allocate(length);//读取for (int j = 0; j < length; j++) {target.put(buffer.get());}}}//切换写buffer.compact(); //不能使用clear
    }
    

1.3 文件编程(FileChannel)

​ 注意:FileChannel只能工作在阻塞模式下

1.3.1 FileChannel简介

  • 获取:

    不能直接打开FileChannel,必须通过FileInputStreamFileOutputStream或者RandomAccessFile来获取FileChannel,他们都提供有getChannel()方法

    • 通过FileInputStream获取的channel只能读
    • 通过FileOutputStream获取的channel只能写
    • 通过RandomAccessFile获取的channel依据构造RandomAccessFile时的读写模式决定
      • r :只读
      • rw :读写
      • rwd :读写,写操作时同步的刷新到磁盘,刷新内容
      • rws :读写,写操作时同步的刷新到磁盘,刷新内容和元数据
    //通过FileInputStream获取channel
    FileChannel channel1 = new FileInputStream(filePath).getChannel();
    //通过FileOutputStream获取channel
    FileChannel channel2 = new FileOutputStream(filePath).getChannel();
    //通过RandomAccessFile获取channel
    FileChannel channel3 = new RandomAccessFile(filePath, "rw").getChannel();
    
  • 读取:

    从channel读取数据填充到ByteBuffer,返回值表示读到的字节,-1表示结束

    len = channel.read(buffer);
    
  • 写入:

    将一个buffer写入channel

    //通过FileOutputStream获取channel
    FileChannel channel = new FileOutputStream(filePath).getChannel();
    buffer.put("你好".getBytes()); //填入数据
    buffer.flip(); //切换到读模式
    //为了将buffer数据全部放入,使用while
    while (buffer.hasRemaining()){channel.write(buffer);
    }
    
  • 关闭:

    channel必须关闭

    • 通过调用FileInputStreamFileOutputStream或者RandomAccessFileclose方法
    • 通过try-catch的自动资源管理(ARM)
    //调用close方法
    FileChannel channel1 = null;
    try {channel1 = new FileInputStream(filePath).getChannel();
    }finally {channel1.close();
    }
    //try-catch的自动资源管理(ARM)
    try (FileChannel channel = new FileInputStream(filePath).getChannel()) {}
    
  • 位置:

    获取当前位置

    long position = channel.position();
    

    设置位置

    long newPositios = 100L;
    channel.position(newPositios);
    

    设置当前位置时,如果设置为文件的末尾

    • 读取会返回-1,既没有数据
    • 写入会追加内容,如果position超出文件末尾,中间会存在空洞(00)

1.3.2 两个channel数据传输

try (FileChannel readChannel = new FileInputStream(readPath).getChannel();FileChannel writeChannel = new FileOutputStream(writePath).getChannel()) {//效率高,底层使用零拷贝技术,最大支持2G文件传输readChannel.transferTo(0,readChannel.size(),writeChannel);//大于2G的文件可以多次传输long size = readChannel.size();for (long left = size; left > 0; ) {left -= readChannel.transferTo((size -left),left,writeChannel);}
}catch (Exception e){e.printStackTrace();
}

1.3.3 Path

JDK7开始引入了Path和Paths类

  • Path 用来表示文件路径

  • Paths时工具类,用来获取Path实例

    Path path = Paths.get("F:\\test.txt");
    
    • .:当前路径

    • ..:上一级路径

      Path path = Paths.get("F:\\test.txt\\..\\test2.txt");
      

1.3.4 Files

  • 检查文件是否存在

    Path path = Paths.get("F:\\test.txt");
    System.out.println(Files.exists(path));
    
  • 创建目录

    //创建一级目录,存在:FileAlreadyExistsException;多级:NoSuchFileException
    Path path1 = Paths.get("F:\\test2");
    System.out.println(Files.createDirectory(path1));//创建多级目录
    Path path2 = Paths.get("F:\\test2\\eee\\222");
    System.out.println(Files.createDirectories(path2));
    
  • 文件拷贝

    //文件拷贝,文件存在:FileAlreadyExistsException
    Path source = Paths.get("F:\\test.txt");
    Path target = Paths.get("F:\\test2.txt");
    Files.copy(source,target);
    //覆盖拷贝
    Files.copy(source,target, StandardCopyOption.REPLACE_EXISTING);
    
  • 文件移动

    //文件移动ATOMIC_MOVE保证文件移动的原子性
    Files.move(source,target,StandardCopyOption.ATOMIC_MOVE);
    
  • 删除文件

    //文件删除
    Files.delete(source); //文件不存在:NoSuchFileException
    Files.deleteIfExists(source);
    
  • 删除目录

    //删除目录:存在内容:DirectoryNotEmptyException
    Files.delete(target);
    //使用walkFileTree
    Files.walkFileTree(Paths.get("F:\\test2"),new SimpleFileVisitor<Path>(){//进入文件前@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.deleteIfExists(file); //删除文件return super.visitFile(file, attrs);}//推出文件夹时@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {Files.deleteIfExists(dir); //删除文件夹return super.postVisitDirectory(dir, exc);}
    });
    
  • 文件遍历

    AtomicInteger dirCount = new AtomicInteger();
    AtomicInteger fileCount = new AtomicInteger();
    Files.walkFileTree(Paths.get("F:\\"),new SimpleFileVisitor<Path>(){//进入目录前@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}//进入文件前@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {fileCount.incrementAndGet();return super.visitFile(file, attrs);}
    });
    
  • 多级目录拷贝

    String sourcePath = "F:\\test";
    String targetPath = "F:\\test2";
    Files.walk(Paths.get(sourcePath)).forEach(path3 -> {try {String targetName = path3.toString().replace(sourcePath,targetPath);if(Files.isDirectory(path3)){//创建拷贝目录Files.createDirectory(Paths.get(targetName));}if(Files.isRegularFile(path3)){//拷贝文件Files.copy(path3,Paths.get(targetName));}}catch (Exception e){e.printStackTrace();}
    });
    

1.4 网络编程

1.4.1 非阻塞 vs 阻塞

  • 阻塞

    • 在没有数据可读时,包括数据复制过程中,线程必须阻塞等待,不会占用CPU,但线程相当于闲置
    • 为了减少线程数,可以采用线程池技术
    • 即使存在线程池,如果建立的连接过多,长时间的inactive,会阻塞线程池中的所有线程

    演示代码

    //服务器:
    //1.声明全局的ByteBuffer用于处理客户端数据
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //2.声明服务器通道,等待客户端接入
    ServerSocketChannel ssc = ServerSocketChannel.open();
    //3.绑定端口
    ssc.bind(new InetSocketAddress("127.0.0.1",8080));
    //4.声明一个集合,保存连接的客户端
    List<SocketChannel> socketChannels = new ArrayList<>();
    while (true){//5.阻塞SocketChannel socketChannel = ssc.accept(); //阻塞线程,等待客户端接入socketChannels.add(socketChannel); //保存新接入的客户端for (SocketChannel sc : socketChannels) {sc.read(buffer); //阻塞线程,等待客户端数据buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();}
    }
    //客户端:
    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("127.0.0.1",8080));
    while (true){sc.write(StandardCharsets.UTF_8.encode("123456"));Thread.sleep(5000);
    }
    
  • 非阻塞

    • 在channel没有可读事件时,线程不必阻塞,它可以去处理其它有可读事件的channel
    • 数据的复制过程中,线程实际还是阻塞的
    • 写数据时,线程只是等待数据写入channel即刻,无需等channel通过网络发送数据出去

    演示代码

    //服务器:
    //1.声明全局的ByteBuffer用于处理客户端数据
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //2.声明服务器通道,等待客户端接入
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false); //改为非阻塞模式
    //3.绑定端口
    ssc.bind(new InetSocketAddress("127.0.0.1",8080));
    //4.声明一个集合,保存连接的客户端
    List<SocketChannel> socketChannels = new ArrayList<>();
    while (true){//5.阻塞SocketChannel socketChannel = ssc.accept(); //非阻塞线程,等待客户端接入if(socketChannel != null){socketChannels.add(socketChannel); //保存新接入的客户端socketChannel.configureBlocking(false); //改为非阻塞模式}for (SocketChannel sc : socketChannels) {int read = sc.read(buffer);//非阻塞,等待客户端数据if(read > 0){log.info("客户端:{}",sc);buffer.flip();log.info("客户端信息:{}",StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}
    }
    //客户端:
    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("127.0.0.1",8080));
    while (true){sc.write(StandardCharsets.UTF_8.encode("123456"));Thread.sleep(5000);
    }
    
  • 多路复用

    线程配合Selector完成对多个channel可读事件的监控,称之为多路复用

    • 仅针对网络IO
    • Selector保证了有可读事件才去读取
    • channel输入的数据一旦准备好,会触发Selector的可读事件

    演示代码

    //服务器
    //声明Selector,管理多个Channel
    Selector selector = Selector.open();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false); //设置为非阻塞模式
    //注册到selector,并获得监听的key
    SelectionKey sscKey = ssc.register(selector, 0, null);
    sscKey.interestOps(SelectionKey.OP_ACCEPT); //注册感兴趣的事件
    ssc.bind(new InetSocketAddress("127.0.0.1",8080));
    while (true){//关注事件int select = selector.select(); //无事件时阻塞//处理事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();//key.cancel(); //取消处理,避免死循环if(key.isAcceptable()){ServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false); //设置为非阻塞模式SelectionKey scKey = sc.register(selector, 0, null); //注册到selector,并获得监听的keyscKey.interestOps(SelectionKey.OP_READ); //注册感兴趣的事件}if(key.isReadable()){try {SocketChannel channel = (SocketChannel)key.channel();int read = channel.read(buffer); //正常断开返回-1if(read == -1){key.cancel(); //取消这个key的所有监听事件}else{buffer.flip();log.info("数据内容:{}",StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}catch (Exception e){key.cancel(); //取消这个key的所有监听事件}}iter.remove(); //处理完删除key,否则下次会处理到已处理的key}
    }
    

1.4.2 消息边界

一般情况下会存在三种边界问题:

  • 消息过长,超出bytebuffer容量

  • 半包现象,消息长度大于bytebuffer一半容量但未超出

  • 粘包现象,消息长度过于短
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-O712Vve1-1620869743512)(C:\Users\dell\AppData\Roaming\Typora\typora-user-images\1618997214342.png)]

解决思路:

  • 固定消息长度,服务器于客户端约定长度,都严格遵守,缺点是浪费带宽
  • 约定分隔符,按分隔符拆分,缺点是效率低
  • TLV格式:即Type(类型)-Length(长度)-Value(数据),这样类型、长度已知,就可以合理的分配buffer,缺点是buffer需要提前分配,如内容过大会影响server的吞吐量
    • Http1.1 是TLV格式
    • Http1.2 是LTV格式
//拆分分隔符
public static void selectorServer() throws IOException {//声明Selector,管理多个ChannelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); //设置为非阻塞模式//注册到selector,并获得监听的keySelectionKey sscKey = ssc.register(selector, 0, null);sscKey.interestOps(SelectionKey.OP_ACCEPT); //注册感兴趣的事件ssc.bind(new InetSocketAddress("127.0.0.1",8080));while (true){//关注事件int select = selector.select(); //无事件时阻塞//处理事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();//key.cancel(); //取消处理,避免死循环if(key.isAcceptable()){ServerSocketChannel channel = (ServerSocketChannel)key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false); //设置为非阻塞模式ByteBuffer buffer = ByteBuffer.allocate(10); //注册附件SelectionKey scKey = sc.register(selector, 0, buffer); //注册到selector,并获得监听的keyscKey.interestOps(SelectionKey.OP_READ); //注册感兴趣的事件}if(key.isReadable()){try {SocketChannel channel = (SocketChannel)key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment(); //获取key关联的附件int read = channel.read(buffer); //正常断开返回-1if(read == -1){key.cancel(); //取消这个key的所有监听事件}else{split(buffer);if(buffer.position() == buffer.limit()){ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); //扩容//复制拷贝buffer.flip();newBuffer.put(buffer);//替换附件key.attach(newBuffer);}}}catch (Exception e){key.cancel(); //取消这个key的所有监听事件}}iter.remove(); //处理完删除key,否则下次会处理到已处理的key}}
}public static void split(ByteBuffer buffer){//切换未读buffer.flip();for (int i = 0; i < buffer.limit(); i++) {if(buffer.get(i) == '\n'){int length = i - buffer.position();ByteBuffer target = ByteBuffer.allocate(length);//读取for (int j = 0; j < length; j++) {target.put(buffer.get());}buffer.get(); //读取分隔符,丢掉target.flip();System.out.println(StandardCharsets.UTF_8.decode(target));target.clear();}}//切换写buffer.compact(); //不能使用clear
}

1.4.2 处理可写事件

//服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while (true){selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey scKey = sc.register(selector, 0, null);scKey.interestOps(SelectionKey.OP_READ);//准备数据StringBuilder sb = new StringBuilder();for (int i = 0; i < 3000000; i++) {sb.append("a");}ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());//写入客户端int writeSize = sc.write(buffer);System.out.println("实际写入:" + writeSize);if (buffer.hasRemaining()) {scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);scKey.attach(buffer);}}if(key.isWritable()){SocketChannel sc = (SocketChannel)key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();int writeSize = sc.write(buffer);System.out.println("实际写入:" + writeSize);//清理附件if(!buffer.hasRemaining()){key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);key.attach(null);}}}
}
//客户端
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8080));while (true){ByteBuffer buffer = ByteBuffer.allocate(1024);sc.read(buffer);buffer.flip();System.out.println(StandardCharsets.UTF_8.decode(buffer));buffer.clear();
}

1.4.3 优化

//基于多线程实现服务器
public static void main(String[] args) throws IOException {//指定主线程名称Thread.currentThread().setName("BOSS"); //负责处理客户端连接事件ServerSocketChannel ssc = ServerSocketChannel.open(); //创建服务器连接ssc.configureBlocking(false); //设置为非阻塞模式Selector boss = Selector.open(); //声明选择器SelectionKey bossKey = ssc.register(boss, 0, null); //注册选择器bossKey.interestOps(SelectionKey.OP_ACCEPT); //关注连接事件ssc.bind(new InetSocketAddress(8080)); //绑定端口//创建多个Worker,并初始化Worker[] workers = new Worker[5];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-"+i);}AtomicInteger index = new AtomicInteger();while (true){boss.select();Iterator<SelectionKey> keys = boss.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next(); //获取事件的keykeys.remove(); //删除事件,防止重复处理if(key.isAcceptable()){SocketChannel sc = ssc.accept();log.info("创建连接{}",sc.getRemoteAddress());sc.configureBlocking(false); //设置为非阻塞模式//关联workers[index.getAndIncrement() % workers.length].register(sc); //worker初始化}}}
}static class Worker implements Runnable{private String name; //线程名称private Selector selector; //Selectorprivate Thread thread; //工作线程private volatile boolean start = false; //是否已创建线程及Selectorpublic Worker(String name) {this.name = name;}public void register(SocketChannel sc) throws IOException {if (!start) {selector = Selector.open();thread = new Thread(this,name);thread.start(); //启动线程start = true;}selector.wakeup(); //唤醒sc.register(selector,SelectionKey.OP_READ,null);}//执行读写事件@Overridepublic void run() {while (true){try {selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();if(key.isReadable()){SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);sc.read(buffer);buffer.flip();log.info("内容" + StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}} catch (IOException e) {log.error("处理读写事件异常:{}",e.getMessage());}}}
}

1.4.3 UDP

UDP是无连接的,客户端发送数据不会管服务器是否启动,服务器的receive方法会将接收的数据存入bytebuffer,但是如果数据报文超过buffer大小,多出来的数据会被抛弃。

  • 服务器的实现
try (DatagramChannel dc = DatagramChannel.open()) {dc.socket().bind(new InetSocketAddress(8080));dc.configureBlocking(false);Selector selector = Selector.open();SelectionKey dcKey = dc.register(selector, 0, null);dcKey.interestOps(SelectionKey.OP_READ);while (true){selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator();while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();if(key.isReadable()){ByteBuffer buffer = ByteBuffer.allocate(1024);dc.receive(buffer);buffer.flip();System.out.println(StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}}
} catch (IOException e) {System.out.println("服务器异常:" + e.getMessage());
}
  • 客户端实现
while (true){try (DatagramChannel dc = DatagramChannel.open()) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("你好,李焕英");InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);dc.send(buffer,address);} catch (Exception e) {System.out.println("客户端异常:" + e.getMessage());}Thread.sleep(1000);
}

http://www.ppmy.cn/news/246493.html

相关文章

NIO基础,帮助入门Netty

NIO基础 1、三大组件1.1 Channel & Buffer1.2 Selector 2.ByteBuffer2.1ByteBuffer正确使用2.2 ByteBuffer结构2.3 ByteBuffer 常用方法调试工具类分配空间向buffer中写入数据从buffer中读取数据代码演示get(),get(int i),allocate(),mark(),reset()方法的使用字符串和Byte…

Day76-Netty

title: Day76-Netty date: 2021-07-23 18:03:30 author: Liu_zimo NIO基础 non-blocking io 非阻塞io 三大组件 通道&#xff08;Channel&#xff09;、缓冲区&#xff08;Buffer&#xff09;、选择器&#xff08;Selector&#xff09; Channel & Buffer channel有点类…

NIO多路复用之Selector的使用

Selector的使用 文章目录 Selector的使用一、阻塞 & 非阻塞1. 阻塞2. 非阻塞 二、selector 介绍及常用API1. 多路复用2. 常用API 三、处理 accept 事件四、处理 read 事件1. 为什么事件必须删除2. 处理客户端断开问题2.1 客户端强制断开2.2 客户端正常断开 3. 处理消息边界…

NIO网络编程

一、阻塞模式 服务端 public class server {public static void main(String[] args) throws IOException {//用 nio 来理解阻塞模式单线程//ByteBufferByteBuffer buffer ByteBuffer.allocate(16);//1.创建服务器ServerSocketChannel ssc ServerSocketChannel.open();//2.…

NIO的理解和使用

一、概述 NIO是 non-blocking-io的简称&#xff0c;非阻塞IO&#xff0c;由于它是后续出来的IO模型&#xff0c;有时也叫做 new-IO。NIO是后续比如React等的多路复用的基础模型。它是UNIX的五种IO模型中的一种。 NIO有三大组件&#xff1a;buffer、channel和selector&#xff…

NIO基础笔记

Netty深入浅出笔记 1. NIO基础1.1 三大组件1.1.1 Channel & Buffer1.1.2 Selector 1.2 ByteBuffer1.2.1 ByteBuffer 正确使用姿势1.2.2 ByteBuffer结构1.2.3 ByteBuffer核心属性1.2.4 ByteBuffer常见方法1.2.5 字符串与 ByteBuffer 互转1.2.6 Scattering Reads(分散读取)1.…

Netty学习(一)-- Netty 底层 Java NIO

视频地址&#xff0c;建议观看&#xff0c;老师由浅入深&#xff0c;循序渐进&#xff1a; https://www.bilibili.com/video/BV1py4y1E7oA 前面的学习&#xff1a;https://blog.csdn.net/weixin_43989102/article/details/126078132 目录 1、NIO1.1、Selector1&#xff09;多线…

Netty01——NIO 基础

目录 1.三大组件1.1.Channel & Buffer1.2.Selector1.2.1.多线程版设计1.2.2.线程池版设计1.2.3.selector 版设计 2.ByteBuffer2.1.ByteBuffer 使用方式2.2.ByteBuffer 结构2.2.1.ByteBuffer 的属性2.2.2.调试工具类2.2.3.使用调试工具类 2.3.ByteBuffer 常见方法2.3.1.分配…