Netty: Reading the NioEventLoop Source Code

news/2024/9/23 4:22:13/

Contents

  • 1. Overview
  • 2. EventExecutorGroup Example
  • 3. Source Code
    • 3.1 Constructors
      • 3.1.1 DefaultEventLoopGroup Constructor
      • 3.1.2 NioEventLoop Constructor
    • 3.2 The run Method
      • 3.2.1 SelectStrategy
      • 3.2.2 select
      • 3.2.3 processSelectedKeys
        • 3.2.3.1 processSelectedKeysOptimized
        • 3.2.3.2 processSelectedKeysPlain
        • 3.2.3.3 processSelectedKey
        • 3.2.3.4 processSelectedKey NioTask
    • 3.3 execute
    • 3.4 schedule
    • 3.5 NioEventLoop register
    • 3.6 runAllTasks
    • 3.7 wakenUp

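1. Overview

EventExecutorGroup extends ScheduledExecutorService and Iterable<EventExecutor>; the concrete executors (through AbstractEventExecutor) additionally build on AbstractExecutorService.
These names already suggest what the group offers:

  • scheduling tasks to run after a delay or periodically
  • submitting tasks for asynchronous execution
  • iterating over the executors in the group

NioEventLoop extends SingleThreadEventLoop, which in turn extends SingleThreadEventExecutor. It is a single-threaded event processor that also acts as a single-threaded scheduled executor. Commonly used members:

  • NioEventLoop#run: handles the I/O events on the selector and runs ordinary and scheduled tasks
  • Registering a channel: io.netty.channel.SingleThreadEventLoop#register
  • Executing a task: io.netty.util.concurrent.SingleThreadEventExecutor#execute
  • Each EventLoop has its own Selector: private Selector selector; private Selector unwrappedSelector;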


2. EventExecutorGroup Example

```java
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;

import java.util.concurrent.TimeUnit;

public class EventLoopGroupStudy {
    public static void main(String[] args) {
        // next() hands out the group's event loops in round-robin order
        DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());

        // The group is Iterable over its child executors
        DefaultEventLoopGroup group1 = new DefaultEventLoopGroup(2);
        for (EventExecutor eventLoop : group1) {
            System.out.println(eventLoop);
        }

        NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
        // Ordinary task
        nioWorkers.execute(() -> {
            System.out.println("normal task...");
        });
        // Scheduled task, repeated every second
        nioWorkers.scheduleAtFixedRate(() -> {
            System.out.println("scheduleAtFixedRate running...");
        }, 0, 1, TimeUnit.SECONDS);
    }
}
```

[Figure: screenshot of the program output]

3. Source Code

3.1 Constructors

3.1.1 DefaultEventLoopGroup Constructor

  • The thread count nThreads and the thread factory threadFactory can be specified
  • children = new EventExecutor[nThreads]; children[i] = newChild(executor, args); newChild is implemented by the concrete group: DefaultEventLoopGroup creates a DefaultEventLoop, NioEventLoopGroup creates a NioEventLoop
  • If nThreads is not specified (0), the default is NettyRuntime.availableProcessors() * 2, i.e. twice the number of CPU cores, or the value configured through io.netty.eventLoopThreads; the minimum is 1

```java
public DefaultEventLoopGroup(int nThreads) {
    this(nThreads, (ThreadFactory) null);
}

public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    super(nThreads, threadFactory);
}

//  DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
//          "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // Creation failed: shut down the children created so far
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
```
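The default sizing rule above can be tried out without Netty on the classpath. The following is only a sketch: `DefaultThreadsSketch`, `defaultThreads` and `resolveThreads` are hypothetical names, and `Integer.getInteger` is used as an assumed stand-in for Netty's `SystemPropertyUtil.getInt`.

```java
final class DefaultThreadsSketch {
    // Mirrors DEFAULT_EVENT_LOOP_THREADS: the configured value wins,
    // otherwise twice the CPU count; the result is never below 1.
    static int defaultThreads(int availableProcessors, Integer configured) {
        int fallback = availableProcessors * 2;
        return Math.max(1, configured != null ? configured : fallback);
    }

    // nThreads == 0 means "use the default", as in MultithreadEventLoopGroup.
    static int resolveThreads(int nThreads, int availableProcessors, Integer configured) {
        return nThreads == 0 ? defaultThreads(availableProcessors, configured) : nThreads;
    }

    public static void main(String[] args) {
        // Assumption: reading the property with the JDK helper instead of SystemPropertyUtil
        Integer configured = Integer.getInteger("io.netty.eventLoopThreads");
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("event loop threads: " + resolveThreads(0, cpus, configured));
    }
}
```

Passing an explicit non-zero nThreads bypasses both the property and the CPU count, which matches the ternary in the MultithreadEventLoopGroup constructor.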

3.1.2 NioEventLoop Constructor

  • newTaskQueue creates the task queue; the default capacity bound is Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
  • Creating a NioEventLoop also creates its selector: openSelector()
  • args are passed in from the MultithreadEventExecutorGroup constructor

```java
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // Create the selector
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
```

3.2 The run Method

  • Poll for I/O events: select(wakenUp.getAndSet(false)); this polls the I/O events of all Channels registered on the Selector
  • Handle I/O events: processSelectedKeys(); handles the I/O events that are ready
  • After the I/O events, run the asynchronous task queue: runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

The ioRatio parameter adjusts the share of time spent on I/O event handling versus task handling. With ioRatio == 100 all queued tasks run without a time budget.

```java
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:

                case SelectStrategy.SELECT:
                    // Poll the I/O events of all Channels registered on the Selector
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // Handle the I/O events that are ready
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // After the I/O events, run the task queue within a time budget;
                    // ioRatio sets the ratio of I/O time to task time.
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
```
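To make the ioRatio arithmetic concrete, here is a small stand-alone sketch of the budget expression used in run(). `IoRatioSketch` and `taskBudgetNanos` are hypothetical names; only the formula `ioTime * (100 - ioRatio) / ioRatio` comes from the source.

```java
final class IoRatioSketch {
    // Time budget for runAllTasks, given how long the I/O phase took.
    // ioRatio == 100 is handled by a separate branch in run(), so it is rejected here.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio < 100)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // pretend the I/O phase took 1 ms
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget " + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With ioRatio = 50 the task budget equals the I/O time; a higher ratio shrinks the budget and a lower ratio enlarges it.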

3.2.1 SelectStrategy

SelectStrategy defines:
int SELECT = -1;
int CONTINUE = -2;
int BUSY_WAIT = -3;

  • With no pending tasks, calculateStrategy returns SelectStrategy.SELECT
  • With pending tasks, it returns the result of selectNowSupplier, i.e. selectNow(); a result of -1 would also be treated as SelectStrategy.SELECT
  • In the normal case selectNow() returns 0 or the number of ready keys, which matches none of the constants and falls to default

```java
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
    continue;

case SelectStrategy.BUSY_WAIT:

case SelectStrategy.SELECT:
    select(wakenUp.getAndSet(false));
    if (wakenUp.get()) {
        selector.wakeup();
    }
default:
}

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
```
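The decision above can be reproduced in isolation. This sketch assumes `java.util.function.IntSupplier` in place of Netty's own `io.netty.util.IntSupplier`; `SelectStrategySketch` and `describe` are hypothetical names used only for illustration.

```java
import java.util.function.IntSupplier;

final class SelectStrategySketch {
    static final int SELECT = -1;
    static final int CONTINUE = -2;
    static final int BUSY_WAIT = -3;

    // Same shape as the default strategy: with tasks pending do a non-blocking
    // selectNow, otherwise ask the caller to do a blocking select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    // Which branch of the switch in run() a strategy value ends up in.
    static String describe(int strategy) {
        switch (strategy) {
            case CONTINUE:
                return "continue";
            case BUSY_WAIT:
            case SELECT:
                return "blocking select";
            default:
                return "process " + strategy + " ready key(s), then run tasks";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(calculateStrategy(() -> 2, true)));
        System.out.println(describe(calculateStrategy(() -> 2, false)));
    }
}
```

Because the constants are all negative, any non-negative key count returned by selectNow() skips the blocking select and goes straight to processing.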

3.2.2 select

  • delayNanos(currentTimeNanos) returns 1s when there is no scheduled task, otherwise the time remaining until the next scheduled task is due
    • If a scheduled task is due within 0.5ms, timeoutMillis is <= 0 and the infinite loop exits: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    • Netty's task queues hold ordinary tasks, scheduled tasks and tail tasks. hasTasks() checks whether the ordinary task queue and the tail queue are empty, while delayNanos(currentTimeNanos) returns the delay of the next scheduled task.
  • If tasks are pending, set wakenUp to true, call selectNow and exit
    • hasTasks() && wakenUp.compareAndSet(false, true)
  • Block waiting for I/O events
    • int selectedKeys = selector.select(timeoutMillis);
    • count the select calls: selectCnt++
  • Any of selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks() stops the select loop
    • selectedKeys != 0: there are I/O events to handle
    • the oldWakenUp argument is true
    • wakenUp is true
    • hasTasks: there are tasks waiting to run
    • hasScheduledTasks: there are scheduled tasks waiting to run
  • Thread.interrupted(): the thread was interrupted, break
  • If the actual wait in selector.select(timeoutMillis) plus a small amount of execution overhead is >= the expected wait timeoutMillis, the selector is behaving normally, so selectCnt is reset to 1
    • time [now = start time + actual wait + execution overhead (ignored)] - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) [expected wait] >= currentTimeNanos [start time]

==> actual wait + execution overhead (ignored) >= expected wait

  • If selectCnt exceeds a threshold, the epoll empty-polling bug may have been triggered, so the selector is rebuilt
    • SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD
    • rebuild the selector: selector = selectRebuildSelector(selectCnt);

```java
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // delayNanos(currentTimeNanos) returns 1s when there is no scheduled task,
        // otherwise the time until the next scheduled task is due.
        // hasTasks() covers the ordinary and tail queues; delayNanos covers the scheduled queue.
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            // 1. A scheduled task is due within 0.5ms: leave the loop
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 2. Tasks are pending: set wakenUp to true, selectNow, then exit
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // 3. Block waiting for I/O events
            int selectedKeys = selector.select(timeoutMillis);
            // Count the select calls
            selectCnt ++;

            // selectedKeys != 0: there are I/O events to handle
            // oldWakenUp: wakenUp was true before select was called, the selector was woken up
            // wakenUp.get(): the selector was woken up
            // hasTasks: there are tasks waiting to run
            // hasScheduledTasks: there are scheduled tasks waiting to run
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            // The thread was interrupted
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // Work around the JDK epoll empty-polling bug: once selectCnt reaches
                // SELECTOR_AUTO_REBUILD_THRESHOLD, rebuild the Selector.
                // The code exists in an extra method to ensure the method is not too big to inline as this
                // branch is not very likely to get hit very frequently.
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}
```
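The two pieces of arithmetic in select() are easy to get wrong by eye, so here is a minimal sketch of both: the rounding of the timeout to the nearest millisecond, and the check that distinguishes a full-length wait from a premature return. `SelectTimeoutSketch` and its method names are hypothetical; the expressions are copied from the code above.

```java
import java.util.concurrent.TimeUnit;

final class SelectTimeoutSketch {
    // Adding 500000 ns before dividing rounds to the nearest millisecond,
    // so anything closer than 0.5 ms yields a timeout of 0.
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    // True when select(timeoutMillis) came back before the timeout had elapsed.
    // Many such returns in a row are what selectCnt counts towards a rebuild.
    static boolean returnedPrematurely(long startNanos, long nowNanos, long timeoutMillis) {
        return nowNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < startNanos;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long deadline = start + TimeUnit.SECONDS.toNanos(1); // no scheduled task: 1s default
        long timeout = timeoutMillis(deadline, start);
        System.out.println("timeoutMillis = " + timeout);
        System.out.println("premature = " + returnedPrematurely(start, System.nanoTime(), timeout));
    }
}
```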

3.2.3 processSelectedKeys

Handles the SelectionKeys that are ready.
Netty's optimized selectedKeys is a SelectedSelectionKeySet, while the plain path uses the JDK's HashSet.

```java
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
```

3.2.3.1 processSelectedKeysOptimized

Uses the SelectedSelectionKeySet selectedKeys field.
SelectedSelectionKeySet is backed by a SelectionKey array, so processSelectedKeysOptimized can read the I/O events by walking the array directly, which is more efficient than iterating a JDK HashSet.

```java
private SelectedSelectionKeySet selectedKeys;

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // I/O event handled by Netty
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // User-defined task
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}
```
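The idea of an array-backed key set can be sketched with the JDK alone. This is not Netty's SelectedSelectionKeySet, only an illustration under simplifying assumptions: `ArrayKeySetSketch` is a hypothetical generic class, it starts with 4 slots, and it makes no attempt at de-duplication.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class ArrayKeySetSketch<E> extends AbstractSet<E> {
    Object[] keys = new Object[4]; // grows by doubling
    int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        keys[size++] = e; // append only: no hashing, no bucket lookup
        return true;
    }

    // Clear the used slots so the referenced objects can be garbage collected.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[idx++];
            }
        };
    }

    public static void main(String[] args) {
        ArrayKeySetSketch<String> ready = new ArrayKeySetSketch<>();
        ready.add("key-1");
        ready.add("key-2");
        // Index-based walk, as in processSelectedKeysOptimized
        for (int i = 0; i < ready.size; i++) {
            System.out.println(ready.keys[i]);
            ready.keys[i] = null;
        }
        ready.reset();
    }
}
```

The consumer walks the array by index and never allocates an Iterator, which is the main saving over a HashSet on a hot path.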

3.2.3.2 processSelectedKeysPlain

  • Handle I/O events
    • processSelectedKey(k, (AbstractNioChannel) a);
  • Handle a NioTask
    • NioTask task = (NioTask) a;
    • processSelectedKey(k, task);

```java
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}
```
3.2.3.3 processSelectedKey

NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, AbstractNioChannel)

  • k.isValid() checks whether the key is still valid
  • ((readyOps & SelectionKey.OP_CONNECT) != 0): unsafe.finishConnect(); completes an outbound connection attempt
  • ((readyOps & SelectionKey.OP_WRITE) != 0): ch.unsafe().forceFlush(); flushes pending outbound data to the peer
  • ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0): unsafe.read(); reads inbound data, or accepts new connections on a server channel

```java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
```
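The readyOps checks are plain bit tests against the JDK's SelectionKey constants. The sketch below isolates that dispatch order; `ReadyOpsSketch` and `actionsFor` are hypothetical names, and the returned strings merely label which unsafe call would run.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsSketch {
    // Returns the handler steps in the order processSelectedKey applies them:
    // connect first, then write, then read/accept.
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is treated like a read to avoid a possible spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(0));
    }
}
```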

3.2.3.4 processSelectedKey NioTask

A NioTask is a user-defined task.

```java
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
    int state = 0;
    try {
        task.channelReady(k.channel(), k);
        state = 1;
    } catch (Exception e) {
        k.cancel();
        invokeChannelUnregistered(task, k, e);
        state = 2;
    } finally {
        switch (state) {
        case 0:
            k.cancel();
            invokeChannelUnregistered(task, k, null);
            break;
        case 1:
            if (!k.isValid()) { // Cancelled by channelReady()
                invokeChannelUnregistered(task, k, null);
            }
            break;
        }
    }
}
```

3.3 execute

  • addTask(task); adds the task to the lock-free MPSC queue
  • If inEventLoop is false, execute was called from another thread, so the event loop thread is started; STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED) is a CAS that guarantees the thread is not started twice
  • wakeup wakes the selector out of its blocking select

```java
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
```

```java
private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    // Roll the state back so a later execute() can retry
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
```

io.netty.channel.nio.NioEventLoop#wakeup

```java
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
```
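The start-once guarantee in startThread() rests on a single compare-and-set. As a rough sketch of that pattern using only the JDK: `LazyStartSketch` and its `starts` counter are hypothetical, and the counter stands in for actually launching a thread.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class LazyStartSketch {
    static final int ST_NOT_STARTED = 1;
    static final int ST_STARTED = 2;

    private static final AtomicIntegerFieldUpdater<LazyStartSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(LazyStartSketch.class, "state");

    private volatile int state = ST_NOT_STARTED;
    int starts; // incremented only by the caller that wins the CAS

    // Returns true only for the one caller that flips NOT_STARTED -> STARTED.
    boolean startThread() {
        if (state == ST_NOT_STARTED
                && STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            starts++; // a real event loop would call doStartThread() here
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        LazyStartSketch loop = new LazyStartSketch();
        System.out.println("first call started: " + loop.startThread());
        System.out.println("second call started: " + loop.startThread());
    }
}
```

The cheap volatile read before the CAS keeps the common case (already started) free of an atomic write.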

3.4 schedule

AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask)
scheduledTaskQueue

```java
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                SCHEDULED_FUTURE_TASK_COMPARATOR,
                // Use same initial capacity as java.util.PriorityQueue
                11);
    }
    return scheduledTaskQueue;
}
```
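A deadline-ordered priority queue is what lets the event loop answer "how long until the next scheduled task?" cheaply. The sketch below uses `java.util.PriorityQueue` rather than Netty's DefaultPriorityQueue; `ScheduledQueueSketch`, `Task` and `pollDue` are hypothetical, and ties between equal deadlines are left unordered here.

```java
import java.util.PriorityQueue;

final class ScheduledQueueSketch {
    static final class Task implements Comparable<Task> {
        final String name;
        final long deadlineNanos;

        Task(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(Task other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    // Same initial capacity as java.util.PriorityQueue's default
    private final PriorityQueue<Task> queue = new PriorityQueue<>(11);

    void schedule(String name, long deadlineNanos) {
        queue.add(new Task(name, deadlineNanos));
    }

    // Time until the earliest task is due, or defaultDelayNanos when the queue is empty.
    long delayNanos(long nowNanos, long defaultDelayNanos) {
        Task head = queue.peek();
        return head == null ? defaultDelayNanos : Math.max(0, head.deadlineNanos - nowNanos);
    }

    // Removes and returns the earliest task if its deadline has passed, else null.
    String pollDue(long nowNanos) {
        Task head = queue.peek();
        if (head == null || head.deadlineNanos > nowNanos) {
            return null;
        }
        return queue.poll().name;
    }

    public static void main(String[] args) {
        ScheduledQueueSketch q = new ScheduledQueueSketch();
        q.schedule("late", 300);
        q.schedule("early", 100);
        System.out.println("delay at t=40: " + q.delayNanos(40, 1_000_000_000L));
        System.out.println("due at t=100: " + q.pollDue(100));
    }
}
```

Only the head of the queue ever needs inspecting, both when computing the select timeout and when moving due tasks to the ordinary task queue.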

3.5 NioEventLoop register

  • io.netty.channel.nio.NioEventLoop#register
  • io.netty.channel.nio.NioEventLoop#register0
  • java.nio.channels.spi.AbstractSelectableChannel#register performs the actual registration through JDK NIO

```java
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    if (ch == null) {
        throw new NullPointerException("ch");
    }
    if (interestOps == 0) {
        throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    if ((interestOps & ~ch.validOps()) != 0) {
        throw new IllegalArgumentException(
                "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
    }
    if (task == null) {
        throw new NullPointerException("task");
    }

    if (isShutdown()) {
        throw new IllegalStateException("event loop shut down");
    }

    if (inEventLoop()) {
        register0(ch, interestOps, task);
    } else {
        try {
            // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
            // may block for a long time while trying to obtain an internal lock that may be hold while selecting.
            submit(new Runnable() {
                @Override
                public void run() {
                    register0(ch, interestOps, task);
                }
            }).sync();
        } catch (InterruptedException ignore) {
            // Even if interrupted we did schedule it so just mark the Thread as interrupted.
            Thread.currentThread().interrupt();
        }
    }
}
```

```java
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
    try {
        ch.register(unwrappedSelector, interestOps, task);
    } catch (Exception e) {
        throw new EventLoopException("failed to register a channel", e);
    }
}
```

```java
// java.nio.channels.spi.AbstractSelectableChannel
public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        if (!isOpen())
            throw new ClosedChannelException();
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        if (blocking)
            throw new IllegalBlockingModeException();
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}
```

3.6 runAllTasks

  • fetchFromScheduledTaskQueue: moves due tasks from scheduledTaskQueue to taskQueue
  • pollTask(): takes a task from taskQueue
  • afterRunningAllTasks: runs the tailTasks once taskQueue is empty or the time budget is used up
  • safeExecute: runs the task inside try/catch, so an exception does not terminate the loop

```java
// Call site in NioEventLoop#run
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
```
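One consequence of checking the clock only every 64 tasks is that the budget can be overshot by up to 63 tasks. The sketch below reproduces just that loop; `RunAllTasksSketch` is a hypothetical class, the clock is injected as a `LongSupplier` so the behaviour is deterministic, and scheduled and tail tasks are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

final class RunAllTasksSketch {
    // Runs queued tasks until the queue is empty or the deadline has passed,
    // reading the clock only once every 64 tasks. Returns how many tasks ran.
    static long runAllTasks(Queue<Runnable> taskQueue, long timeoutNanos, LongSupplier clock) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        long runTasks = 0;
        for (;;) {
            try {
                task.run(); // like safeExecute: a failing task must not stop the loop
            } catch (Throwable t) {
                System.err.println("task failed: " + t);
            }
            runTasks++;
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            queue.add(() -> System.out.println("task " + id));
        }
        long ran = runAllTasks(queue, 1_000_000L, System::nanoTime);
        System.out.println("ran " + ran + " task(s)");
    }
}
```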

3.7 wakenUp

```java
private final AtomicBoolean wakenUp = new AtomicBoolean();
```

On entering select, wakenUp is reset to false:

```java
case SelectStrategy.SELECT:
    select(wakenUp.getAndSet(false));
```

io.netty.channel.nio.NioEventLoop#wakeup

```java
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
```

io.netty.util.concurrent.SingleThreadEventExecutor#execute
Submitting a task calls wakeup(inEventLoop):

```java
public void execute(Runnable task) {
    // ... (excerpt: inEventLoop = inEventLoop() and addTask(task) happen first)
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
```
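The point of the wakenUp flag is to collapse many wakeup requests into a single selector.wakeup() call per select cycle. A minimal sketch of that behaviour, with a counter standing in for the selector: `WakenUpSketch`, `selectorWakeups` and `enterSelect` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class WakenUpSketch {
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    int selectorWakeups; // counts simulated selector.wakeup() calls

    // Only the caller that flips false -> true pays for the wakeup.
    void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selectorWakeups++;
        }
    }

    // What run() does before select: clear the flag and report its old value.
    boolean enterSelect() {
        return wakenUp.getAndSet(false);
    }

    public static void main(String[] args) {
        WakenUpSketch loop = new WakenUpSketch();
        loop.wakeup(false);
        loop.wakeup(false); // coalesced with the first request
        loop.wakeup(true);  // ignored: the event loop thread is not blocked in select
        System.out.println("wakeups issued: " + loop.selectorWakeups);
        System.out.println("oldWakenUp: " + loop.enterSelect());
    }
}
```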

http://www.ppmy.cn/news/1509125.html
