Netty中NioEventLoop介绍

news/2025/1/16 10:02:10/

一、Netty基本介绍

        Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

        Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

        本文主要介绍Netty中的核心类之一的:NioEventLoop类。

二、NioEventLoop继承体系

 三、EventLoop相关接口

        我们先看右边的接口, 部分接口我们在上一篇以及介绍过了,可以看Netty中NioEventLoopGroup介绍,我们从OrderedEventExecutor开始往下看。

一、OrderedEventExecutor

public interface OrderedEventExecutor extends EventExecutor {
}

         OrderedEventExecutor继承了EventExecutor,即拥有线程相关的操作声明。

二、EventLoop

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {EventLoopGroup parent();
}

        EventLoop继承了OrderedEventExecutor和EventLoopGroup接口,即拥有线程相关操作即事件循环组的相关行为声明。但并未声明新的接口。

四、EventLoop相关实现

        在介绍EventLoop的实现之前,我们需要了解一些内容:

  1. EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。
  2. EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
  3. Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
  4. 一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。

         上一篇EventLoopGroup的介绍里写到了MultithreadEventExecutorGroup的初始化和创建EventExecutor

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}// 初始化线程组children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {// 将线程和传入的executor做一个绑定// 注意:这里线程组每个元素都绑定了同一个executor// newChild是一个抽象方法,依赖子类实现children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {// 失败执行策略if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}// 初始化一个EventExecutor选择工厂,轮询获取EventExecutor,chooserFactory的默认实现是DefaultEventExecutorChooserFactory// next()方法依赖chooser实现chooser = chooserFactory.newChooser(children);// 声明线程终止的监听器final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};// 将监听器绑定到线程组的每个线程中for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}// 初始化线程集合(只读)Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

        创建EventExecutor

// 创建EventLoop对象,并绑定executor
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

         我们在这里接着往下跟NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {// 初始化super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;// 创建一个selector的二元组final SelectorTuple selectorTuple = openSelector();selector = selectorTuple.selector;unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;
}

        进入super方法

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);// 创建收尾队列tailTasks = newTaskQueue(maxPendingTasks);
}

        接着进入super

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);// 初始化子线程this.executor = ThreadExecutorMap.apply(executor, this);taskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

        进入ThreadExecutorMap.apply方法

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(executor, "executor");ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");return new Executor() {@Overridepublic void execute(final Runnable command) {// 这里调用了NioEventLoopGroup所包含的executor的execute()executor.execute(apply(command, eventExecutor));}};
}

        进入apply方法

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(command, "command");ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");// 这里包装了一个runnable,记录当前执行线程,并在执行完成后删除return new Runnable() {@Overridepublic void run() {setCurrentEventExecutor(eventExecutor);try {command.run();} finally {setCurrentEventExecutor(null);}}};
}

        我们再回去看看NioEventLoop的openSelector()方法,我们先了解下SelectorTuple类

private static final class SelectorTuple {final Selector unwrappedSelector;final Selector selector;SelectorTuple(Selector unwrappedSelector) {this.unwrappedSelector = unwrappedSelector;this.selector = unwrappedSelector;}SelectorTuple(Selector unwrappedSelector, Selector selector) {this.unwrappedSelector = unwrappedSelector;this.selector = selector;}
}

        SelectorTuple 只是一个包含两个 Selector 的内部类,用于封装优化前后的 Selector。而 openSelector() 方法就是为了返回 Selector 并且根据配置判断是否需要优化当前 Selector 。下面看具体代码:

private SelectorTuple openSelector() {final Selector unwrappedSelector;try {// 根据provider创建出个NIo的原生selectorunwrappedSelector = provider.openSelector();} catch (IOException e) {throw new ChannelException("failed to open a new selector", e);}// 若禁用了keyset优化功能,则直接返回NIo原生的selector,优化就是将selector中的三个set集合变为三个数组// 因为数组是顺序存放的,要比随机存放的集合执行效率高if (DISABLE_KEY_SET_OPTIMIZATION) {return new SelectorTuple(unwrappedSelector);}// 此处优化逻辑省略...
}

        NioEventLoop的父类是一个Executor,所以我们在看看execute()方法:

@Override
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判断当前线程是不是EventLoop中成员变量的executor线程boolean inEventLoop = inEventLoop();// 将任务添加到队列addTask(task);if (!inEventLoop) {// 启动线程(成员变量中的execute)startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}
}

