Netty笔记3:NIO编程

ops/2025/3/6 1:05:29/

Netty笔记1:线程模型

Netty笔记2:零拷贝

Netty笔记3:NIO编程

Netty笔记4:Epoll

Netty笔记5:Netty开发实例

Netty笔记6:Netty组件

Netty笔记7:ChannelPromise通知处理

Netty笔记8:ByteBuf使用介绍

Netty笔记9:粘包半包

Netty笔记10:LengthFieldBasedFrameDecoder很简单

Netty笔记11:编解码器

Netty笔记12:模拟Web服务器

Netty笔记13:序列化

文章目录

  • 前言
  • 编程示例
  • 总结

前言

想要快速理解NIO编程,需要先理解上篇的零拷贝技术和线程模型,本篇是对这两个知识的实践,也是netty的过度。

编程示例

我们尝试写一个NIO程序:

需要注意的是:

  1. 进行网络传输时,涉及到的数据,必须要经过缓冲区,不管是发送还是接收,结合用户态和内核态的切换过程就可以明白;
  2. NIO中的缓冲可以使用堆内存缓存和直接内存缓冲,这个需要结合零拷贝技术可以理解;
  3. 多路复用使用selector模式,需要循环遍历socket

注:buf在堆上。在进行数据发送时,如果使用堆内存,在JVM之外创建一个DirectBuf,然后把堆上的数据拷贝的这个DirectBuf,再写到SendBuf中,因为JVM中存在GC机制,如果使用引用方式,在拷贝过程中出现GC,会重新分配地址,导致数据出现问题。

服务端:

