一、EventLoop和EventLoopGroup
一个Channel可以近似的理解成一个Socket的包装,EventLoop管理这些Channel的
1、EventLoop
EventLoop作为线程,具体Channel由EventLoop管理,在AbstractChannel类的register()方法可以体现
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;// 判断是否为当前线程if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 不是的话会交给eventLoop执行eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}
EventLoop类结构图
EventLoop对应的实现类
看看之前用到的NioEventLoop,在NioEventLoop类中定义如下两个属性,Java中线程和队列的组件就会想到线程池
private final Queue<Runnable> taskQueue;private volatile Thread thread;
2、EventLoopGroup
EventLoopGroup主要是每个新建的Channel分配一个EventLoop以及管理EventLoop
public interface EventLoopGroup extends EventExecutorGroup {/*** Return the next {@link EventLoop} to use*/@OverrideEventLoop next();/*** Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}* will get notified once the registration was complete.*/ChannelFuture register(Channel channel);/*** Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed* {@link ChannelFuture} will get notified once the registration was complete and also will get returned.*/ChannelFuture register(ChannelPromise promise);/*** Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}* will get notified once the registration was complete and also will get returned.** @deprecated Use {@link #register(ChannelPromise)} instead.*/@DeprecatedChannelFuture register(Channel channel, ChannelPromise promise);
}
线程的分配
服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。
异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread),而且在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来 支撑大量的Channel,而不是每个Channel分配一个Thread。EventLoopGroup负责为每个 新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方 式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。 一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的 Thread)。
线程管理
在内部,当提交任务到如果(当前)调用线程正是支撑EventLoop的线程,那么所提交 的代码块将会被(直接)执行。否则,EventLoop将调度该任务以便稍后执行,并将它放入 到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。
二、Channel接口
- Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作(bind、connect、read和 write),它大大地降低了直接使用 Socket 类的复杂性
- 由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang.Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。
Channel 的生命周期状态
- ChannelUnregistered :Channel 已经被创建,但还未注册到 EventLoop
- ChannelRegistered :Channel 已经被注册到了 EventLoop
- ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接 收和发送数据了
- ChannelInactive :Channel 没有连接到远程节点
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。在我们的编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。
重要 Channel 的方法
- eventLoop: 返回分配给 Channel 的 EventLoop
- pipeline: 返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的 ChannelPipeline。
- isActive: 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。 例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被 打开便是活动的。
- localAddress: 返回本地的 SokcetAddress
- remoteAddress: 返回远程的 SocketAddress
- write: 将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正 写往 socket。
- flush: 将之前已写的数据冲刷到底层 socket 进行传输。
- writeAndFlush: 一个简便的方法,等同于调用 write()并接着调用 flush()
三、ChannelHandlerContext、ChannelPipeline和ChannelHandler
ChannelPipeline接口
当Channel被创建时,它将会被自动地分配一个新的ChannelPipeline,每个Channel都有自己的ChannelPipeline
ChannelPipeline提供了ChannelHandler链的容器,并定义了用于在该链上传播入站(也就是从网络到业务处理)和出站(也就是从业务处理到网络),ChannelHandler都是放在ChannelPipeline中的
ChannelHandler 的生命周期
在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用下面这些方法。这些方法中的每一个都接受一个ChannelHandlerContext参数。
- handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
- handlerRemoved 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
- exceptionCaught 当处理过程中在 ChannelPipeline 中有错误产生时被调用
ChannelPipeline中的ChannelHandler
入站(ChannelInboundHandler)和出站(ChannelOutboundHandler)ChannelHandler被安装到同一个 ChannelPipeline中,ChannelPipeline以双向链表的形式进行维护管理
如果此时有入站事件被触发,就会从ChannelPipeline头部开始流动,到达ChannelPipeline的尾部,中间只会经过定义入站的ChannelHandler;反之,出战就会从ChannelPipeline的尾部到头部,中间中间只会经过定义出站的ChannelHandler
Netty能区分入站事件的 Handler和出站事件的Handler,并确保数据只会在具有相同定向类型的两个ChannelHandler之间传递。
ChannelPipeline上的方法
既然 ChannelPipeline 以双向链表的形式进行维护管理 Handler,自然也提供了对应的方 法在 ChannelPipeline 中增加或者删除、替换 Handler。
- addFirst、addBefore、addAfter、addLast 将一个 ChannelHandler添加到ChannelPipeline中
- remove 将一个ChannelHandler从ChannelPipeline中移除
- replace将ChannelPipeline中的一个ChannelHandler替换为另一个ChannelHandler
- get 通过类型或者名称返回ChannelHandler
- context返回和ChannelHandler绑定的ChannelHandlerContext
- names返回ChannelPipeline中所有ChannelHandler的名称
- ChannelPipeline的API公开了用于调用入站和出站操作的附加方法。
ChannelHandlerContext
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContext
ChannelHandlerContext 的主要作用就和LinkedList 内部的类Node类似。
Channel、ChannelPipeline 和 ChannelHandlerContext 上的事件传播
channel.write()、channelpipeline.write()会经过所有出站Handler
channelHandlerContext.write()只会经过下一个出站的Handler
ChannelHandlerContext 的 API
alloc 返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator
bind 绑定到给定的 SocketAddress,并返回 ChannelFuture
channel 返回绑定到这个实例的 Channel
close 关闭 Channel,并返回 ChannelFuture
connect 连接给定的 SocketAddress,并返回 ChannelFuture
deregister 从之前分配的 EventExecutor 注销,并返回 ChannelFuture
disconnect 从远程节点断开,并返回 ChannelFuture executor 返回调度事件的 EventExecutor
fireChannelActive 触发对下一个 ChannelInboundHandler 上的
channelActive()方法(已 连接)的调用
fireChannelInactive 触发对下一个 ChannelInboundHandler 上的channelInactive()方法 (已关闭)的调用
fireChannelRead 触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接 收的消息)的调用
fireChannelReadComplete 触发对下一个 ChannelInboundHandler 上的 channelReadComplete()方法的调用
fireChannelRegistered 触发对下一个 ChannelInboundHandler 上的 fireChannelRegistered()方法的调用
fireChannelUnregistered 触发对下一个 ChannelInboundHandler 上的 fireChannelUnregistered()方法的调用
fireChannelWritabilityChanged 触发对下一个 ChannelInboundHandler 上的 fireChannelWritabilityChanged()方法的调用
fireExceptionCaught 触发对下一个 ChannelInboundHandler 上的 fireExceptionCaught(Throwable)方法的调用
fireUserEventTriggered 触发对下一个 ChannelInboundHandler 上的 fireUserEventTriggered(Object evt)方法的调用
handler 返回绑定到这个实例的 ChannelHandler
isRemoved 如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true
name 返回这个实例的唯一名称
pipeline 返回这个实例所关联的 ChannelPipeline
read 将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的 channelReadComplete(ctx)方法
write 通过这个实例写入消息并经过 ChannelPipeline
writeAndFlush 通过这个实例写入并冲刷消息并经过 ChannelPipeline
当使用 ChannelHandlerContext 的 API 的时候,有以下两点:
- ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的, 所以缓存对它的引用是安全的;
- 相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该 尽可能地利用这个特性来获得最大的性能
入站的handler时经过会读到一个buffer中,如果中间不释放会造成内存泄漏;正常情况netty会处理,但是如果handler执行异常这个buffer就永远不会被netty处理,可以实现SimpleChannelInboundHandler接口重写channelRead0()方法,在此方法处理具体业务逻辑