Netty核心源码分析(五)核心组件EventLoop源码分析

news/2024/11/24 5:41:18/

文章目录

  • 系列文章目录
  • 一、EventLoop源码分析
    • 1、NioEventLoop源码
    • 2、EventLoop的父接口SingleThreadEventExecutor
      • (1)addTask方法
      • (2)startThread方法
    • 3、NioEventLoop的run方法(核心!)
      • (1)select
    • 4、小结

系列文章目录

Netty核心源码分析(一),Netty的Server端启动过程源码分析
Netty核心源码分析(二),Netty的Server端接收请求过程源码分析
Netty核心源码分析(三)业务请求执行关键——ChannelPipeline、ChannelHandler、ChannelHandlerContext源码分析
Netty核心源码分析(四)心跳检测源码分析
Netty核心源码分析(五)核心组件EventLoop源码分析

一、EventLoop源码分析

之前我们简单分析过NioEventLoopGroup的源码。今天我们分析一下EventLoop执行的源码。

1、NioEventLoop源码

首先我们分析一下类继承关系图:
在这里插入图片描述
(1)ScheduledExecutorService接口表示是一个定时任务接口,EventLoop可以接受定时任务。
(2)EventLoop接口:一旦Channel注册了,就处理该Channel对应的所有IO操作。
(3)SingleThreadEventExecutor接口表示这是一个单线程的线程池。
(4)EventLoop是一个单例的线程池,里面含有一个死循环的线程不断地做着三件事情:监听端口、处理端口事件、处理队列事件。每个EventLoop都可以绑定多个Channel,而每个Channel始终只能由一个EventLoop来处理。

2、EventLoop的父接口SingleThreadEventExecutor

SingleThreadEventExecutor是一个单线程的线程池,其中包含着execute方法是EventLoop使用的源头:

// io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 是否是当前线程boolean inEventLoop = inEventLoop();addTask(task);if (!inEventLoop) { // 如果该EventLoop的线程不是当前线程startThread(); // 开启线程if (isShutdown() && removeTask(task)) {// 如果线程已经停止,并且删除任务失败,执行拒绝策略,默认是抛出异常RejectedExecutionExceptionreject();}}// 如果addTaskWakesUp是false,并且任务不是NonWakeupRunnable类型的,尝试唤醒selector,这个时候,阻塞在selector的线程会立即返回if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
}

(1)addTask方法

// io.netty.util.concurrent.SingleThreadEventExecutor#addTask
protected void addTask(Runnable task) {if (task == null) {throw new NullPointerException("task");}if (!offerTask(task)) {reject(task);}
}
// io.netty.util.concurrent.SingleThreadEventExecutor#offerTask
final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task);
}

(2)startThread方法

首先判断是否启动过了,保证EventLoop只有一个线程,如果没有启动过,尝试使用CAS将state状态改为ST_STARTED,然后调用doStartThread启动

// io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {// 启动doStartThread();} catch (Throwable cause) {// 异常回滚STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}
}
// io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {assert thread == null;// executor就是创建EventLoopGroup时创建的ThreadPerTaskExecutor类,将runnable包装秤Netty的FastThreadLocalThreadexecutor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();// 判断中断状态if (interrupted) {thread.interrupt();}boolean success = false;// 设置最后一次的执行时间updateLastExecutionTime();try {// this就是NioEventLoop,执行NioEventLoop的run方法,这个方法是个死循环,也是整个EventLoop的核心!SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 使用CAS不断修改state状态,改成ST_SHUTTING_DOWN for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {// cheanupcleanup();} finally {// 修改状态ST_TERMINATEDSTATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}// 回调terminationFuture方法terminationFuture.setSuccess(null);}}}}});
}

3、NioEventLoop的run方法(核心!)

该方法是一个死循环,也是整个NioEventLoop的核心!

从源码我们可以看出,run方法总共做了三件事:
(1)select获取感兴趣的事件。
(2)processSelectedKeys处理事件。
(3)runAllTasks执行队列中的任务。

// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:// selectselect(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and//    'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and//    'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {// 处理select keysprocessSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}

(1)select

大致的逻辑就是:调用NIO的selector的select方法,默认阻塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上再加上0.5秒进行阻塞。当执行execute方法的时候,也就是添加任务的时候,会唤醒selector,防止selector阻塞时间过长。

