Netty—EventLoop

news/2024/11/16 15:49:20/

文章目录

  • 一、EventLoopGroup 是什么?🤔️
  • 二、NioEventLoop 有哪些重要组成部分?🔍
  • 三、NioEventLoop 的 thread 在何时启动?
  • 三、 run() 方法中线程在干嘛?

一、EventLoopGroup 是什么?🤔️

EventLoop,即"事件循环对象",本质是一个单线程执行器(同时维护了一个 Selector),里面有run方法处理 Channel 上源源不断的 io 事件。

EventLoop它的继承关系比较复杂

  • 继承java.util.concurrent.ScheduledExecutorService ,因此包含了线程池中所有方法
  • 继承netty自身的OrderedEventExecutor,它提供了boolean inEventLoop(Thread thread);方法判断一个线程是否属于此 EventLoop,EventLoopGroup parent();方法查看自己属于哪个 EventLoopGroup。

EventLoopGroup,即“事件循环组”,是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 channel 上的io事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)。

EventLoopGroup继承 netty 自己的 EventExecutorGroup,实现了 Iterable 接口提供遍历 EventLoop 的能力,另有 next() 方法获取集合中下一个 EventLoop。
在这里插入图片描述

二、NioEventLoop 有哪些重要组成部分?🔍

接下来我们通过源码查看 NioEventLoop 有哪些部分组成~

通过查看 NioEventLoop 类,我们发现其维护了 Selector!可是为什么是两个selector呢?这就是netty优化selector之处,在后面会为大家详细讲解!

public final class NioEventLoop extends SingleThreadEventLoop {//....../*** The NIO {@link Selector}.*/private Selector selector;private Selector unwrappedSelector;
}

我们查看NioEventLoop的父类io.netty.util.concurrent.SingleThreadEventExecutor,维护了一个线程和任务队列!

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {//......private final Queue<Runnable> taskQueue;private volatile Thread thread;
}

其父类io.netty.util.concurrent.AbstractScheduledEventExecutor中维护了一个任务队列!

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {//......PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
}

NioEventLoop主要由selector、线程和任务队列组成,NioEventLoop既会处理io事件,也可以处理普通任务和定时任务!

NioEventLoop 中的 selector 何时创建?

我们发现在EventLoop的构造方法中就已经将Selector创建!

在这里插入图片描述

NioEventLoop 为何有两个 selector 成员呢?

原生的selector中对于selectedKeys的实现是采用Set结构,想必大家一定知道set的遍历性能并不高!netty使用反射的方式将其读取出来替换成数组结构去维护~

在这里插入图片描述

三、NioEventLoop 的 thread 在何时启动?

当然是在执行 execute() 方法时创建咯,我们通过debug的方式来查看服务端这边NioEventLoop的nio线程启动过程!

以上通过bind()方法一直点下去,直至 io.netty.bootstrap.AbstractBootstrap#doBind0

private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}

此时调用了channel所关联的eventLoop的execute()方法,我们查看io.netty.util.concurrent.SingleThreadEventExecutor#execute

@Override
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();// 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列addTask(task);if (!inEventLoop) {// inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThreadstartThread();if (isShutdown()) {// 如果已经 shutdown,做拒绝逻辑boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {// 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程wakeup(inEventLoop);}
}

在这里插入图片描述

此时会先调用inEventLoop(),判断当前线程是否是eventLoop线程。显然在此时eventLoop线程是null。inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread

在这里插入图片描述

进入startThread() 方法,其内部通过状态位控制线程只会启动一次。其内部使用执行器创建一个线程,并将其赋给eventLoop的thread属性,调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环!

在这里插入图片描述

三、 run() 方法中线程在干嘛?

io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件

