Day76-Netty

news/2025/1/12 0:44:52/

title: Day76-Netty
date: 2021-07-23 18:03:30
author: Liu_zimo


NIO基础

non-blocking io 非阻塞io

三大组件

通道(Channel)、缓冲区(Buffer)、选择器(Selector)

Channel & Buffer

channel有点类似stream,它就是读写数据的双向通道,可以从channel将数据读入buffer,可以将buffer的数据写入channel,而之前的stream要么是输入,要么是输出,channel比stream更底层

channel
buffer

常见的Channel有:

  • FileChannel:文件数据传输管道
  • DatagramChannel:UDP网络数据传输管道
  • SocketChannel:TCP数据传输管道(客户端服务器都能用)
  • ServiceSocketChannel:TCP数据传输管道(专用服务器)

Buffer则用来缓存读写数据,常见的Buffer有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Selector

  1. 多线程版设计
graph1[thread] --> 2[socket1]
3[thread] --> 4[socket2]
5[thread] --> 6[socket3]
  • 多线程版缺点

    • 内存占用高

    • 线程上下文切换成本高

      只适合连接少数的场景


  1. 线程池版设计
graph1[thread] --> 2[socket1]
1 --> 3[socket2]
4[thread] --> 5[socket3]
4 --> 6[socket4]
  • 线程池版缺点
    • 阻塞模式下,线程仅能处理一个socket连接
    • 仅适合短连接场景

  1. selector版设计

selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上。适合连接数特别多,但流量低的场景(low traffic)

graph
1[thread] --> 2[selector]
2 --> 3[channel]
2 --> 4[channel]
2 --> 5[channel]

调用selector的 select()会阻塞直到channel 发生了读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理

ByteBuffer

有一普通文本文件data.txt,内容为:1234567890abcd

使用FileChannel来读取文件内容

package com.zimo.netty;
import lombok.extern.slf4j.Slf4j;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/*** 测试类* @author Liu_zimo* @version v1.0 by 2021/7/28 11:00:39*/
@Slf4j
public class TestByteBuffer {public static void main(String[] args) {// FileChannel// 1.输入输出流 2.RandomAccessFiletry (FileChannel channel = new FileInputStream("data.txt").getChannel()) {// 准备缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);while (true){// 从channel读取数据,向buffer写int read = channel.read(buffer);log.debug("读取到的子节{}", read);if (read == -1) break;  // 没有内容了buffer.flip();  // 切换到读模式// 打印buffer的内容while (buffer.hasRemaining()){  // 是否还有剩余未读数据byte b = buffer.get();log.debug("实际子节{}", (char)b);}buffer.clear(); // 切换为写模式}} catch (IOException e) {e.printStackTrace();}}
}

ByteBuffer正确使用姿势

  1. 向buffer写入数据,例如调用channel.read(buffer)
  2. 调用flip()切换读模式
  3. 从buffer读取数据,例如调用buffer.get()
  4. 调用clear()或者compact()切换至写模式
  5. 重复1-4步骤

ByteBuffer结构

ByteBuffer有以下重要属性

  • capacity:容量
  • position:读写指针
  • limit:读写限制

ByteBuffer结构

ByteBuffer常见方法

