手撕netty源码(一)- NioEventLoopGroup

server/2024/9/25 8:34:10/

文章目录

  • 前言
  • 一、NIO 与 netty
  • 二、NioEventLoopGroup 对象的创建过程
    • 2.1 创建流程图
    • 2.2 EventExecutorChooser 的创建


前言

processOn文档跳转
本文是手撕netty源码系列的开篇文章,会先介绍一下netty对NIO关键代码的封装位置,主要介绍 NioEventLoopGroup 对象的创建过程,看看new一个对象可以做哪些事情。


一、NIO 与 netty

平时使用NIO的主要步骤:

java">/*创建选择器的实例*/
Selector selector = Selector.open();
/*创建ServerSocketChannel的实例*/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();/*设置通道为非阻塞模式*/
serverSocketChannel.configureBlocking(false);
/*绑定端口*/
serverSocketChannel.socket().bind(new InetSocketAddress(port));
/*注册事件,表示关心客户端连接*/
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while(true){/*获取当前有哪些事件*/selector.select(1000);/*获取事件的集合*/Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。*/iterator.remove();handleInput(key);}
}/*处理事件的发生*/
private void handleInput(SelectionKey key) throws IOException {if(key.isValid()){/*处理新接入的客户端的请求*/if(key.isAcceptable()){/*获取关心当前事件的Channel*/ServerSocketChannel ssc= (ServerSocketChannel) key.channel();/*接受连接*/SocketChannel sc = ssc.accept();System.out.println("==========建立连接=========");sc.configureBlocking(false);/*关注读事件*/sc.register(selector,SelectionKey.OP_READ);}/*处理对端的发送的数据*/if(key.isReadable()){SocketChannel sc = (SocketChannel) key.channel();/*创建ByteBuffer,开辟一个缓冲区*/ByteBuffer buffer = ByteBuffer.allocate(1024);/*从通道里读取数据,然后写入buffer*/int readBytes = sc.read(buffer);if(readBytes>0){/*将缓冲区当前的limit设置为position,position=0,用于后续对缓冲区的读取操作*/buffer.flip();/*根据缓冲区可读字节数创建字节数组*/byte[] bytes = new byte[buffer.remaining()];/*将缓冲区可读字节数组复制到新建的数组中*/buffer.get(bytes);String message = new String(bytes,"UTF-8");System.out.println("服务器收到消息:"+message);/*处理数据*/String result = Const.response(message);、、、、、}else if(readBytes<0){/*取消特定的注册关系*/key.cancel();/*关闭通道*/sc.close();}}、、、、}}

平时使用netty的主要步骤:

java">// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ServerInit());// 绑定端口,同步等待成功
b.bind(NettyConstant.SERVER_PORT).sync();

那么,netty 对 NIO 的封装具体体现在哪里呢?先揭晓答案,后续一点点细嚼慢咽

  1. 创建选择器的实例
    io/netty/channel/nio/NioEventLoop.java
java">NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),rejectedExecutionHandler);this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");final SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
  1. 创建ServerSocketChannel的实例
    io/netty/channel/socket/nio/NioServerSocketChannel.java
java">private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {try {ServerSocketChannel channel =SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);return channel == null ? provider.openServerSocketChannel() : channel;} catch (IOException e) {throw new ChannelException("Failed to open a socket.", e);}
}
  1. 设置通道为非阻塞模式
    io/netty/channel/nio/AbstractNioChannel.java
java">protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}throw new ChannelException("Failed to enter non-blocking mode.", e);}}
  1. 绑定端口
    io/netty/bootstrap/AbstractBootstrap.java
java">private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
  1. 注册事件,表示关心客户端连接
    io/netty/channel/nio/AbstractNioChannel.java
java">protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}
  1. 获取当前事件的集合
  2. 处理事件
    io/netty/channel/nio/NioEventLoop.java
java">private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select();}// Timeout will only be 0 if deadline is within 5 microsecslong timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}
}

其实,netty 也不难对吧
学习netty,主要学习它的设计思想和对性能优化的巧妙处理,当工作需要时,能够灵活运用

二、NioEventLoopGroup 对象的创建过程

2.1 创建流程图

在这里插入图片描述
可以看到,其实我们传的线程数量实际控制的是NioEventLoop对象创建的数量,而每个 NioEventLoop 其实是一个Executor执行器,那么至此,我们只是相当于创建了两个 NioEventLoopGroup 对象,他们分别有自己的children执行器 NioEventLoop 数组,同一个数组内的 NioEventLoop 共享一个ThreadPerTaskExecutor执行器,但是现在这个执行器后续如何处理事件和如何调度还不知道,后续会讲到,本文先看看创建NioEventLoopGroup对象都做了什么
在这里插入图片描述

2.2 EventExecutorChooser 的创建

java">// io/netty/util/concurrent/MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}// io/netty/util/concurrent/MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {checkPositive(nThreads, "nThreads");if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {、、、}}// *********关键代码********chooser = chooserFactory.newChooser(children);、、、
}// io/netty/util/concurrent/DefaultEventExecutorChooserFactory.java
public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}
}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;
}

