NIO中的异步—ChannelFuture、CloseFuture以及异步提升在NIO中的应用

news/2024/9/24 21:21:31/

ChannelFuture

        客户端调用connect后返回值为ChannelFuture对象,我们可以利用ChannelFuture中的channel()方法获取到Channel对象。

        由于上述代为为客户端实现,若想启动客户端实现连接操作,必须编写服务端代码,实现如下:

java">package netty.simpleNetty.channel;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;/*** 服务端配合ClientChannelFuture验证Channel关闭的异步以及回调函数实现*/
@Slf4j
public class ServerChannelFuture {public static void main(String[] args) throws InterruptedException {new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;if(buffer != null){byte[] bytes = new byte[16];ByteBuf len = buffer.readBytes(bytes, 0, buffer.readableBytes());log.info(new String(bytes));}}});}}).bind(8000).sync();}
}

        随后运行上述客户端代码,观察到日志输出如下:

         通过此输出日志发现借用channelFuture对象的channel方法获取的Channel对象并没有创建。

原因分析:connect方法是异步的,意味着不等待连接建立,方法执行就返回了。因此channelFuture对象中不能【立刻】获取到正确的Channel对象。

解决方法:

  • 使用sync方法让异步操作同步等待连接建立。
  • 使用回调方法,当connect连接建立后主动调用回调函数

sync让异步操作同步

        获取到channelFuture对象后,调用sync()方法,同步等待连接的结束。sync阻塞住当前线程,直到Nio线程连接建立完毕 。先启动服务端再启动客户端日志输出如下:

         此时调用channelFuture.channle()方法获取到Channel,随后可以利用获取到的channel向服务端发送消息。

使用回调函数 

        调用获取的channelFuture对象的addListener()方法向其添加回调函数

        可以将上述代码用Lambda表达式简化如下:

         服务端整体代码实现如下:

java">package netty.simpleNetty.channel;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;/*** 客户端关闭channel异步以及回调函数实现*/
@Slf4j
public class ClientChannelFuture {public static void main(String[] args) throws UnknownHostException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup(2)).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}}).connect(new InetSocketAddress(InetAddress.getLocalHost(), 8000));log.info("Before sync : {}",channelFuture.channel());
//        channelFuture.sync();
//        log.info("After sync : {}", channelFuture.channel());channelFuture.addListener((ChannelFutureListener)future -> {log.info("After listen : {}", future.channel());});
//        channelFuture.addListener(new ChannelFutureListener() {
//            @Override
//            public void operationComplete(ChannelFuture future) throws Exception {
//                log.info("After listen : {}", future.channel());
//            }
//        });}
}

CloseFuture

        上述操作探讨了调用connect方法建立连接后,根据chanelFuture获取到Channel对象的sync以及回调方法实现。在获取到Channel对象后,执行对应的writeAndFlush()方法发送消息后,需要及时关闭channel

        channel关闭操作的实现有sync同步等待以及添加回调的实现方法:代码如下:

