netty编程之对reactor的应用

embedded/2024/10/20 20:31:45/

写在前面

netty使用了reactor的线程模型(或者叫做工作模式)。本文就一起来看下其是如何使用的。

1:不同的rector对应的不同的编码方式

首先是rector的单线程模型,对应到netty中的编码方式如下:

// 这里的1,就是rector的单线程模型中一个线程的"1"
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(1);
// 定义启动类,并将rector模型设置到启动类中
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventExecutors);

非主从多线程版本:

// 这里不设置线程数,线程数会自动根据核数指定(一般多核,所以肯定大于1),就是rector的多线程模型了,当然你可以显式指定线程数量
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
// 定义启动类,并将rector模型设置到启动类中
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventExecutors);

主从多线程版本:

// reactor主从模式中的主
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// reactor主从模式中的从
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 定义启动类,并将rector模型设置到启动类中
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);

nettyreactor_29">2:nettyreactor主从模式支持源码分析

netty想要支持reactor,需要做的工作其实就是将主reactor绑定到ServerSocketChannel,将从reactor绑定到SocketChannel中就行了,即让主reactor负责基于ServerSocketChannel的接收连接的工作,而让从reactor负责基于SocketChannel的数据读写工作。
debug使用的代码:

// reactor主从模式中的主
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// reactor主从模式中的从
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 定义启动类,并将rector模型设置到启动类中
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);

serverBootstrap.group(bossGroup, workerGroup);中完成绑定的工作。

reactorServerSocketChannel_42">2.1:主reactor绑定到ServerSocketChannel

// io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {// 调用父类group方法super.group(parentGroup);...return this;
}
// io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)
public B group(EventLoopGroup group) {...// volatile EventLoopGroup group; 设置到局部变量this.group = group;return self();
}
io.netty.bootstrap.AbstractBootstrap#group()
// 该方法负责读取到上一步设置的局部变量
public final EventLoopGroup group() {return group;
}
// io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {...ChannelFuture regFuture = config().group().register(channel);...return regFuture;
}

ChannelFuture regFuture = config().group().register(channel);这里的channel类是netty的channel底层就是Java nio的ServerSocketChannel了,通过register方法也就完成了绑定。

reactorSocketChannel_78">2.2:从reactor绑定到SocketChannel

reactor绑定到SocketChannel需要依赖于ServerSocketChannel,因为SocketChannel的创建是由ServerSocketChannel来完成的,首先看代码:

// io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {...// 赋值从reactor到childGroupthis.childGroup = childGroup;return this;
}
// io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {...// 又换了个名字!final EventLoopGroup currentChildGroup = childGroup;p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {...ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 这里创建ServerBootstrapAcceptor,将从reactor作为参数传了进去pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});
}
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里的msg就是socketchannel,所以这里可以强转final Channel child = (Channel) msg;childGroup.register(child).addListener(new ChannelFutureListener() {...
}

childGroup.register(child)这里就完成绑定了,需要注意,只有在第一次读取客户端数据时才会执行到这里。

reactor_119">3:main reactor为什么只会用到线程组中的一个线程

前面我们分析了main rector是如何帮i当道ServerSocketChannel中的,在如下位置:
在这里插入图片描述
可以看到这个绑定是在bind端口时,即代码b.bind(PORT).sync();执行的,因为只会绑定到一个端口,所以使用一个线程也就足够了,多了也没有什么意义。所以只会用到线程组中一个线程的原因是只会绑定一个端口号。所以啊,就要从这一组线程中选出一个线程来,如何选的呢?接着绑定逻辑来看源码:

// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {return next().register(channel);
}

这里的next方法就是来完成选择工作的:

// io.netty.util.concurrent.MultithreadEventExecutorGroup#next
public EventExecutor next() {// private final EventExecutorChooserFactory.EventExecutorChooser chooserreturn chooser.next();
}

choose有两个实现类:

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {// 这种与的方式选择一个效率要由于取余的方式,一般我们都是采用取余的方式,但netty追求极致性能// 但要求executors总数是2的次幂(另外要注意,-的优先级高于&,我觉得写成这样子更清晰)// executors[idx.getAndIncrement() & (executors.length - 1)]return executors[idx.getAndIncrement() & executors.length - 1];}
}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {// 取余选择,效率就要低于位运算的方式了return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
}