protected void run() {for (;;) {try {try {// calculateStrategy 的逻辑如下:// 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch// 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:// 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒// 进行 select 阻塞,并设置唤醒状态为 falseboolean oldWakenUp = wakenUp.getAndSet(false);// 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup// 下面的 select 方法不会阻塞// 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?// 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时// 才能执行,让 select 方法无谓阻塞select(oldWakenUp);if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;// ioRatio 默认是 50final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// ioRatio 为 100 时,总是运行完所有非 IO 任务runAllTasks();}} else {                final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// 记录 io 事件处理耗时final long ioTime = System.nanoTime() - ioStartTime;// 运行非 IO 任务,一旦超时会退出 runAllTasksrunAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}

io.netty.channel.DefaultSelectStrategy#calculateStrategy,判断是否有任务,没有任务,返回 SelectStrategy.SELECT,看若有任务则调用selectNow()拿取

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = new IntSupplier() {@Overridepublic int get() throws Exception {return selectNow();}
};

接着进入io.netty.channel.nio.NioEventLoop#select 方法

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();// 计算等待时间,没有 scheduledTask,超时时间为 1s;有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;// 如果超时,退出循环if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 如果期间又有 task 退出循环,如果没这个判断,那么任务就会等到下次 select 超时时才能被执行// wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeupif (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}// select 有限时阻塞// 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%int selectedKeys = selector.select(timeoutMillis);// 计数加 1selectCnt ++;// 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {// 线程被打断,退出循环// 记录日志selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// 如果超时,计数重置为 1,下次循环就会 breakselectCnt = 1;} // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512// 这是为了解决 nio 空轮询 bugelse if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 重建 selectorselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {// 记录日志}} catch (CancelledKeyException e) {// 记录日志}
}

netty通过轮训执行次数是否到达预值判断是否发生空轮训bug,采用更换 selector 的方式去解决问题!

我们回到io.netty.channel.nio.NioEventLoop#run 方法中,通过以上分析我们知道一般只有在有任务或者有事件的时候才结束阻塞。那会不会出现任务的处理时间过长而耽误事件的处理呢?

					 	// ioRatio 默认是 50final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// ioRatio 为 100 时,总是运行完所有非 IO 任务runAllTasks();}} else {                final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// 记录 io 事件处理耗时final long ioTime = System.nanoTime() - ioStartTime;// 运行非 IO 任务,一旦超时会退出 runAllTasksrunAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}

ioRatio 参数就是为了控制处理io事件 和 处理任务之间的时间比例!

我们来详细看看处理io事件的 io.netty.channel.nio.NioEventLoop#processSelectedKeys 方法

private void processSelectedKeys() {if (selectedKeys != null) {// 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}
}

默认都会进入我们优化后的 io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized 方法

private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;// 拿到 channelfinal Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}
}

拿到channel后判断是不是AbstractNioChannel,已知该类是所有Nio channel的父类。如果是则进入方法 io.netty.channel.nio.NioEventLoop#processSelectedKey,区分当前事件处理相关IO事件操作……

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// 当 key 取消或关闭时会导致这个 key 无效if (!k.isValid()) {// 无效时处理...return;}try {int readyOps = k.readyOps();// 连接事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// 可写事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}// 可读或可接入事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read// 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#readunsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

http://www.ppmy.cn/news/1094392.html

相关文章

Windows Server 系统各版本及授权说明(附下载地址

本文为Windows Server系统各版本差异对比及授权说明。 会对相关目前仍主流使用的相关Windows Server系统版本和相关授权进行对比和功能说明。 WindowsServer2012 R2 Windows Server 2012 R2授权方式是按照物理CPU数量进行授权&#xff0c;比如物理服务器CPU插槽数量2&#xff…

OCR训练部署文档

Cuda安装 wget https://developer.download.nvidia.com/compute/cuda/11.6.0/local_installers/cuda_11.6.0_510.39.01_linux.run sh cuda_11.6.0_510.39.01_linux.run#可能会报错&#xff0c;查看/var/log/nvidia-installer.log &#xff0c;kill -9 [ID]可以解决vim ~/.bash…

JAVA 比较两个区间是否存在交集

最近遇到一个开发问题&#xff0c;判断两个价格的大小&#xff0c;听着很简单&#xff0c;但其实价格是浮动的&#xff0c;也就是说价格是一个范围&#xff0c;比如物品A的价格是5&#xff5e;10&#xff0c;现在我们通过筛选条件&#xff0c;把价格符合在8&#xff5e;20之前的…

Python 正则表达式:强大的文本处理工具

概念&#xff1a; 正则表达式是一种强大的文本匹配和处理工具&#xff0c;它可以用来在字符串中查找、替换和提取符合某种规则的内容。在Python中&#xff0c;使用re模块可以轻松地操作正则表达式&#xff0c;它提供了丰富的功能和灵活的语法。 场景&#xff1a; 正则表达式…

Python基础List列表定义与函数

如何定义一个非空的列表&#xff1f; name_list ["liming","xiaohong",15,{"hobby":"basketball"}] 列表的特点&#xff1a; 1.列表是有序的 2.可以存放多个元素 3.每个元素可以是任何数据类型 定义一个空列表 name_list [] 访…

一个新工具 nolyfill

名字的意思&#xff0c; 我自己的理解 no(po)lyfill 正如它的名字, 不要再用补丁了, 当然这里说的是过时的补丁。 polyfill 是补丁的意思 为什么要用这个插件 文档原文: 当您通过安装最新的 Node.js LTS 来接受最新的功能和安全修复时&#xff0c;像eslint-plugin-import、…

pytorch学习——循环神经网络RNN讲解及其实现

参考书籍&#xff1a;8.6. 循环神经网络的简洁实现 — 动手学深度学习 2.0.0 documentation 参考视频&#xff1a;54 循环神经网络 RNN【动手学深度学习v2】_哔哩哔哩_bilibili 一.介绍 循环神经网络RNN&#xff08;Recurrent Neural Network &#xff09;是一类广泛应用于序列…

Go语言基础语法|疑难分析及相关补充

疑难分析 1.对于range遍历的理解 eg&#xff1a; package main import "fmt" func main() { nums : []int{2, 3, 4} sum : 0 for i, num : range nums { sum num if num 2 { fmt.Println("index:", i, "num:", num) } } …