java">package netty.simpleNetty.channel;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Scanner;/*** Test CloseFuture的异步以及回调实现*/
@Slf4j
public class ClientCloseFuture {public static void main(String[] args) throws InterruptedException, UnknownHostException {Channel channel = new Bootstrap().group(new NioEventLoopGroup(2)).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));ch.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress(InetAddress.getLocalHost(), 8000)).sync().channel();log.info("channel : {}", channel);new Thread(()->{Scanner scanner = new Scanner(System.in);while (true){String line = scanner.nextLine();if("q".equals(line)){// channel的close也是一个异步操作,本线程不会执行close操作,而是交由一个其他线程执行close操作channel.close();break;}channel.writeAndFlush(line);}}).start();ChannelFuture closeFuture = channel.closeFuture();log.info("waiting close...");closeFuture.sync();log.info("closed...");channel.closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.info("closed...");}});//        channel.closeFuture().addListener((ChannelFutureListener) channelFuture1 ->{
//                    log.info("closed...");
//        });}
}

异步提升

        针对上述connect以及close操作,真正执行对应方法是在一个新的线程中执行的,而不是在调用connect的线程中执行连接操作。

        为什么不在一个线程中去执行建立连接、去执行关闭 channel【建立连接connet,关闭channel都是在EventLoopGroup中执行的。也即nio中执行的】,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接。

        因为 netty 异步方式用了多线程、多线程就效率高。(理解错误的)

         netty中的异步指的是建立连接connet、读取数据read、关闭channel等不再一个线程中。而是由专门的线程(handler、EventLoop)负责执行专门的操作。

        思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96。

经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下

因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍。

  • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势

  • 异步并没有缩短响应时间,反而有所增加。【这里所说的响应时间的增加主要指的是不同子任务之间等待的耗时。

  • 合理进行任务拆分,也是利用异步的关键

 


http://www.ppmy.cn/news/1513948.html

相关文章

uniapp 小程序 设置按钮固定到页面的最下方

解决方案 我们在做小程序的时候&#xff0c;特别是页面是以列表的形式进行展示&#xff0c;并且页面必须还要新增数据时&#xff0c;这是就会在页面的底部加一个固定的新增按钮&#xff0c;点击新增按钮&#xff0c;弹出一个弹窗…然后进行下一步的业务逻辑操作&#xff0c;那…

EmguCV学习笔记 VB.Net 5.1 基本变换

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 EmguCV是一个基于OpenCV的开源免费的跨平台计算机视觉库,它向C#和VB.NET开发者提供了OpenCV库的大部分功能。 教程VB.net版本请访问…

C语言每日好题(3)

有任何不懂的问题可以评论区留言&#xff0c;能力范围内都会一一回答 #define _CRT_SECURE_NO_WARNING #include <stdio.h> #include <string.h> int main(void) {if ((strlen("abc") - strlen("abcdef")) > 0)printf(">\n")…

Mac系统安装Homebrew【已成功】

1、正常安装失败原因 1.1命令行安装失败 /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" 原因 没挂&#x1fa9c;&#xff0c;不过我挂了梯子安装很多次也还是失败&#xff0c;所以可能是网站原因 1.2、网…

电梯测试案例分析 —— 软件测试中的经典挑战

前言 在软件测试领域,有一个广为人知的经典案例——电梯问题。它不仅考验着测试人员的技术能力,还对他们的逻辑思维和问题解决技巧提出了挑战。本文将通过一个具体的电梯软件测试案例,来探讨测试中需要注意的关键点。 一、电梯问题背景介绍 电梯系统是一个典型的多用户交互…

Web大学生网页作业成品——明星EXO介绍网页设计与实现(HTML+CSS)(10个页面)(TABLE布局)

&#x1f389;&#x1f389;&#x1f389; 常见网页设计作业题材有**汽车、环保、明星、文化、国家、抗疫、景点、人物、体育、植物、公益、图书、节日、游戏、商城、旅游、家乡、学校、电影、动漫、非遗、动物、个人、企业、美食、婚纱、其他**等网页设计题目, 可满足大学生网…

Kafka主题(Topic/文件夹)的操作

Kafka主题&#xff08;Topic/文件夹&#xff09;的操作 1、Kafka主题&#xff08;Topic/文件夹&#xff09;2、Kafka主题&#xff08;Topic/文件夹&#xff09;的一些操作2.1、创建主题&#xff08;Topic/文件夹&#xff09;2.2、列出所有主题&#xff08;Topic/文件夹&#xf…

iOS 18 Beta 5:苹果的细腻之笔,绘制用户体验新画卷

在苹果的世界里&#xff0c;每一次系统更新都是对用户体验进行的一次精心雕琢。 随着iOS 18 Beta 5的上线&#xff0c;苹果带来了一系列令人耳目一新的功能&#xff0c;同时也在系统的每个细微之处展现了对完美的追求。 Safari浏览器的“干扰控制”功能 在今天信息充斥的数字…