PowerOfTwoEventExecutorChooser采用位运算方式选出一个来效率更高,计算如下:

executors总数4(注意-的优先级高于&)
0 & 4 - 1 = 00000000 & 00000011 = 0(十进制)
1 & 4 - 1 = 00000001 & 00000011 = 1(十进制)
2 & 4 - 1 = 00000010 & 00000011 = 2(十进制)
3 & 4 - 1 = 00000011 & 00000011 = 3(十进制)
4 & 4 - 1 = 00000100 & 00000011 = 0(十进制)
循环了。。。

那么,直接让用户设置一个不就好了吗?为什么还要设置线程组呢?我想这是因为netty为了降低编码的复杂度,从而使得主reactor和从reactor使用相同的编码方式,而底层的差异性就由netty来解决了,所以不得不说netty是一个很优秀的框架啊!

写在后面

参考文章列表

什么是reactor以及其三种版本 。


http://www.ppmy.cn/embedded/129077.html

相关文章

Java基础08-集合框架—单列集合

一、集合框架 二、集合框架—单列集合 1、Collection 集合体系 Collection是单列集合的祖宗&#xff0c;它规定的方法(功能)是全部单列集合都会继承的。 Collection集合特点&#xff1a; List系列集合&#xff1a;添加的元素是有序、可重复、有索引。 ArrayList、LinekdList &…

Javascript算法——双指针法移除元素、数组去重、比较含退格字符、有序数组平方

数组移除元素&#xff08;保证数组仍连续&#xff09; 暴力求解法&#xff08;两层for循环&#xff09;,length单词拼写错误❌二次嵌套for的length设置 /*** param {number[]} nums* param {number} val* return {number}*/ var removeElement function(nums, val) {let leng…

动态规划之打家劫舍

大纲 题目思路第一步&#xff1a;确定下标含义第二步&#xff1a;确定递推公式第二步&#xff1a;dp数组如何初始化第三步&#xff1a;确定遍历顺序第四步&#xff1a;举例推导dp数组 总结 最近有人询问我 LeetCode 「打家劫舍」系列问题&#xff08;英文版叫 House Robber&…

mysql多表关系与查询

一、多表关系 1.多表操作分类 1.1 多对一 关系举例&#xff1a; 多对一&#xff1a;多名学生在一个班级里 一对多主要是靠 “外键” 实现。在 “多” 的表中建立外键&#xff0c;指向 "一"的主键 一对多的关系在实际生产当中使用非常常见。 一对多的核心解决方案是…

使用React Router实现前端的权限访问控制

前段时间学习了React Router&#xff0c;发现没有Vue里面的路由功能强大&#xff0c;没有直接提供路由中间件&#xff0c;不能像Vue里面一样在路由配置上设置任意的额外属性&#xff0c;但是可以通过一些技巧来实现这些功能。 1、配置菜单 后台管理系统一般都会在左侧显示菜单…

SQL第19课——使用存储过程

介绍什么是存储过程&#xff1f;为什么要使用存储过程&#xff1f;如何使用存储过程&#xff1f;创建和使用存储过程的基本语法&#xff1f; 19.1 存储过程 到目前为止&#xff0c;使用的大多数SQL语句都是针对一个或多个表的单条语句。 对于一些复杂的操作需要多条语句才能…

超GPT3.5性能,无限长文本,超强RAG三件套,MiniCPM3-4B模型分享

MiniCPM3-4B是由面壁智能与清华大学自然语言处理实验室合作开发的一款高性能端侧AI模型&#xff0c;它是MiniCPM系列的第三代产品&#xff0c;具有4亿参数量。 MiniCPM3-4B模型在性能上超过了Phi-3.5-mini-Instruct和GPT-3.5-Turbo-0125&#xff0c;并且与多款70亿至90亿参数的…

爬虫逆向学习(十二):一个案例入门补环境

此分享只用于学习用途&#xff0c;不作商业用途&#xff0c;若有冒犯&#xff0c;请联系处理 反爬前置信息 站点&#xff1a;aHR0cDovLzEyMC4yMTEuMTExLjIwNjo4MDkwL3hqendkdC94anp3ZHQvcGFnZXMvaW5mby9wb2xpY3k 接口&#xff1a;/xjzwdt/rest/xmzInfoDeliveryRest/getInfoDe…