总结:

        NioEventLoopGroup创建CUP核心数两倍的EventLoop数组,NioEventLoopGroup内部还包含了一个Executor成员变量。

        随后对EventLoop数组进行初始化,传入NioEventLoopGroup的executor成员变量,EventLoop内部也有一个executor成员变量,EventLoop对内部的executor变量进行初始化,并在其executor的execute()方法调用NioEventLoopGroup的成员变量executor的execute()方法。

        也就是说EventLoop的executor调用execute()方法的时候,会调用NioEventLoopGroup的Executor的execute方法来执行具体的操作。

        


http://www.ppmy.cn/news/145233.html

相关文章

Seaborn.load_dataset()加载数据集失败最佳解决方法

load_dataset() 是 Seaborn 库中提供的一个函数&#xff0c;用于加载一些原始数据集。这些数据集包含了许多经典的数据集&#xff0c;比如鸢尾花数据集、小费数据集等&#xff0c;这些数据集在数据可视化和机器学习中非常常见。 使用 load_dataset() 函数可以方便地获取这些数…

Http\Rpc\Rmi

目录 Http Rpc Rmi Http HTTP协议&#xff1a; 目的&#xff1a;HTTP&#xff08;超文本传输协议&#xff09;协议是用于在客户端和服务器之间传输超文本和其他数据的协议&#xff0c;是Web应用程序的基础。通信方式&#xff1a;HTTP协议使用TCP/IP协议作为传输协议&#…

thinkpad笔记本摄像头灯亮着,但无图像显示

本人今天发现自己的笔记本的摄像头打开了无法使用&#xff0c;但绿灯亮着 1 在设备管理器中检查驱动有没有被禁用&#xff0c;如被禁用请启用 2 点击本子的电源图标&#xff0c;在弹出的窗口上找到摄像头图标&#xff0c;去掉其反斜框 3 再打开摄像头就可以使用了

笔记本电脑摄像头黑屏怎么处理?

最近发现摄像头的面部识别不能用&#xff0c;我还以为是自己最近长胖了&#xff0c;头型变了导致的。 今天突然打开电脑的相机一用&#xff0c;发现摄像头的小白灯是点亮的&#xff0c;但是画面是黑屏。 我去网上搜了一大堆的资料说怎么处理&#xff0c;有的说驱动&#xff0…

开源:视频语音 实时传输 网络版 支持所有带摄像头的windows 电脑、笔记本、各种外置摄像头

视频语音 实时传输 网络版 支持所有带摄像头的windows 电脑、笔记本、各种外置摄像头 下载地址1&#xff1a;http://pan.baidu.com/share/link?shareid147383&uk201606611 下载地址2&#xff1a;http://bbs.bwsyq.com/Handler/DownAttachment.aspx?AttachId15&siteId…

python笔记本自带摄像头opencv实时人脸检测与识别

#-*- coding: utf-8 -*- # import 进openCV的库 import cv2###调用电脑摄像头检测人脸并截图def CatchPICFromVideo(window_name, camera_idx, catch_pic_num, path_name):cv2.namedWindow(window_name)#视频来源&#xff0c;可以来自一段已存好的视频&#xff0c;也可以直接来…

语音识别模块

语音识别模块&#xff0c;是一种嵌入式语言识别模块&#xff0c;是将人类语音中的词语转换成计算机可读&#xff0c;与主芯片进行通讯&#xff0c;语音模块嵌入到智能化产品中&#xff0c;可以实现人机语音交互。 在炎热的夏季空调是不可少的&#xff0c;但在使用中我们可能会遇…

windows自带语音识别

private DictationRecognizer dictationRecognizer;void Awake(){dictationRecognizer new DictationRecognizer(DictationTopicConstraint.Dictation);//订阅事件 dictationRecognizer.DictationResult ListenSoundResult;dictationRecognizer.DictationComplete ListenSo…