Netty第二部

news/2024/11/30 14:37:13/

一、EventLoop和EventLoopGroup

一个Channel可以近似的理解成一个Socket的包装,EventLoop管理这些Channel的

1、EventLoop

EventLoop作为线程,具体Channel由EventLoop管理,在AbstractChannel类的register()方法可以体现

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;// 判断是否为当前线程if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 不是的话会交给eventLoop执行eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}

EventLoop类结构图

EventLoop对应的实现类

看看之前用到的NioEventLoop,在NioEventLoop类中定义如下两个属性,Java中线程和队列的组件就会想到线程池

private final Queue<Runnable> taskQueue;private volatile Thread thread;

2、EventLoopGroup

EventLoopGroup主要是每个新建的Channel分配一个EventLoop以及管理EventLoop

public interface EventLoopGroup extends EventExecutorGroup {/*** Return the next {@link EventLoop} to use*/@OverrideEventLoop next();/*** Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}* will get notified once the registration was complete.*/ChannelFuture register(Channel channel);/*** Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed* {@link ChannelFuture} will get notified once the registration was complete and also will get returned.*/ChannelFuture register(ChannelPromise promise);/*** Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}* will get notified once the registration was complete and also will get returned.** @deprecated Use {@link #register(ChannelPromise)} instead.*/@DeprecatedChannelFuture register(Channel channel, ChannelPromise promise);
}

线程的分配

服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。

异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来 支撑大量的Channel,而不是每个Channel分配一个Thread。EventLoopGroup负责为每个 新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方 式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。 一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的 Thread)。

线程管理

在内部,当提交任务到如果(当前)调用线程正是支撑EventLoop的线程,那么所提交 的代码块将会被(直接)执行。否则,EventLoop将调度该任务以便稍后执行,并将它放入 到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。

二、Channel接口

  1. Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作(bind、connect、read和 write),它大大地降低了直接使用 Socket 类的复杂性
  2. 由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang.Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。

Channel 的生命周期状态

  • ChannelUnregistered :Channel 已经被创建,但还未注册到 EventLoop
  • ChannelRegistered :Channel 已经被注册到了 EventLoop
  • ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接 收和发送数据了
  • ChannelInactive :Channel 没有连接到远程节点

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。在我们的编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。

重要 Channel 的方法

  • eventLoop: 返回分配给 Channel 的 EventLoop
  • pipeline: 返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的 ChannelPipeline。
  • isActive: 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。 例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被 打开便是活动的。
  • localAddress: 返回本地的 SokcetAddress
  • remoteAddress: 返回远程的 SocketAddress
  • write: 将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正 写往 socket。
  • flush: 将之前已写的数据冲刷到底层 socket 进行传输。
  • writeAndFlush: 一个简便的方法,等同于调用 write()并接着调用 flush()

三、ChannelHandlerContext、ChannelPipeline和ChannelHandler

ChannelPipeline接口

当Channel被创建时,它将会被自动地分配一个新的ChannelPipeline,每个Channel都有自己的ChannelPipeline

ChannelPipeline提供了ChannelHandler链的容器,并定义了用于在该链上传播入站(也就是从网络到业务处理)和出站(也就是从业务处理到网络),ChannelHandler都是放在ChannelPipeline中的

ChannelHandler 的生命周期

在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用下面这些方法。这些方法中的每一个都接受一个ChannelHandlerContext参数。

  • handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
  • handlerRemoved 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught 当处理过程中在 ChannelPipeline 中有错误产生时被调用

ChannelPipeline中的ChannelHandler

入站(ChannelInboundHandler)和出站(ChannelOutboundHandler)ChannelHandler被安装到同一个 ChannelPipeline中,ChannelPipeline以双向链表的形式进行维护管理

如果此时有入站事件被触发,就会从ChannelPipeline头部开始流动,到达ChannelPipeline的尾部,中间只会经过定义入站的ChannelHandler;反之,出战就会从ChannelPipeline的尾部到头部,中间中间只会经过定义出站的ChannelHandler

Netty能区分入站事件的 Handler和出站事件的Handler,并确保数据只会在具有相同定向类型的两个ChannelHandler之间传递。

ChannelPipeline上的方法

既然 ChannelPipeline 以双向链表的形式进行维护管理 Handler,自然也提供了对应的方 法在 ChannelPipeline 中增加或者删除、替换 Handler。

  • addFirst、addBefore、addAfter、addLast 将一个 ChannelHandler添加到ChannelPipeline中
  • remove 将一个ChannelHandler从ChannelPipeline中移除
  • replace将ChannelPipeline中的一个ChannelHandler替换为另一个ChannelHandler
  • get 通过类型或者名称返回ChannelHandler
  • context返回和ChannelHandler绑定的ChannelHandlerContext
  • names返回ChannelPipeline中所有ChannelHandler的名称
  • ChannelPipeline的API公开了用于调用入站和出站操作的附加方法。

ChannelHandlerContext

ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContext

ChannelHandlerContext 的主要作用就和LinkedList 内部的类Node类似。

Channel、ChannelPipeline 和 ChannelHandlerContext 上的事件传播

channel.write()、channelpipeline.write()会经过所有出站Handler

channelHandlerContext.write()只会经过下一个出站的Handler

ChannelHandlerContext 的 API

alloc 返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator

bind 绑定到给定的 SocketAddress,并返回 ChannelFuture