java">public class ServerHandle implements Runnable{private Selector selector;private ServerSocketChannel serverSocketChannel;public ServerHandle(int port) {try {selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();// channel必须处于非阻塞模式下,不然会报错,所以不能同FileChannel一起使用serverSocketChannel.configureBlocking(false);serverSocketChannel.socket().bind(new InetSocketAddress(port));// 注册对应的事件,这里注册的是accept事件,只要监听到就会调用对应的处理器// 注册还可以添加附加对象,也就是第三个参数,获取方式:key.attachment();// 如果注册了事件,需要取消,需要调用channel.cancel()serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务端已准备好:" + port);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {while (true) {try {// 阻塞直到通道就绪,这边设置了超时时间// 返回值:有多少通道就绪selector.select(1000);// 在通道就绪时,获取对应的键,也就是事件Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// 根据事件的类型进行对应的处理handlerInput(key);}} catch (Exception e) {throw new RuntimeException(e);}}}private void handlerInput(SelectionKey key) throws IOException {// 在这个循环中,可能存在key.cancel() 或者移除,所以这里需要判断是否有效if (!key.isValid()) {return;}try {if (key.isAcceptable()) {// 这里处理客户端连接服务端的事件ServerSocketChannel channel = (ServerSocketChannel)key.channel();try {// 接收请求SocketChannel sc = channel.accept();System.out.println("-----建立连接------");// 设置该通道非阻塞sc.configureBlocking(false);// 并注册read事件,监听着sc.register(selector, SelectionKey.OP_READ);} catch (IOException e) {System.out.println("连接客户端失败!");key.cancel();channel.close();}}if (key.isReadable()) {SocketChannel sc = (SocketChannel)key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int read = sc.read(buffer);if (read > 0) {// 反转:将这个缓冲中的数据从现在的位置变成从0开始buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String msg = new String(bytes, StandardCharsets.UTF_8);System.out.println("服务器收到消息:" + msg);doWrite(sc, "hello,收到消息:" + msg);}}} catch (IOException e) {System.out.println("数据处理失败!");// 再处理失败,或异常时要退出通道,不然每次循环都会检查通道导致一致报错key.channel().close();key.cancel();}}private void doWrite(SocketChannel sc, String msg) throws IOException {byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);ByteBuffer buffer = ByteBuffer.allocate(bytes.length);buffer.put(bytes);buffer.flip();sc.write(buffer);}
}

服务端启动:

java">    public static void main(String[] args) {ServerHandle serverHandle = new ServerHandle(8080);new Thread(serverHandle).start();}

客户端:

java">public class ClientHandle implements Runnable{private String ip;private int port;private SocketChannel socketChannel;private Selector selector;public ClientHandle(String ip, int port) {try {this.ip = ip;this.port = port;selector = Selector.open();socketChannel = SocketChannel.open();// 非阻塞状态socketChannel.configureBlocking(false);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void run() {try {// 启动后,执行连接操作if (socketChannel.connect(new InetSocketAddress(ip, port))) {// 连接服务端成功后,注册读取事件socketChannel.register(selector, SelectionKey.OP_READ);} else {// 如果连接失败,则再注册连接事件,之后再进行处理socketChannel.register(selector, SelectionKey.OP_CONNECT);}} catch (IOException e) {throw new RuntimeException(e);}while (true) {try {// 阻塞1000秒直到有通道就绪selector.select(1000);Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();handleInput(key);}} catch (IOException e) {throw new RuntimeException(e);}}}public void sendMsg(String msg) throws IOException {byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);ByteBuffer buffer = ByteBuffer.allocate(bytes.length);buffer.put(bytes);buffer.flip();socketChannel.write(buffer);}private void handleInput(SelectionKey key) throws IOException {if (!key.isValid()) {return;}SocketChannel sc = (SocketChannel) key.channel();if (key.isConnectable()) {if (sc.finishConnect()) {socketChannel.register(selector, SelectionKey.OP_READ);} else {System.exit(1);}}if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(1024);int read = sc.read(buffer);if (read > 0) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String msg = new String(bytes, StandardCharsets.UTF_8);System.out.println("客户端收到消息:" + msg);} else if (read < 0) {key.cancel();;sc.close();}}}
}

客户端启动:

java"> public static void main(String[] args) throws IOException {ClientHandle handle = new ClientHandle("localhost", 8080);new Thread(handle).start();Scanner scanner = new Scanner(System.in);// 死循环保持监听while (true) {// 每次控制台输入,就发送给服务端handle.sendMsg(scanner.nextLine());}}

image-20250110175427693image-20250110175437500

总结

该示例对应于reactor单线程模型,服务端是一个单线程,通过selector单线程循环接收客户端的请求,并识别客户端请求事件类型,进行分发处理,相对于BIO很明显的区别,就是它不会等到上一个请求处理完成。在消息接收与发送的过程中,我们需要对缓冲数据进行处理,也就是对应于零拷贝知识点中提到的缓冲区概念,实例中用到了ByteBuffer对象,它是NIO中一个比较重要的对象,下一篇会进行说明。


http://www.ppmy.cn/ops/163454.html

相关文章

深入探索Python机器学习算法:监督学习(线性回归,逻辑回归,决策树与随机森林,支持向量机,K近邻算法)

文章目录 深入探索Python机器学习算法&#xff1a;监督学习一、线性回归二、逻辑回归三、决策树与随机森林四、支持向量机五、K近邻算法 深入探索Python机器学习算法&#xff1a;监督学习 在机器学习领域&#xff0c;Python凭借其丰富的库和简洁的语法成为了众多数据科学家和机…

【Go】Go viper 配置模块

1. 配置相关概念 在项目开发过程中&#xff0c;一旦涉及到与第三方中间件打交道就不可避免的需要填写一些配置信息&#xff0c;例如 MySQL 的连接信息、Redis 的连接信息。如果这些配置都采用硬编码的方式无疑是一种不优雅的做法&#xff0c;有以下缺陷&#xff1a; 不同环境…

Python----Python爬虫(多线程,多进程,协程爬虫)

注意&#xff1a; 该代码爬取小说不久或许会失效&#xff0c;有时候该网站会被封禁&#xff0c;代码只供参考&#xff0c;不同小说不同网址会有差异 神印王座II皓月当空最新章节_神印王座II皓月当空全文免费阅读-笔趣阁 一、多线程爬虫 1.1、单线程爬虫的问题 爬虫通常被认为…

DeepSeek赋能Power BI:开启智能化数据分析新时代

在数据驱动决策的时代&#xff0c;数据分析工具的高效性与智能化程度成为决定企业竞争力的关键因素。Power BI作为一款功能强大的商业智能工具&#xff0c;深受广大数据分析师和企业用户的喜爱。而DeepSeek这一先进的人工智能技术的加入&#xff0c;更是为Power BI注入了新的活…

通过 Groq 后端加载Llama 模型,并调用Function call,也就是通过Groq 后端进行工具的绑定和调用

完整代码&#xff1a; import getpass import os from langchain.chat_models import init_chat_model from langchain_core.tools import tool from langchain_core.messages import HumanMessage, ToolMessage,SystemMessage# 如果没有设置 GROQ_API_KEY&#xff0c;则提示用…

如何停止Oracle expdp/impdp job

一、停止 expdp job举例 1.执行 expdp 命令 $ expdp rui/rui DIRECTORYdmp_dir dumpfilestudyfull_expdp.dmp FULLy logfilestudyfullexpdp.log job_nameexpdp_job2.查看在运行的作业名称 SQL> select job_name,state from dba_datapump_jobs; JOB_NAME …

0x03 http协议和分层架构

HTTP协议 简介 Hyper Text Transfer Protocol&#xff0c;超文本传输协议&#xff0c;规定了浏览器和服务器之间数据传输的规则 http协议基于TCP协议&#xff1a;面向连接&#xff0c;安全基于请求-响应模型&#xff1a;一次请求对应一次响应HTTP协议是无状态的协议&#xff…

颠覆NLP的魔法:深度解读Transformer架构及其核心组件

目录 颠覆NLP的魔法&#xff1a;深度解读Transformer架构及其核心组件 一、Transformer 架构概述 二、核心组件解析 1. Self-Attention&#xff08;自注意力机制&#xff09; 2. 位置编码&#xff08;Positional Encoding&#xff09; 3. 多头注意力&#xff08;Multi-Hea…