  • 分配空间:可以使用allocate方法为ByteBuffer 分配空间,其它buffer类也有该方法
    • Java堆内存(java.nio.HeapByteBuffer)ByteBuffer buf = ByteBuffer.allocate(16)(读写效率低,受到GC的影响2)
    • 直接内存(java.nio.DirectByteBuffer)ByteBuffer buf = ByteBuffer.allocateDirect(16)(读写效率高,少一次拷贝,不会受GC影响)
  • 向buffer写入数据
    • 调用channel的read方法
      • int readByte = channel.read(buf);
    • 调用buffer自己的put方法
      • buf.put((byte)127)
  • 从buffer读取数据
    • 调用channel的write方法int writeBytes = channel.write(buf)
    • 调用buffer自己的get方法byte b = buf.get()
    • get方法会让position读指针向后走,如果想重复读取数据
      • 可以调用rewind方法将position重新置为0
      • 或者调用get(int index)方法获取索引index的内容,它不会移动读指针
  • mark & reset
    • mark:标记当前Position位置
    • reset:跳转到mark标记的位置
字符串与ByteBuffer互转
package com.zimo.netty;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/*** ByteBuffer与String互转* @author Liu_zimo* @version v1.0 by 2021/7/28 15:23:49*/
public class TestStringExchangeByteBuffer {public static void main(String[] args) {stringToByteBuffer();   // java.nio.HeapByteBuffer[pos=5 lim=16 cap=16]charsetToByteBuffer();  // java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]warpToByteBuffer();     // java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]byteBufferToString();   // hello}// 1.String.getBytes()public static void stringToByteBuffer(){// 字符串转为ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 写模式buffer.put("hello".getBytes());System.out.println(buffer);}// 2.Charsetpublic static void charsetToByteBuffer(){// 自动转换为读模式ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");System.out.println(buffer);}// 3.wrappublic static void warpToByteBuffer(){// 自动转换为读模式ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());System.out.println(buffer);}public static void byteBufferToString(){String s = StandardCharsets.UTF_8.decode(StandardCharsets.UTF_8.encode("hello")).toString();System.out.println(s);}
}

Scattering Reads

分散读取,有一个文本文件3parts.txt

public static void main(String[] args) {    try (FileChannel channel = new RandomAccessFile("data.txt", "r").getChannel()) {ByteBuffer buffer = ByteBuffer.allocate(4);ByteBuffer buffer1 = ByteBuffer.allocate(5);ByteBuffer buffer2 = ByteBuffer.allocate(6);channel.read(new ByteBuffer[]{buffer,buffer1,buffer2});buffer.flip();buffer1.flip();buffer2.flip();while (buffer.hasRemaining()){System.out.println((char) buffer.get());}while (buffer1.hasRemaining()){System.out.println((char)buffer1.get());}while (buffer2.hasRemaining()){System.out.println((char)buffer2.get());}} catch (IOException e) {e.printStackTrace();}
}

GetheringWrites

集中写入到文本test.txt

public static void main(String[] args) {ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello ");ByteBuffer buffer1 = ByteBuffer.wrap("zimo".getBytes());ByteBuffer buffer2 = ByteBuffer.wrap("!".getBytes());try (FileChannel channel = new RandomAccessFile("test.txt", "rw").getChannel()) {channel.write(new ByteBuffer[]{buffer,buffer1,buffer2});} catch (IOException e) {}
}

网络上有多条数据发送给服务端,数据之间使用\n进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为:

Hello,word\nI'm zhagnsan\nHow are you?\n

变成下面两个byteBuffer(黏包,半包)

Hello,word\nI'm zhagnsan\nHow are you?\n
  • 需求:将错乱的数据恢复成原始的按\n分隔的数据
package com.zimo.netty;import java.nio.ByteBuffer;/** * 恢复错乱的数据 * * @author Liu_zimo * @version v1.0 by 2021/7/28 16:06:01 */public class TestByteBufferExam {    public static void main(String[] args) {        ByteBuffer buffer = ByteBuffer.allocate(32);        buffer.put("Hello,word\nI'm zhagnsan\nHo".getBytes());        split(buffer);        buffer.put("w are you?\n".getBytes());        split(buffer);    }    private static void split(ByteBuffer buffer) {        buffer.flip();        for (int i = 0; i < buffer.limit(); i++) {            // 找到一条完整的消息            if (buffer.get(i) == '\n') {                int lenth = i + 1 - buffer.position();                // 把这条完整消息存入新的bytebuffer                ByteBuffer target = ByteBuffer.allocate(lenth);                // 从buffer读,写入target                for (int j = 0; j < lenth; j++) {                    target.put(buffer.get());                }                target.flip();                while (target.hasRemaining()){                    System.out.print((char) target.get());                }            }        }        buffer.compact();    }}

文件编程

FileChannel

  • **注意:**FileChannel只能工作在阻塞模式下

获取

不能直接打开FileChannel,必须通过FileInputStream、FileOutputStream或者RandomAccessFile 来获取FileChannel,它们都有getChannel方法

  • 通过FilelnputStream获取的channel只能读
  • 通过FileOutputStream获取的channel只能写
  • 通过RandomAccessFile是否能读写根据构造RandomAccessFile时的读写模式决定

读取

会从channel读取数据填充ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾

int readBytes = channel.read(buffer);

写入

ByteBuffer buffer = ...;buffer.put(...);	// 存入数据buffer.flip();		// 切换读模式while(buffer.hasRemaining()){    channel.write(buffer);}
  • 在while中调用channel.write是因为write方法并不能保证一次将buffer中的内容全部写入channel

关闭

channel必须关闭,不过调用了FileInputStream、FileOutputStream或者RandomAccessFile的close方法会间接地调用channel的close方法

位置

  • 获取当前位置:long pos = channel.position();
  • 设置当前位置:channel.position(long newPos);
  • 设置当前位置时,如果设置为文件的末尾
    • 这是读取返回-1
    • 这时写入,会追加内容,但是要注意如果position超过文件末尾,再写入时在新内容和原末尾直接会有空洞(00)

大小

使用size方法获取文件的大小

强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用force(true)方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

两个Channel传输数据

public static void main(String[] args) {        try (                FileChannel from = new FileInputStream("data.txt").getChannel();                FileChannel to = new FileOutputStream("to.txt").getChannel();        ) {            // 效率高,底层利用操作系统的零拷贝进行优化,传输上限2G            long size = from.size();            // left:剩余多少子节,传输大于2G的数据            for (long left = size; left > 0;) {                left -= from.transferTo(size-left, left, to);            }        } catch (IOException e) {            e.printStackTrace();        }    }

Path

jdk7引入了Path和 Paths类

  • Path用来表示文件路径
  • Paths是工具类,用来获取Path实例
// 相对路径 使用user.dir	环境变量来定位1.txtPath path = Paths.get("1.txt");// 绝对路径 代表了 d:\1.txtPath path = Paths.get("d:\\1.txt");// 绝对路径 代表了 d:/1.txtPath path = Paths.get("d:/1.txt");// 代表了 d:\data\projectPath path = Paths.get("d:\data","project");
  • 当前路径:.
  • 上级路径:..

Files

  • 检查文件是否存在:File.exists(Paths.get("hello/data.txt"));
  • 创建一级目录:Files.createDirectory(Paths.get("hello/aa"))
    • 如果目录已存在,会抛异常FileAlreadyExistsException
    • 不能一次创建多级目录,否则会抛异常NoSuchFileException
  • 创建多级目录:Files.createDirectory(Paths.get("hello/aa/bb/cc"))
  • 拷贝文件:Files.copy(Paths.get("aa/src.txt"), Paths.get("aa/dest.txt"))
    • 如果目录已存在,会抛异常FileAlreadyExistsException
    • 如果希望用source覆盖掉target,需要用StandardCopyOption来控制
      • Files.copy(Paths.get("aa/src.txt"), Paths.get("aa/dest.txt"), StandardCopyOption.REPLACE_EXISTING);
  • 移动文件:Files.move(Paths.get("aa/src.txt"), Paths.get("aa/dest.txt"), StandardCopyOption.ATOMIC_MOVE);
    • StandardCopyOption.ATOMIC_MOVE保证文件移动的原子性
  • 删除文件:Files.delete(Paths.get("aa/target.txt"))
    • 如果文件不存在,会抛异常NoSuchFileException
  • 删除目录:Files.delete(Paths.get("aa/bb"))
    • 如果目录还有内容,会抛异常DirectoryNotEmptyException
  • 遍历目录文件:
package com.zimo.netty;import java.io.File;import java.io.IOException;import java.nio.file.*;import java.nio.file.attribute.BasicFileAttributes;import java.util.concurrent.atomic.AtomicInteger;/** * 遍历文件树 * @author Liu_zimo * @version v1.0 by 2021/7/28 17:33:51 */public class TestFilesWalkFileTree {    public static void main(String[] args) throws IOException {        AtomicInteger dirCount = new AtomicInteger();        AtomicInteger fileCount = new AtomicInteger();        Files.walkFileTree(Paths.get("F:/a"), new SimpleFileVisitor<Path>(){            @Override            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {                System.out.println("****" + dir);                dirCount.incrementAndGet();                return super.preVisitDirectory(dir, attrs);            }            @Override            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {                System.out.println(file);                fileCount.incrementAndGet();                return super.visitFile(file, attrs);            }        });        System.out.println("dir " + dirCount);        System.out.println("file " + fileCount);        // 删除多级路径文件        Files.walkFileTree(Paths.get("F:/a"), new SimpleFileVisitor<Path>(){            @Override            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {                Files.delete(file);                return super.visitFile(file, attrs);            }            @Override            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {                Files.delete(dir);                return super.postVisitDirectory(dir, exc);            }        });    }}
  • 拷贝多文件目录
package com.zimo.netty;import java.io.IOException;import java.nio.file.Files;import java.nio.file.Paths;/** * 多级文件拷贝 * @author Liu_zimo * @version v1.0 by 2021/7/28 17:52:17 */public class TestFileCopy {    public static void main(String[] args) throws IOException {        String src = "F:\\a";        String dest = "F:\\b";        Files.walk(Paths.get(src)).forEach(path -> {            try {                String replace = path.toString().replace(src, dest);                if (Files.isDirectory(path)) {                    // 目录                    Files.createDirectory(Paths.get(replace) );                }else if (Files.isRegularFile(path)){                    Files.copy(path, Paths.get(replace));                }            } catch (IOException e) {                e.printStackTrace();            }        });    }}

网络编程

阻塞VS非阻塞

  • 阻塞

    • 在没有数据可读时,包括数据复制过程中,线程必须阻塞等待,不会占用cpu,但线程相当于闲置
    • 32位jvm一个线程320k,64位jvm一个线程1024k,为了减少线程数,需要采用线程池技术
    • 但即便用了线程池,如果有很多连接建立,但长时间inactive,会阻塞线程池中所有线程
  • 非阻塞

    • 在某个Channel没有可读事件时,线程不必阻塞,它可以去处理其它有可读事件的Channel
    • 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)
    • 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
  • 客户端

public class Client {    public static void main(String[] args) throws IOException {        SocketChannel sc = SocketChannel.open();        sc.connect(new InetSocketAddress("127.0.0.1", 10086));    }}
  • 阻塞 - 服务端
public class Server {    public static void main(String[] args) throws IOException {        // 使用nio来理解阻塞模式        ByteBuffer buffer = ByteBuffer.allocate(16);        // 1.创建了服务器        ServerSocketChannel ssc = ServerSocketChannel.open();        // 2.绑定监听端口        ssc.bind(new InetSocketAddress(10086));        // 3.连接集合        List<SocketChannel> channels = new ArrayList<>();        while (true){            // 4.accent 建立与客户端连接,SocketChannel用来与客户端之间通信            SocketChannel sc = ssc.accept();    // 阻塞等待连接            channels.add(sc);            // 5.接收客户端发送的数据            for (SocketChannel channel : channels) {                channel.read(buffer);   // 阻塞方法                buffer.flip();                while (buffer.hasRemaining()) {                    System.out.print((char) buffer.get());                }                buffer.clear();            }        }    }}
  • 非阻塞 - 服务端
public static void main(String[] args) throws IOException {    // 使用nio来理解阻塞模式    ByteBuffer buffer = ByteBuffer.allocate(16);    // 1.创建了服务器    ServerSocketChannel ssc = ServerSocketChannel.open();    ssc.configureBlocking(false);   // 设置为非阻塞模式,主要影响accept    // 2.绑定监听端口    ssc.bind(new InetSocketAddress(10086));    // 3.连接集合    List<SocketChannel> channels = new ArrayList<>();    while (true){        // 4.accent 建立与客户端连接,SocketChannel用来与客户端之间通信        SocketChannel sc = ssc.accept();    // 非阻塞,线程继续往下执行,如果没有连接为null        if (sc != null){            sc.configureBlocking(false);    // 非阻塞,主要影响read            channels.add(sc);            // 5.接收客户端发送的数据            for (SocketChannel channel : channels) {                int read = channel.read(buffer);// 非阻塞                if (read <= 0) {                    continue;                }                buffer.flip();                while (buffer.hasRemaining()) {                    System.out.print((char) buffer.get());                }                buffer.clear();            }        }    }}
  • Selector - 服务端
    • 事件触发
      • accept:有连接时触发
      • connect:客户端事件,在建立连接后触发
      • read:可读事件
      • write:可写事件
public static void main(String[] args) throws IOException {    // 1.创建selector,管理多个channel    Selector selector = Selector.open();    ServerSocketChannel ssc = ServerSocketChannel.open();    ssc.configureBlocking(false);    // 2.建立selector和channel的联系(注册),SelectionKey事件发生后,通过他可以知道是哪个channel的事件    SelectionKey sscKey = ssc.register(selector, 0, null);    // key只关注accept事件    sscKey.interestOps(SelectionKey.OP_ACCEPT);    ssc.bind(new InetSocketAddress(10086));    while(true){        // 3.selector方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行        selector.select();  // select在未处理时,不会阻塞,要么处理要么取消        // 4.处理事件,SelectionKey内部包含了所有发生的事件        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();        while (iterator.hasNext()){            SelectionKey key = iterator.next();            // 处理完事件,要移除事件,否则下次再处理key找不到对应事件,会有问题            iterator.remove();            // 5.区分事件类型            if (key.isAcceptable()){    // accept事件                ServerSocketChannel channel = (ServerSocketChannel) key.channel();                SocketChannel sc = channel.accept();                sc.configureBlocking(false);    // 设置非阻塞                SelectionKey scKey = sc.register(selector, 0, null);                scKey.interestOps(SelectionKey.OP_READ);            }else if (key.isReadable()){    // read事件                try {                    SocketChannel channel = (SocketChannel) key.channel();                    ByteBuffer buffer = ByteBuffer.allocate(16);                    int read = channel.read(buffer);    // -1正常断开                    if (read == -1){                        key.cancel();                    }else {                        buffer.flip();                        System.out.print(Charset.defaultCharset().decode(buffer));                    }                } catch (IOException e) {                    e.printStackTrace();                    key.cancel();   // 客户端断开,因此需要将key取消。反注册                }            }        }    }}
  • 处理read事件时,要移除处理完的事件
  • 若客户端断开时read返回-1,强制断开时抛出异常,要取消掉事件

处理消息边界

  1. 固定消息长度,数据包大小一样,服务器按照预定长度读取,缺点是浪费带宽
  2. 按分隔符拆分,缺点是效率低
  3. TLV格式,即Type类型,Length长度,Value数据,类型和长度知道的情况下,就可以合理分配buffer,缺点是buffer需要提前分配,如果内容过大,则影响server吞吐量

处理消息边界问题

ByteBuffer大小分配

  • 每个channel都需要记录可能被切分的消息,因为ByteBuffer不是线程安全的,因此需要为每个channel维护—个独立的ByteBuffer
  • ByteBuffer不能太大,比如一个ByteBuffer 1Mb的话,要支持百万连接就要1Tb内存,因此需要设计大小可变的 ByteBuffer
    • 一种思路是首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的 buffer,将4kbuffer内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现
    • 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

多线程优化

现在都是多核cpu,设计时要充分考虑别让cpu的力量被白白浪费

分两组选择器

  • 单线程配一个选择器,专门处理accept事件
  • 创建cpu核心数的线程,每个线程配一个选择器,轮流处理read事件
package com.zimo.netty.server;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.Charset;import java.util.Iterator;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.atomic.AtomicInteger;/** * 多线程 * * @author Liu_zimo * @version v1.0 by 2021/7/28 18:04:34 */@Slf4jpublic class ThreadsServer {    public static void main(String[] args) throws IOException {        Thread.currentThread().setName("boss");        ServerSocketChannel ssc = ServerSocketChannel.open();        ssc.configureBlocking(false);   // 设置为非阻塞模式,主要影响accept        Selector boss = Selector.open();        SelectionKey bossKey = ssc.register(boss, 0, null);        bossKey.interestOps(SelectionKey.OP_ACCEPT);        ssc.bind(new InetSocketAddress(10086));        // 1.创建固定数量的worker并初始化        Woker[] wokers = new Woker[Runtime.getRuntime().availableProcessors()];        for (int i = 0; i < wokers.length; i++) {            wokers[i] = new Woker("worker-" + i);        }        AtomicInteger index = new AtomicInteger();        while (true){            boss.select();            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();            while (iter.hasNext()) {                SelectionKey key = iter.next();                iter.remove();                if (key.isAcceptable()){                    SocketChannel sc = ssc.accept();                    sc.configureBlocking(false);                    log.debug("connected...{}", sc.getRemoteAddress());                    // 2.关联selector  round robin                    wokers[index.getAndIncrement() % wokers.length].register(sc);//                    sc.register(woker.selector, SelectionKey.OP_READ, null);                }            }        }    }    static class Woker implements Runnable{        private Thread thread;        private Selector selector;        private String name;        private volatile boolean start = false; // 还未初始化        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();        public Woker(String name) {            this.name = name;        }        // 初始化线程和selector        public void register(SocketChannel sc) throws IOException {            if (start) return;            this.thread = new Thread(this, name);            this.thread.start();            this.selector = Selector.open();            this.start = true;            // 像队列添加任务            queue.add(()->{                try {                    sc.register(selector, SelectionKey.OP_READ, null);                } catch (ClosedChannelException e) {                    e.printStackTrace();                }            });            selector.wakeup();  // 唤醒select方法        }        @Override        public void run() {            while (true){                try {                    this.selector.select();                    Runnable task = queue.poll();                    if (task != null) {                        task.run();                    }                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();                    while (iter.hasNext()){                        SelectionKey key = iter.next();                        iter.remove();                        if (key.isReadable()) {                            ByteBuffer buffer = ByteBuffer.allocate(16);                            SocketChannel channel = (SocketChannel)key.channel();                            channel.read(buffer);                            buffer.flip();                            System.out.println(Charset.defaultCharset().decode(buffer));                        }                    }                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }}
如何拿到cpu个数
  • Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理cpu个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10才修复,使用jvm参数UseContainerSupport配置,默认开启

NIO vs BIO

  • stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream仅支持阻塞APl,channel同时支持阻塞、非阻塞API,网络channel可配合selector实现多路复用
  • 二者均为全双工,即读写可以同时进行

IO模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

当调用一次channel.read或stream.read后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:等待数据阶段、复制数据阶段

IO模型

阻塞IO、非阻塞IO、多路复用、信号驱动、异步IO

零拷贝问题

  • 传统IO问题

传统的IO将一个文件通过socket写出

File f = new File("helloword/data.txt");RandomAccessFile file = new RandomAccessFile(f,"r");byte[] buf = new byte[(int) f.length()];file.read(buf);Socket socket = ...;socket.getOutputStream().write(buf);

内部工作流程如下:

传统IO工作流程

  1. Java本身不具备IO读写能力,因此read方法调用后,要从Java程序的用户态切换至内核态,去调用操作系统的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用DMA来实现文件读,期间也不会使用cpu

    DMA也可以理解为硬件单元,用来解放cpu完成文件IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓存区(即byte[] buf),这期间cpu会参与拷贝,无法利用DMA

  3. 调用write方法,这时将数据从用户缓冲区(byte[] buf)写入socket缓冲区,cpu会参与拷贝

  4. 接下来要向网卡写数据,这项能力Java又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

由此可见中间环节较多,Java的IO实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了3次,这个操作比较重量级
  • 数据拷贝了共4次
NIO优化

通过DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer:使用的还是Java内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer:使用的是操作系统内存,Java也可以访问
    NIO优化

大部分步骤与优化前相同,唯一一点:Java可以使用DirectByteBuffer将堆外内存映射到jvm内存种来直接访问使用

  • 这个内存块不受jvm垃圾回收影响,因此内存地址固定,有助于IO读写
  • Java种的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分成两部分
    • DirectByteBuf对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了Linux2.1后提供的sendFile方法),Java中对应着两个channel调用transferTo/transferFrom方法拷贝数据

NIO优化1

  1. java调用transferTo方法后,要从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用CPU
  2. 数据从内核缓冲区传输到socket缓冲区,cpu会参与拷贝
  3. 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了3次

进一步优化(Linux2.4)

NIO优化2

  1. java调用transferTo方法后,要从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用CPU
  2. 只会将一些offset和length信息拷入socket缓冲区,几乎无消耗
  3. 使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

整个过程仅只发生了一次用户态和内核态的切换,数据拷贝了2次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到jvm内存中,零拷贝的优点有:

  • 更少的用户态与内核态的切换
  • 不利用cpu计算,减少cpu缓存伪共享
  • 零拷贝适合小文件传输

AIO

AIO用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统提供支持

  • Windows系统通过IOCP实现了真正的异步IO
  • Linux系统异步IO在2.6版本引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势
public static void main(String[] args){    try(AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("aa.txt"), StandardOpenOption.READ)){        ByteBuffer buf = ByteBuffer.allocate(16);        channel.read(buf, 0, buf, new CompletionHandler<Integer, ByteBuffer>(){            @Override            public void completed(Integer result, ByteBuffer attachment){                attachment.flip();                System.out.println(Charset.defaultCharset().decode(buffer));            }            @Override            public void failed(Throwable exc, ByteBuffer attachment){                exc.printStackTrace();            }        })    }}

Netty入门

概述

  • 什么是netty
    • Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

Netty的优势

  • Netty vs NIO,工作量大,bug多
    • 需要自己构建协议
    • 解决TCP传输问题,如黏包、半包
    • epoll空轮询导致CPU100%
    • 对API进行增强,使之更易使用,如FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
  • Netty vs 其他网络应用框架
    • Mina由apache维护,将来3.x版本可能会有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速,API更简洁、文档更优秀
    • 久经考验,16年,Netty版本
      • 2.x 2004
      • 3.x 2008
      • 4.x 2013
      • 5.x 已废弃(没有明显的性能提升,维护成本高)

Hello Netty

  • 引入依赖
<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.39.Final</version></dependency>
  • 服务端代码
package com.zimo.netty.hello;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;/** * netty server * @author Liu_zimo * @version v1.0 by 2021/8/3 16:13:36 */public class HelloServer {    public static void main(String[] args) {        // 1.服务器端启动器,负责组装netty组件,启动服务器        new ServerBootstrap()                // 2.加入多个事件组EventLoop(selector, thread)                .group(new NioEventLoopGroup())                // 3.选择服务器ServerSocketChannel实现                .channel(NioServerSocketChannel.class)                // 4.决定了child能执行哪些操作(handler)                .childHandler(                        // 5.代表和客户端进行数据读写的通道channel                        new ChannelInitializer<NioSocketChannel>() {                    @Override   // 连接建立后。初始化,负责添加别的handler                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {                        // 添加具体handler                        nioSocketChannel.pipeline().addLast(new StringDecoder());   // 加入节码,将ByteBuf转换为String                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 自定义handler                            @Override   // 读事件                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                                System.out.println(msg);    // 打印上一步转换好的字符串                            }                        });                    }                })                // 6.绑定监听端口                .bind(10086);    }}
  • 客户端代码
package com.zimo.netty.hello;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;/** * netty client * @author Liu_zimo * @version v1.0 by 2021/8/3 16:26:59 */public class HelloClient {    public static void main(String[] args) throws InterruptedException {        // 1.启动器        new Bootstrap()                // 2.添加EventLoop                .group(new NioEventLoopGroup())                // 3.选择客户端的channel实现                .channel(NioSocketChannel.class)                // 4.添加处理器                .handler(new ChannelInitializer<NioSocketChannel>() {                    @Override   // 建立连接后调用                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {                        nioSocketChannel.pipeline().addLast(new StringEncoder());   // 编码                    }                })                // 5.连接到服务器                .connect(new InetSocketAddress("localhost", 10086))	// 异步非阻塞                .sync() // 阻塞方法,直到建立连接,也可以使用addListener()方法实现异步调用                .channel()  // 代表连接对象                // 6.向服务器发送数据                .writeAndFlush("hello netty!");    }}

概念理解:

  • channel可以理解为数据的通道
  • msg理解为流动的数据,最开始输入的是ByteBuf,经过pipeline(流水线)的加工,变成其他类型对象
  • handler理解为数据的处理工序
    • 工序有多道,合在一起就算pipeline,pipeline负责发布事件(读,读取完成…)传播给每个handler,每个handler对自己感兴趣的事件进行处理
    • handler分为inbound和outbound两类(入站和出战)
  • eventLoop理解为处理数据的工人
    • 工人可以管理多个channel和io操作,工人负责某个channel绑定在一起
    • 工人既可以执行IO操作,也可以进行任务处理,每个工人有任务队列(普通任务、定时任务),队列里可以堆放多个channel的等待处理任务
    • 工人按照pipeline顺序,依次按照handler规划处理数据,每道工序指定不同的工人

组件

EventLoop

EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的IO事件

  • 继承关系比较复杂
    • 一条线是继承自j.u.c.ScheduledExecutorService因此包含了线程池中所有的方法
    • 另一条线是继承自netty自己的OrderedEventExecutor

EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都有此EventLoop来处理(保证了io事件处理时的线程安全)

  • 继承自netty的EventExecutorGroup
    public static void main(String[] args) {        // 1.创建事件循环组        EventLoopGroup group = new NioEventLoopGroup(2);     // io事件、普通任务、定时任务//        EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务、定时任务        // 2.获取下一个事件循环对象        group.next();        // 3.执行普通任务        group.next().submit(()->{            System.out.println(Thread.currentThread().getName() + ": ok");        });        group.next().execute(()->{            System.out.println(Thread.currentThread().getName() + ": ok");        });        // 4.定时执行任务        group.next().scheduleAtFixedRate(()->{            System.out.println(Thread.currentThread().getName() + ": ok");        }, 0, 1, TimeUnit.SECONDS); // 定时任务,开始任务时间,时间间隔,时间单位    }
handler执行中如何换人
  • 关键代码io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    // 下一个handler的事件循环是否与当前的事件循环是同一个线程    EventExecutor executor = next.executor();	// 返回下一个handler的eventloop    // 是否直接调用    if (executor.inEventLoop()) {	// 当前handler中的线程,是否和eventLoop是同一个线程        next.invokeChannelRead(m);    } else {        // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)        executor.execute(new Runnable() {            public void run() {                next.invokeChannelRead(m);            }        });    }}

Channel

channel的主要作用

  • close()可以用来关闭channel
  • closeFuture()用来处理channel的关闭
    • sync方法作用是同步等待channel关闭
    • 而addListener方法是异步等待channel关闭
  • pipeline()方法添加处理器
  • write()方法将数据写入
  • writeAndFlush()方法将数据写入并刷出
    netty异步调用

要点:

  • 单线程无法异步提高效率,必须配合多线程、多核cpu才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加(提高的是吞吐量)
  • 合理进行任务拆分,也是利用异步的关键

Future & Promise

在异步处理时,经常用到这两个接口

首先说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise有对netty Future进行扩展

  • jdk Future只能同步等待任务结束(成功、失败)才能得到结果
  • netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称JDK FutureNetty FutureNetty Promise
cancel取消任务--
isCanceled任务是否取消--
idDone任务是否完成,不能区分成失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过isSuccess判断-
sync-等待任务结束,如果任务失败,抛出异常(不获取结果)-
isSuccess-获取失败信息,非阻塞,如果没有失败,返回null-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结束
setFailure--设置失败结果
  • JDK Future
public static void main(String[] args) throws ExecutionException, InterruptedException {    ExecutorService service = Executors.newFixedThreadPool(2);    Future<Integer> submit = service.submit(new Callable<Integer>() {        @Override        public Integer call() throws Exception {            return 20;        }    });    System.out.println(submit.get().toString());}
  • Netty Future
public static void main(String[] args) throws ExecutionException, InterruptedException {    NioEventLoopGroup group = new NioEventLoopGroup();    EventLoop eventLoop = group.next();    Future<Integer> submit = eventLoop.submit(new Callable<Integer>() {        @Override        public Integer call() throws Exception {            return 10;        }    });    submit.addListener(new GenericFutureListener<Future<? super Integer>>() {        @Override        public void operationComplete(Future<? super Integer> future) throws Exception {            System.out.println(future.getNow());        }    });}
  • Netty Promise
public static void main(String[] args) throws ExecutionException, InterruptedException {    // 1.准备EventLoop对象    NioEventLoopGroup group = new NioEventLoopGroup();    // 2.可以主动创建一个promise,结果容器    DefaultPromise<Integer> promise = new DefaultPromise<>(group.next());    new Thread(()->{        // 3.任意线程执行计算,向promise填充结果        promise.setSuccess(100);    }).start();    System.out.println(promise.get());}

Handler & Pipeline

ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被连成—串,就是Pipeline

  • 入站处理器通常是ChannellnboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工

打个比喻,每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各道工序,而后面要讲的ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

public static void main(String[] args) {    new ServerBootstrap()        .group(new NioEventLoopGroup())        .channel(NioServerSocketChannel.class)        .childHandler(new ChannelInitializer<NioSocketChannel>() {            @Override            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {                // 1.通过channel拿到pipeline                ChannelPipeline pipeline = nioSocketChannel.pipeline();                // 2.添加处理器  head -> hi1...hin -> tail   1...n                pipeline.addLast("hi1", new ChannelInboundHandlerAdapter(){                    @Override                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                        System.out.println(msg.toString());                        super.channelRead(ctx, msg);                        nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));                        // 使用ctx代表当前,而nioSocketChannel则是从head/tail开始                        // ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));                    }                });                // 3.出栈处理(先进后出)head <- ho1...hon <- tail n...1                pipeline.addLast("ho1", new ChannelOutboundHandlerAdapter(){                    @Override                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {                        System.out.println(msg.toString());                        super.write(ctx, msg, promise);                    }                });            }        })        .bind(8080);}

pipeline中工作流程

ByteBuf

是对字节数据的封装

  1. 创建:ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
public static void main(String[] args) {    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // 不指定容量默认256    System.out.println(buffer);    StringBuilder sb = new StringBuilder();    for (int i = 0; i < 300; i++) {        sb.append("a");    }    buffer.writeBytes(sb.toString().getBytes());    // 超过容量会自动扩容    System.out.println(buffer);}
  1. 直接内存 vs 堆内存

    • 可以使用下面的代码来创建池化基于堆的ByteBuf

      ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

    • 也可以使用下面的代码来创建池化基于直接内存的ByteBuf

      ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);

    • 直接内存创建和消费的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用

    • 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放

  2. 池化 vs 非池化

    池化的最大意义在于可以重用ByteBuf,优点有

    • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力
    • 有了池化,则可以重用池中的ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
    • 高并发时,池化功能更节约内存,减少内存溢出的可能

    池化功能是否开启,可以通过下面的系统环境变量来设置

    -Dio.netty.allocator.type={unpooled|pooled}

    • 4.1以后,非Android平台默认启用池化实现,Android平台启用非池化实现
    • 4.1之前,池化功能还不成熟,默认是非池化实现
  3. 组成

    ByteBuf由4部分组成,最开始读写指针都在0位置,有独立的读写指针,不需要切换读写模式
    ByteBuf组成

  4. 写入

    方法列表,只列出重点方法

    方法签名2含义备注
    writeBoolean(boolean value)写入boolean值用一字节01|00代表true|false
    writeByte(int value)写入byte值
    writeShort(int value)写入short值
    writeInt(int value)写入int值Big Endian,即0x250,写入后00 00 02 50
    writeIntLE(int value)写入int值Little Endian,即0x250,写入后50 02 00 00
    writeLong(long value)写入long值
    writeChar(int value)写入char值
    writeFloat(float value)写入float值
    writeDouble(double value)写入double值
    writeBytes(ByteBuf src)写入netty的ByteBuf
    writeBytes(byte[] src)写入bytep[]
    writeBytes(ByteBuffer src)写入nio的ByteBuffer
    int writeCharSequence(CharSequence squence, Charset charset)写入字符串

    注意:

    • 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用
    • 网络传输,默认习惯是Big Endian
  5. 扩容

    再写入一个int整数时,容量不够了(初始容量设为10),这是会触发扩容buffer.writeInt(100);

    • 扩容规则
      • 如果写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,则扩容后capacity是16
      • 如果写入后数据大小超过512,则选择下一个2n,例如写入后大小为513,则扩容后capacity是210=1024(2^9=512已经不够了)
      • 扩容不能超过max capacity会报错
  6. 读取

    读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分,如果需要重复读取int整数5,可以在read前先做个标记mark,需要时,重置到标记位reset,还有则是采用get方法,但该方法不会改变指针位置

  7. retain & release

    由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收

    • UnpooledHeapByteBuf使用的是JVM内存,只需等待GC回收内存即可
    • UnpooledDirectByteBuf使用的是直接内存,需要特殊方法来回收内存
    • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存
      • 回收内存的源码实现,关注下面方法的不同实现:protected abstract void deallocate()

    Netty这里采用了引用技术法控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

    • 每个ByteBuf对象的初始计数为1
    • 调用release方法计数减1,如果计数为0,ByteBuf内存被回收
    • 调用retain方法计数加1,表示调用者没用完之前,其他handler即使调用了release也不会造成回收
    • 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用

    谁来释放release,pipeline一般需要传递ByteBuf给下一个handler,如果在handler的finally中release,就失去了传递性

    基本规则是,谁是最后使用者,谁负责release。

  8. slice

    【零拷贝】体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf独立维护read,write指针

public static void main(String[] args) {    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);    buffer.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});        // 在切片过程中,没有发生数据复制,最大容量不能超过切片的数量    ByteBuf f1 = buffer.slice(0, 5);	// abcde    ByteBuf f2 = buffer.slice(5, 5);	// fghij    // 修改切片值,原有数据也会发生改变    f1.setByte(0, 'b');    System.out.println(buffer);	// bbcedfghij    // 释放原有内存,切片无法使用    buffer.release();    System.out.println(f1);	// error}
  1. duplicate

    【零拷贝】体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的

  2. copy

    会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关

public static void main(String[] args) {    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();    buf1.writeBytes(new byte[]{1,2,3,4,5});    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();    buf2.writeBytes(new byte[]{6,7,8,9,10});    // 传统方案    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();    buffer.writeBytes(buf1).writeBytes(buf2);    // CompositeByteBuf    CompositeByteBuf bufs = ByteBufAllocator.DEFAULT.compositeBuffer();    bufs.addComponents(true, buf1, buf2);}
  1. Unpooled

    Unpooled是一个工具类,类如其名,提供了非池化的ByteBuf创建、组合、复制等操作

    这里只介绍跟【零拷贝】相关的wrappedBuffer方法,可以用来包装ByteBuf

public static void main(String[] args) {    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();    buf1.writeBytes(new byte[]{1,2,3,4,5});    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();    buf2.writeBytes(new byte[]{6,7,8,9,10});    // 当包装ByteBuf个数超过一个时,底层使用CompositeBuf    ByteBuf buffer = Unpooled.wrappedBuffer(buf1, buf2);    System.out.println(ByteBufUtil.prettyHexDump(buffer))}

也可用来包装普通子节数组,底层也不会有拷贝操作

ByteBuf优势
  • 池化 - 可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像ByteBuffer一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf

双向通信

读和写的误解

最初的认识,以为只有在netty,nio这样的多路复用IO模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket是全双工的:在任意时刻,线路上存在A到B和B到A的双向信号传输。即使是阻塞IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读

Netty进阶

滑动窗口

滑动窗口

  • TCP以一个段为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点就算包的往返实际越长,性能就越差
  • 为了解决此问题,引入了窗口概念,窗口大小决定了无需等待应答可以继续发送的数据最大值
  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
    • 窗口内的数据才允许被发送,当应答未到达之前,窗口必须停止滑动
    • 如果数据ack回来了,窗口就可以向前滑动
    • 接收方也会维护一个窗口,只有落在窗口内数据才能被允许接收

粘包和半包

粘包现象

  • 现象:发送abc、def,接收abcdef
  • 原因
    • 应用层:接收方ByteBuf设置太大(Netty默认1024)
    • 滑动窗口:假设发送方256bytes表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这256bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文,就会黏包
    • Nagle算法:会造成黏包
  • 解决方案:
    1. 短链接,客户端发送完,断开连接,消息边界,从连接建立到连接断开,无法解决半包问题
    2. 定长消息解码器,与客户端约定子节大小,FixedLengthFrameDecoder(10)
    3. 分隔符消息解码器(回车+换行):LineBasedFrameDecoder(最大长度)、(自定义分隔符):DelimiterBasedFrameDecoder(最大长度,指定分隔符)
    4. LTC消息解码器(基于长度字段):LangthFieldBasedFrameDecoder(最大长度,长度字段偏移量,长度字段的长度,长度字段为基准还有间隔几个字节才是内容,从头剥离几个字节)
public static void main(String[] args) {    EmbeddedChannel channel = new EmbeddedChannel(            new LengthFieldBasedFrameDecoder(1024,0,4,1,4),            new LoggingHandler(LogLevel.DEBUG)    );    // 4个字节的内容长度,实际内容    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();    send(buffer, "hello, world");    send(buffer, "hi!");    channel.writeInbound(buffer);}private static void send(ByteBuf buffer, String content) {    byte[] bytes = content.getBytes();    buffer.writeInt(bytes.length);    buffer.writeByte(1);    buffer.writeBytes(bytes);}

半包现象

  • 现象:发送abcdef,接收abc、def
  • 原因:
    • 应用层:接收方ByteBuf小于实际发送数据量
    • 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时放不下了,只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包
    • MSS限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包

本质是因为TCP是流式协议,消息无边界

协议设计与解析

自定义协议要素:

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用那种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊…跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

@Sharable:netty线程安全

连接假死

  • 原因
    • 网络设备出现故障,例如网卡,机房等,底层的TCP连接已经断开了,但应用程序没有感知到,仍然占用着资源
    • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
    • 应用程序线程阻塞,无法进行读取数据
  • 问题
    • 假死的连接占用的资源不能自动释放
    • 向假死的连接发送数据,得到的反馈是发送超时

源码剖析

启动剖析

  • 传统NIO流程
// 1.netty中使用NioEvnetLoopGroup(简称nio boss线程)来封装线程和selectorSelector selector = Selector.open();// 2.创建NioServerSocketChannel,同时会初始化它关联的handler,以及为原生ssc存储configNioServerSocketChannel attachment = new NioServerSocketChannel();// 3.创建NioServerSocketChannel时,创建了Java原生的ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();	// 1serverSocketChannel.configureBlocking(false);// 4.启动nio boss线程执行接下来操作// 5.注册(仅关联selector和NioServerSocketChannel),未关注事件SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);		// 2// 6.head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加acceptor// 7.绑定端口serverSocketChannel.bind(new InetSocketAddress(8080));		// 3// 8.触发channel active事件,在head中关注op_accept事件selectionKey.interestOps(SelectionKey.OP_ACCEPT);		// 4
  • Netty中启动流程
new ServerBootstrap()    .group(new NioEventLoopGroup())    .channel(NioServerSocketChannel.class)    .childHandler(new ChannelInitializer<NioSocketChannel>(){        @Override        protected void initChannel(NioSocketChannel ch){            ch.pipeline().addLast(new LoggingHandler());        }    }).bind(8080);/* 1.bind() -> doBind() -> initAndRegister()	对应传统NIO的 1、2 	1.1 init: 		* 创建NioServerSocketChannel【main-thread】 		* 添加NioServerSocketChannel的初始化handler【main-thread】 			* 初始化handler等待调用【nio-thread】    1.2 register:切换线程    	* 启动 nio boss线程 【main-thread】    	* 原生ssc注册至selector未关注事件 【nio-thread】    	* 执行NioServerSocketChannel初始化handler 【nio-thread】 2.regFuture等待回调doBind0(): 	* 原生ServerSocketChannel绑定端口【nio-thread】 	* 触发NioServerSocketChannel active事件【nio-thread】*/

NioEventLoop

  • NioEventLoop重要组成:selector、线程、任务队列
  • NioEventLoop即会处理io事件,也会处理普通任务和定时任务
  1. **selector何时创建:**在构造方法调用时创建
    • **selector为何又两个selector成员:**一个原始(基于Set实现),一个netty优化后的(基于数组实现),为了遍历key时提高性能
  2. **nio线程在何时启动:**当首次调用execute方法时,通过状态位控制线程只会启动一次
  3. **提交普通任务会不会结束select阻塞:**会阻塞,但是在提交任务时会被唤醒
    • **wakeup方法中的代码如何理解:**只有其他线程提交任务,才会调用selector的wakeup
    • **wakeenUp变量的作用是什么:**原子变量,如果多个线程都提交任务,只需唤醒一次即可,避免wakeup被频繁调用
  4. **每次循环时,什么时候会进入SelectStrategy.SELECT分支:**当没有任务时,才会进入该分支,当有任务时会调用selectNow方法,顺便拿到IO事件
    • **何时会select阻塞,阻塞多久:**没有定时任务时阻塞,当有任务、事件、超时时间(1s+0.5ms)截至
  5. **nio空轮询bug在哪里体现,如何解决:**计数法空轮询数量超过指定次数,重新创建一个selector,替换掉旧的selector(JDK在Linux环境下的selector导致的bug)
  6. **ioRatio控制什么,设置为100有何作用:**控制处理IO事件所占用时间的比例50%。设置为100,执行完IO任务后,会将所有普通任务执行完
  7. **selectedKeys优化是怎么回事:**netty使用数组替换传统Nio中的Set方式,提高遍历效率
  8. **在哪区分不同事件类型:**processSelectedKey()中实现

accept流程

  • 传统NIO流程:
    1. selector.select()阻塞直到事件发生
    2. 遍历处理selectedKeys
    3. 拿到一个key,判断事件类型是否为accept
    4. 创建SocketChannel,设置非阻塞
    5. 将SocketChannel注册至selector
    6. 关注selectionKey的read事件
  • Netty流程:
    • 1、2、3在EventLoop中实现了
    • processSelectedKey() -> unsafe.read()
      • 4 -> doReadMessage():创建了NioSocketChannel
      • 5 -> doRegister():sc.register(eventLoop选择器,0,NioSocketChannel),调用NioSocketChannel上的初始化器
      • 6 -> pipeline() -> doBeginRead()

read流程

  1. selector.select()阻塞直到事件发生
  2. 遍历处理selectedKeys
  3. 拿到一个key,判断事件类型是否为read
  4. 读取操作

http://www.ppmy.cn/news/246491.html

相关文章

NIO多路复用之Selector的使用

Selector的使用 文章目录 Selector的使用一、阻塞 & 非阻塞1. 阻塞2. 非阻塞 二、selector 介绍及常用API1. 多路复用2. 常用API 三、处理 accept 事件四、处理 read 事件1. 为什么事件必须删除2. 处理客户端断开问题2.1 客户端强制断开2.2 客户端正常断开 3. 处理消息边界…

NIO网络编程

一、阻塞模式 服务端 public class server {public static void main(String[] args) throws IOException {//用 nio 来理解阻塞模式单线程//ByteBufferByteBuffer buffer ByteBuffer.allocate(16);//1.创建服务器ServerSocketChannel ssc ServerSocketChannel.open();//2.…

NIO的理解和使用

一、概述 NIO是 non-blocking-io的简称&#xff0c;非阻塞IO&#xff0c;由于它是后续出来的IO模型&#xff0c;有时也叫做 new-IO。NIO是后续比如React等的多路复用的基础模型。它是UNIX的五种IO模型中的一种。 NIO有三大组件&#xff1a;buffer、channel和selector&#xff…

NIO基础笔记

Netty深入浅出笔记 1. NIO基础1.1 三大组件1.1.1 Channel & Buffer1.1.2 Selector 1.2 ByteBuffer1.2.1 ByteBuffer 正确使用姿势1.2.2 ByteBuffer结构1.2.3 ByteBuffer核心属性1.2.4 ByteBuffer常见方法1.2.5 字符串与 ByteBuffer 互转1.2.6 Scattering Reads(分散读取)1.…

Netty学习(一)-- Netty 底层 Java NIO

视频地址&#xff0c;建议观看&#xff0c;老师由浅入深&#xff0c;循序渐进&#xff1a; https://www.bilibili.com/video/BV1py4y1E7oA 前面的学习&#xff1a;https://blog.csdn.net/weixin_43989102/article/details/126078132 目录 1、NIO1.1、Selector1&#xff09;多线…

Netty01——NIO 基础

目录 1.三大组件1.1.Channel & Buffer1.2.Selector1.2.1.多线程版设计1.2.2.线程池版设计1.2.3.selector 版设计 2.ByteBuffer2.1.ByteBuffer 使用方式2.2.ByteBuffer 结构2.2.1.ByteBuffer 的属性2.2.2.调试工具类2.2.3.使用调试工具类 2.3.ByteBuffer 常见方法2.3.1.分配…

NIO基础

NIO基础 1.三大组件 1.1 Channel & Buffer Channel有一点类似于stream&#xff0c;它就是读写数据的双向通道&#xff0c;可以从channel将数据读入buffer&#xff0c;也可以将buffer的数据写入channel&#xff0c;而之前的stream要么是输入&#xff0c;要么是输出&#…

JavaNIO——多线程以及IO模型(笔记)

文章目录 一、多线程优化1.1 Boss与Worker1.2 初步实现1Boss&#xff0c;1Worker1.3 线程数目配置 二、UDP三、IO模型3.1 stream&#xff08;BIO&#xff09; vs channel&#xff08;NIO&#xff09;3.2 IO模型3.2.1 阻塞IO3.2.2 非阻塞IO3.3.3 多路复用3.3.4 信号驱动3.3.5 异…