channel 返回绑定到这个实例的 Channel

close 关闭 Channel,并返回 ChannelFuture

connect 连接给定的 SocketAddress,并返回 ChannelFuture

deregister 从之前分配的 EventExecutor 注销,并返回 ChannelFuture

disconnect 从远程节点断开,并返回 ChannelFuture executor 返回调度事件的 EventExecutor

fireChannelActive 触发对下一个 ChannelInboundHandler 上的

channelActive()方法(已 连接)的调用

fireChannelInactive 触发对下一个 ChannelInboundHandler 上的channelInactive()方法 (已关闭)的调用

fireChannelRead 触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接 收的消息)的调用

fireChannelReadComplete 触发对下一个 ChannelInboundHandler 上的 channelReadComplete()方法的调用

fireChannelRegistered 触发对下一个 ChannelInboundHandler 上的 fireChannelRegistered()方法的调用

fireChannelUnregistered 触发对下一个 ChannelInboundHandler 上的 fireChannelUnregistered()方法的调用

fireChannelWritabilityChanged 触发对下一个 ChannelInboundHandler 上的 fireChannelWritabilityChanged()方法的调用

fireExceptionCaught 触发对下一个 ChannelInboundHandler 上的 fireExceptionCaught(Throwable)方法的调用

fireUserEventTriggered 触发对下一个 ChannelInboundHandler 上的 fireUserEventTriggered(Object evt)方法的调用

handler 返回绑定到这个实例的 ChannelHandler

isRemoved 如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true

name 返回这个实例的唯一名称

pipeline 返回这个实例所关联的 ChannelPipeline

read 将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的 channelReadComplete(ctx)方法

write 通过这个实例写入消息并经过 ChannelPipeline

writeAndFlush 通过这个实例写入并冲刷消息并经过 ChannelPipeline

当使用 ChannelHandlerContext 的 API 的时候,有以下两点:

  1. ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的, 所以缓存对它的引用是安全的;
  2. 相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该 尽可能地利用这个特性来获得最大的性能

入站的handler时经过会读到一个buffer中,如果中间不释放会造成内存泄漏;正常情况netty会处理,但是如果handler执行异常这个buffer就永远不会被netty处理,可以实现SimpleChannelInboundHandler接口重写channelRead0()方法,在此方法处理具体业务逻辑


http://www.ppmy.cn/news/1199478.html

相关文章

pcigo图床插件的简单开发

1.前言&#xff1a; 如果想写一个图床并且投入使用&#xff0c;那么&#xff0c;接入picgo一定是一个不错的选择。picgo有着windows&#xff0c;mac&#xff0c;linux等多个客户端版本。实用且方便。 2. 开发的准备&#xff1a; 2.0. 需要安装一个node node这里我就不详细说…

STL(第八课):map

STL map是C标准库中的容器&#xff0c;它是一种关联容器&#xff0c;也就是说&#xff0c;它的元素是按照键值来存储的。STL map以键值对的形式来存储数据&#xff0c;每个键对应一个值。map中的每个元素都是一个pair对象&#xff0c;pair第一个元素为键&#xff0c;第二个元素…

解决gtihub访问不到的

解决gtihub访问不到的 小编一开始也是找不到git但是通过查询资料&#xff0c;最终也是找到了解决方式 据说git的ip地址通常会变化的&#xff0c;可以通过地址查询网站查询到git当前的ip https://sites.ipaddress.com/github.com/在输入框中github.com&#xff0c;然后搜索 在…

如何再kali中下载iwebsec靶场

这个靶场有三种搭建方法&#xff1a; 第一种是在线靶场&#xff1a;http://www.iwebsec.com:81/ 第二种是虚拟机版本的&#xff0c;直接下载到本地搭建 官网地址下载&#xff1a;http://www.iwebsec.com/ 而第三种就是利用docker搭建这个靶场&#xff0c;我这里是用kali进行…

【Head First 设计模式】-- 观察者模式

背景 客户有一个WeatherData对象&#xff0c;负责追踪温度、湿度和气压等数据。现在客户给我们提了个需求&#xff0c;让我们利用WeatherData对象取得数据&#xff0c;并更新三个布告板&#xff1a;目前状况、气象统计和天气预报。 WeatherData对象提供了4个接口&#xff1a; …

vivado 布线分析

在“ Device ”窗口中开启“ Routing Resources ” &#xff08; 布线资源 &#xff09; 即可查看具体的布线资源。 缩小时显示抽象视图。抽象视图 &#xff1a; • 精简穿过器件的布线。 • 根据穿过特定区域的布线数量显示不同粗细的线条。 类似地 &#xff0c; 布局以块…

【flask跨域问题】解决它

大概7-8年前&#xff0c;前后端还没开始分离或者刚开始分离的之前&#xff0c;跨域问题很多。 后来我就没在遇到过了&#xff0c;这次做一个小项目&#xff0c;又遇到了&#xff0c;记录下。 现在前端的脚手架都自己能解决了。 1. 跨域 是因为出于浏览器的同源策略限制。同源…

JS+CSS随机点名详细介绍复制可用(可自己添加人名)

想必大家也想拥有一个可以随机点名的网页&#xff0c;接下来我为大家介绍一下随机点名&#xff0c;可用于抽人&#xff0c;哈哈 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><style>* {margin: 0;…