这段代码很简单但是有需要我们学习的地方,从类名和方法名可以看出来,这个工厂类是创建事件执行者选择器的,并且是通过我们创建NioEventLoopGroup时指定的线程数来创建不同的选择器:

  • 当数量是2的次幂时,创建PowerOfTwoEventExecutorChooser
  • 否则,创建GenericEventExecutorChooser

(val & -val) == val
netty 使用这种方法来判断一个数是不是2的倍数,稍微讲一下,& 是"与"运算,只有1&1才得1,那么一个数的负数用二进制是怎么表示的呢?答案是“补码”,也就是对这个数的二进制取反+1,举例:
8的二进制是0000 1000,取反之后是 1111 0111,加1之后是 1111 1000,所以-8的二进制就是 1111 1000

0000 1000 & 1111 1000 = 0000 1000
学到了吧,以后有人问你如何判断一个数是不是2的次幂时,就可以用这个方法,因为二进制与或运算比加减运算更加高效

在这里插入图片描述

从这个工厂类的注释看,无论使用哪个选择器,策略都是轮询,那么为什么还涉及两个选择器呢?来看看具体实现:

java">private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
}private static final class GenericEventExecutorChooser implements EventExecutorChooser {// Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.// The 64-bit long solves this by placing the overflow so far into the future, that no system// will encounter this in practice.private final AtomicLong idx = new AtomicLong();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];}
}

PowerOfTwoEventExecutorChooser 中定义了一个AtomicInteger idx,选择执行器的算法是“idx.getAndIncrement() & executors.length - 1”,举例说明:如果executors.length是2的次幂,那么二进制就是1000…,那么减1之后就是 01111…,和任何数做“按位与”运算,结果都只会是0到executors.length - 1之间,只要这个数递增的,那么就会在0到executors.length - 1之间轮询,达到轮询的目的,很巧妙吧,又学到了~

GenericEventExecutorChooser 的算法就很普通了,对executors.length取余

所以,在创建NioEventLoopGroup的时候,知道如何指定线程数了吧!


http://www.ppmy.cn/server/20247.html

相关文章

深入剖析图像平滑与噪声滤波

噪声 在数字图像处理中&#xff0c;噪声是指在图像中引入的不希望的随机或无意义的信号。它是由于图像采集、传输、存储或处理过程中的各种因素引起的。 噪声会导致图像质量下降&#xff0c;使图像失真或降低细节的清晰度。它通常表现为图像中随机分布的亮度或颜色变化&#…

TypeScript入门第一天,所有类型+基础用法+接口使用

表示逻辑值&#xff1a;true 和 false。在JavaScript和TypeScript里叫做boolean | | 数组类型 | 无 | 声明变量为数组。 // 在元素类型后面加上[] let arr: number[] [1, 2]; // 或者使用数组泛型&#xff0c;Array<元素类型> let arr: Array [1, 2]; | | 元组…

MySQL__索引

文章目录 &#x1f60a; 作者&#xff1a;Lion J &#x1f496; 主页&#xff1a; https://blog.csdn.net/weixin_69252724 &#x1f389; 主题&#xff1a; MySQL__索引&#xff09; ⏱️ 创作时间&#xff1a;2024年04月23日 ———————————————— 这里写目…

SpringBoot学习之SpringBoot3集成OpenApi(三十八)

Springboot升级到Springboot3以后,就彻底放弃了对之前swagger的支持,转而重新支持最新的OpenApi,今天我们通过一个实例初步看看OpenApi和Swagger之间的区别. 一、POM依赖 我的POM文件如下,仅作参考: <?xml version="1.0" encoding="UTF-8"?>…

盲人咖啡厅导航:科技之光点亮独立生活新里程

在这个繁华的世界中&#xff0c;咖啡厅不仅是人们社交聚会、休闲阅读的场所&#xff0c;更是无数人心灵栖息的一方天地。然而&#xff0c;对于视障群体而言&#xff0c;独自前往这样的公共场所往往面临重重挑战。幸运的是&#xff0c;一款名为蝙蝠避障专为盲人设计的辅助应用&a…

把私有数据接入 LLMs:应用程序轻松集成 | 开源日报 No.236

run-llama/llama_index Stars: 29.9k License: MIT llama_index 是用于 LLM 应用程序的数据框架。 该项目解决了如何最佳地利用私有数据增强 LLMs&#xff0c;并提供以下工具&#xff1a; 提供数据连接器&#xff0c;以摄取现有的数据源和各种格式&#xff08;API、PDF、文档…

Spring boot + Redis + Spring Cache 实现缓存

学习 Redis 的 value 有 5 种常用的数据结构 Redis 存储的是 key-value 结构的数据。key 是字符串类型&#xff0c;value 有 5 种常用的数据结构&#xff1a; Redis 的图形化工具 Another Redis Desktop Manager Spring Data Redis Redis 的 Java 客户端。 Spring Cache Spr…

SpringMVC interceptor有时候配置的时候path=“/**“ 两个星号什么意思,与path=“/“以及path=“/*“什么区别

直接上案例&#xff1a; <mvc:interceptor> <mvc:mapping path"/**"/> <bean class"com.xuyang.interceptor.user.UserAuthInterceptor" /> </mvc:interceptor>/**的意思是所有文件夹及里面的子文件夹 /*是所有文件夹&#xff0c…