// io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}// 阻塞给定时间,默认一秒int selectedKeys = selector.select(timeoutMillis);selectCnt ++;// 如果有返回值||select被用户唤醒||任务队列有任务||有定时任务即将被执行,则跳出循环if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The selector returned prematurely many times in a row.// Rebuild the selector to work around the problem.logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);rebuildSelector();selector = this.selector;// Select again to populate selectedKeys.selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}
}

4、小结

每次执行 ececute 方法都是向队列中添加任务。当第一次添加时就启动线程,执行 run 方法,而 run 方法是整个 EventLoop 的核心,就像 EventLoop 的名字一样,Loop Loop ,不停的 Loop ,Loop 做什么呢?做3件事情。

  • 调用 selector 的 select 方法,默认阳塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上在加上0.5秒进行阻塞。当执行 execute 方法的时候,也就是添加任务的时候,唤醒 selecor,防止 selector 阻塞时间过大。
  • 当 selector 返回的时候,会调用 processSelectedKeys 方法对 selectKey 进行处理。
  • 当 processSelectedKeys 方法执行结束后,则按照 ioRatio 的比例执行 runAlITasks 方法,默认是 IO 任务时间和非IO 任务时间是相同的,你也可以根据你的应用特点进行调优 。比如 非IO任务比较多,那么你就将ioRatio 调小一点,这样非 IO 任务就能执行的长一点。防止队列积攒过多的任务。

此时,下图红圈部分源码我们分析完毕。
在这里插入图片描述


http://www.ppmy.cn/news/53570.html

相关文章

第三章 使用 Maven:命令行环境

第一节 实验一&#xff1a;根据坐标创建 Maven 工程 Maven 核心概念&#xff1a;坐标 ①数学中的坐标 使用 x、y、z 三个**『向量』作为空间的坐标系&#xff0c;可以在『空间』中唯一的定位到一个『点』**。 ②Maven中的坐标 [1]向量说明 使用三个**『向量』在『Maven的仓…

JUnit 5 参数化测试

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FutGAReQ-1682390296590)(https://p3-sign.toutiaoimg.com/tos-cn-i-qvj2lq49k0/76ce3a3756c54822ba10db2c9a0e94c9~noop.image?_iz58558&fromarticle.pc_detail&x-expires1682930831&x-s…

ZooKeeper分布式应用程序协调服务

目录 一.ZooKeeper基本介绍 1.ZooKeeper是什么&#xff1f; 2.ZooKeeper的工作机制 3.ZooKeeper的特点 4.ZooKeeper的数据结构 5.ZooKeeper的应用场景 5.1 统一命名服务 5.2 统一配置管理 5.3 统一集群管理 5.4 服务器动态上下线 5.5 软负载均衡 二.ZooKeeper的选举…

备战2个月,四轮面试拿下字节offer...

背景 菜 J 一枚&#xff0c;本硕都是计算机&#xff08;普通二本&#xff09;&#xff0c;2021 届应届硕士&#xff0c;软件测试方向。个人也比较喜欢看书&#xff0c;技术书之类的都有看&#xff0c;最后下面也会推荐一些经典书籍。 先说一下春招结果&#xff1a;拿下了四个…

[Golang从零到壹] 2.快速掌握基础语法

golang概述 golang官网 https://golang.google.cn/ API文档 官网页面 > Packages > 输入要搜索的函数或者包(例如fmt包) > Index栏中找 > Example查看代码示例 golang的特点 go语言一个文件要归属一个包 内存自动回收机制&#xff0c;不需要像C一样malloc之后还…

安科瑞充电方案解决电瓶车充电难、管理难、收费难问题

安科瑞 徐浩竣 江苏安科瑞电器制造有限公司 zx acrelxhj 0引言 电动自行车已经成为重要的出行工具&#xff0c;数量肯定还会继续增长&#xff0c;各级政府部门和物业管理者已经对其带来的消防隐患引起高度重视。安科瑞电动自行车运营管理云平台通过充电桩、云平台、APP小程…

MAC 查看已装载的卷宗

查看卷宗目录命令 ls /Volumes网络卷宗&#xff1a;本机、系统报告。 参考资料&#xff1a;https://blog.csdn.net/qq_41731201/article/details/125407204

Django项目异步改造--Celery

Celery 是一个简单&#xff0c;灵活且可靠的分布式系统&#xff0c;可以处理大量消息&#xff0c;同时为操作提供维护该系统所需的工具。这是一个任务队列&#xff0c;着重于实时处理&#xff0c;同时还支持任务调度。 Celery 通过消息进行通信&#xff0c;通常使用经纪人在 c…