rust学习~tokio的io

server/2025/3/1 6:25:20/

await

Suspend execution until the result of a Future is ready.
暂停执行,直到一个 Future 的结果就绪。

.awaiting a future will suspend the current function’s execution until the executor has run the future to completion.
对一个 Future 使用 .await 操作会暂停当前函数的执行,直到执行器(executor)将该 Future 运行至完成

Read the async book for details on how async/await and executors work.
有关异步 / 等待(async/await)和执行器的工作原理的详细信息,请阅读《异步编程指南》(Async Book)。

Editions
await is a keyword from the 2018 edition onwards.
await 是从 2018 版及后续版本开始引入的关键字。

It is available for use in stable Rust from version 1.39 onwards.
从 1.39 版本及以后的稳定版 Rust 中可以使用它。

AsyncReadExt

rust">use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut f = File::open("foo.txt").await?;let mut buffer = Vec::new();// 读取整个文件的内容f.read_to_end(&mut buffer).await?;// String::from_utf8 是 String 类型的一个关联函数// 专门用于把 Vec<u8> 类型的字节向量转换为 String 类型的 UTF - 8 字符串// 它会检查字节向量中的字节序列是否符合 UTF - 8 编码规则// 如果符合则返回一个 Ok(String),若不符合则返回 Err(FromUtf8Error)// 适用于从字节数据(如文件读取、网络接收等)构建字符串,并且需要确保数据是有效的 UTF - 8 编码match String::from_utf8(buffer) {Ok(content) => {println!("文件内容如下:\n{}", content);}Err(e) => {eprintln!("将文件内容转换为字符串时出错: {}", e);}}Ok(())
}
rust">use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut f = File::open("foo.txt").await?;let mut buffer = Vec::new();// 读取整个文件的内容f.read_to_end(&mut buffer).await?;Ok(())
}

为什么说 字节数组 &[u8] 实现了 AsyncRead ?

字节数组切片 &[u8] 实现了 AsyncRead 特征,这意味着它可以作为异步读取操作的数据源,允许以异步的方式从字节数组中读取数据

AsyncRead 特征的作用

AsyncRead 是 tokio 异步运行时库中定义的一个特征,它定义了异步读取操作的接口。其核心方法是 poll_read,该方法用于尝试从数据源中异步读取数据到指定的缓冲区。通过实现 AsyncRead 特征,类型可以参与到异步 I/O 操作中,利用异步运行时的调度机制,在等待数据可读时让出控制权,提高程序的并发性能。

AsyncRead 特征的简化定义如下:

rust">use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::Result;pub trait AsyncRead {fn poll_read(self: Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut [u8],) -> Poll<Result<usize>>;
}

self 是实现该特征的类型的可变引用,使用 Pin 确保在异步操作过程中对象的内存位置不会改变
cx 是任务上下文,包含了任务的唤醒器等信息,用于在数据准备好时唤醒任务
buf 用于存储读取数据的缓冲区
Poll 枚举表示操作的结果,可能是 Poll::Ready 表示操作已完成,返回实际读取的字节数;也可能是 Poll::Pending 表示操作还未完成,需要等待。

&[u8] 实现 AsyncRead 的原因

灵活性和通用性:字节数组切片 &[u8] 是一种非常常见且灵活的数据表示方式,它可以表示内存中的一段连续字节数据。实现 AsyncRead 特征后,&[u8] 可以作为异步读取操作的数据源,使得很多使用 AsyncRead 的代码可以直接处理字节数组,无需额外的转换。
例如,在测试代码中,可以使用字节数组模拟文件或网络数据进行异步读取测试。
异步编程的一致性:在异步编程中,希望不同的数据源(如文件、网络套接字、内存缓冲区等)都能以统一的方式进行异步读取操作。通过让 &[u8] 实现 AsyncRead 特征,保持了异步读取操作的一致性,使得代码更加简洁和易于维护。

示例代码

rust">use tokio::io::{self, AsyncReadExt};#[tokio::main]
async fn main() -> io::Result<()> {// 定义一个字节数组,存储了字符串 "Hello, World!" 的 UTF - 8 编码字节数据let data = b"Hello, World!";let mut buffer = [0; 5];// 创建一个字节数组切片 &[u8],作为异步读取的数据源let mut reader = &data[..];// 调用 read 方法(该方法是基于 AsyncRead 特征实现的),异步地从字节数组切片中读取数据到 buffer 中// await 关键字用于等待读取操作完成,最终返回实际读取的字节数let n = reader.read(&mut buffer).await?;println!("Read {} bytes: {:?}", n, &buffer[..n]);Ok(())
}

实际应用场景

测试:在编写异步 I/O 代码的单元测试时,可以使用字节数组模拟不同的数据源,方便进行测试。例如,测试一个异步数据解析器时,可以使用字节数组提供测试数据。
内存数据处理:当需要对内存中的字节数据进行异步处理时,如对加密数据、压缩数据等进行异步解密或解压缩操作,可以直接使用字节数组作为数据源。

字节数组切片 &[u8] 实现 AsyncRead 特征,为异步编程提供了更多的灵活性和一致性,使得字节数组可以方便地作为异步读取操作的数据源。

AsyncWriteExt

rust">use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut file = File::create("foo.txt").await?;// 将一个 &str 字符串转变成一个字节数组:&[u8;10]// 然后 write 方法又会将这个 &[u8;10] 的数组类型隐式强转为数组切片: &[u8]let n = file.write(b"some bytes").await?;println!("Wrote the first {} bytes of 'some bytes'.", n);Ok(())
}

AsyncWriteExt::write_all 将缓冲区的内容全部写入到写入器中

rust">use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut file = File::create("foo.txt").await?;file.write_all(b"some bytes").await?;Ok(())
}

tokio::io处理标准的输入/输出/错误

rust">use tokio::fs::File;
use tokio::io;#[tokio::main]
async fn main() -> io::Result<()> {// &[u8] 是字节数组切片let mut reader: &[u8] = b"hello";let mut file = File::create("foo.txt").await?;// 异步的将读取器( reader )中的内容拷贝到写入器( writer )中// 字节数组 &[u8] 实现了 AsyncRead,所以这里可直接使用 readerio::copy(&mut reader, &mut file).await?;Ok(())
}

tokio::io分离读写器

错误写法

rust">io::copy(&mut socket, &mut socket).await

读取器和写入器都是同一个 socket,因此需要对其进行两次可变借用,这明显违背了 Rust 的借用规则

用同一个 socket 是不行的,为了实现目标功能,必须将 socket 分离成一个读取器和写入器
任何一个读写器( reader + writer )都可以使用 io::split 方法进行分离,最终返回一个读取器和写入器,两者可以独自使用,例如可以放入不同的任务中。

回声服务端

rust">use tokio::io;
use tokio::net::TcpListener;#[tokio::main]
async fn main() -> io::Result<()> {let listener = TcpListener::bind("127.0.0.1:6142").await?;loop {let (mut socket, _) = listener.accept().await?;tokio::spawn(async move {// split 操作和 io::copy 调用是在同一个异步任务上下文中执行的// 由于它们处于同一个任务中,所以不存在不同任务之间的数据传递开销和同步问题// 任务的执行是连贯的,避免了因为跨任务操作而引入的额外复杂性和性能损耗let (mut rd, mut wr) = socket.split();if io::copy(&mut rd, &mut wr).await.is_err() {eprintln!("failed to copy");}});}
}

回声客户端

rust">use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;#[tokio::main]
async fn main() -> io::Result<()> {let socket = TcpStream::connect("127.0.0.1:6142").await?;let (mut rd, mut wr) = io::split(socket);// 创建异步任务,在后台写入数据tokio::spawn(async move {wr.write_all(b"hello\r\n").await?;wr.write_all(b"world\r\n").await?;// 有时,我们需要给予 Rust 一些类型暗示,它才能正确的推导出类型Ok::<_, io::Error>(())});let mut buf = vec![0; 128];loop {let n = rd.read(&mut buf).await?;if n == 0 {break;}println!("GOT {:?}", &buf[..n]);}Ok(())
}

tokio::spawn 是 Tokio 运行时提供的一个函数,用于创建一个新的异步任务并将其放入任务队列中等待执行。这个新任务会在 Tokio 运行时的调度下异步执行,与当前代码所在的任务是并发关系,而不是顺序执行关系。
当执行到 tokio::spawn 时,它会立即将传入的异步闭包包装成一个新的异步任务并放入 Tokio 运行时的任务队列中,然后代码会继续往下执行,不会等待这个新任务开始执行。
因此,let mut buf = vec![0; 128]; 这行代码会紧接着 tokio::spawn 之后执行,而 tokio::spawn 内部的异步任务会在 Tokio 运行时调度到它时才开始执行。
Tokio 运行时的调度是基于事件驱动和任务优先级的,它会根据系统资源和任务的状态动态地决定哪个任务先执行。所以,tokio::spawn 内部的任务可能在 let mut buf = vec![0; 128]; 之前执行,也可能在之后执行,甚至可能与后续代码并发执行
假设 tokio::spawn 内部的任务执行需要一些时间(例如网络延迟),而创建 buf 向量的操作很快,那么很可能 let mut buf = vec![0; 128]; 会先执行,然后才轮到 tokio::spawn 内部的任务开始执行

C-S修正

rust">cargo run --bin server.rs
cargo run --bin client.rs

上述代码跑起来之后,服务端不退出的话,客户端会一直卡住,客户端加如下函数即可解决

rust">wr.shutdown().await ? ;

手动实现io

rust">use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;#[tokio::main]
async fn main() -> io::Result<()> {let listener = TcpListener::bind("127.0.0.1:6142").await?;loop {let (mut socket, _) = listener.accept().await?;tokio::spawn(async move {// 此处的缓冲区是一个 Vec 动态数组,它的数据是存储在堆上,而不是栈上// 若改成 let mut buf = [0; 1024];,则存储在栈上// 一个数据如果想在 .await 调用过程中存在,那它必须存储在当前任务内// buf 会在 .await 调用过程中被使用,因此它必须要存储在任务内let mut buf = vec![0; 1024];loop {match socket.read(&mut buf).await {// 返回值 `Ok(0)` 说明对端已经关闭Ok(0) => return,Ok(n) => {// Copy the data back to socket// 将数据拷贝回 socket 中if socket.write_all(&buf[..n]).await.is_err() {// 非预期错误,由于我们这里无需再做什么,因此直接停止处理return;}}Err(_) => {// 非预期错误,由于我们无需再做什么,因此直接停止处理return;}}}});}
}

若该缓冲区数组创建在栈上,那每条连接所对应的任务的内部数据结构看上去可能如下所示

rust">struct Task {task: enum {AwaitingRead {socket: TcpStream,buf: [BufferType],},AwaitingWriteAll {socket: TcpStream,buf: [BufferType],}}
}

栈数组要被使用,就必须存储在相应的结构体内,其中两个结构体分别持有了不同的栈数组 [BufferType]
这种方式会导致任务结构变得很大
一般选择缓冲区长度往往会使用分页长度(page size),因此使用栈数组会导致任务的内存大小变得很奇怪甚至糟糕:
$page-size + 一些额外的字节

编译器会进一步优化 async 语句块的布局,而不是像上面一样简单的使用 enum
在实践中,变量也不会在枚举成员间移动。但是再怎么优化,任务的结构体至少也会跟其中的栈数组一样大
因此通常情况下,使用堆上的缓冲区会高效实用的多

当任务因为调度在线程间移动时,存储在栈上的数据需要进行保存和恢复,过大的栈上变量会带来不小的数据拷贝开销
因此,存储大量数据的变量最好放到堆上

处理 EOF

当 TCP 连接的读取端关闭后,再调用 read 方法会返回 Ok(0)。此时,再继续下去已经没有意义,因此需要退出循环。
忘记在 EOF 时退出读取循环,是网络编程中一个常见的 bug :

rust">loop {match socket.read(&mut buf).await {Ok(0) => return,// ... 其余错误处理}
}

一旦读取端关闭后,那后面的 read 调用就会立即返回 Ok(0),而不会阻塞等待,因此这种无阻塞循环会最终导致 CPU 立刻跑到 100%,并将一直持续下去,直到程序关闭。

小节

实际上,io::split 可以用于任何同时实现了 AsyncRead 和 AsyncWrite 的值,它的内部使用了 Arc 和 Mutex 来实现相应的功能。如果大家觉得这种实现有些重,可以使用 Tokio 提供的 TcpStream,它提供了两种方式进行分离:

TcpStream::split 会获取字节流的引用,然后将其分离成一个读取器和写入器。但由于使用了引用的方式,它们俩必须和 split 在同一个任务中。 优点就是,这种实现没有性能开销,因为无需 Arc 和 Mutex。
TcpStream::into_split 还提供了一种分离实现,分离出来的结果可以在任务间移动,内部是通过 Arc 实现


http://www.ppmy.cn/server/171481.html

相关文章

【大模型实战篇】DeepSeek-R1面临的DDOS攻击以及与O系列的差距

1. 问题 虽然不可否认DeepSeek-R1作为一款大模型表现出色&#xff0c;其开源也推动了技术的进步。然而&#xff0c;在这份热闹的背后&#xff0c;我们仍需保持冷静。该推理模型在某些特殊场景下存在问题&#xff1a;其think过程可能会持续不断地输出&#xff0c;直到达到token…

Ubuntu 下 nginx-1.24.0 源码分析 - ngx_list_t

ngx_list_t 定义在 src/core/ngx_list.h typedef struct {ngx_list_part_t *last;ngx_list_part_t part;size_t size;ngx_uint_t nalloc;ngx_pool_t *pool; } ngx_list_t;ngx_list_t 是 Nginx 中用于管理动态数组链表的核心数据结构&#xff0c;结…

kotlin的lateinit声明 的变量可能为null吗?

kotlin的lateinit声明 的变量可能为null吗&#xff1f; DeepSeek R1 思考 26 秒 思考过程 嗯&#xff0c;用户问的是Kotlin中的lateinit变量是否可能为null。首先&#xff0c;我需要回顾一下lateinit的作用和特性。lateinit用于延迟初始化一个非空类型的变量&#xff0c;这样…

6.6.3 SQL数据查询(一)

文章目录 SELECT语句简单查询连接查询子查询聚集函数 SELECT语句 SELECT是用于查询的动词&#xff0c;基本格式为SELECT-FROM-WHERE&#xff0c;其中WHERE可选。SELECT对应投影运算&#xff0c;FROM对应笛卡尔积运算&#xff0c;WHERE对应选择运算。选择使用的条件表达式p可包含…

Python入门 — 类

面向对象编程中&#xff0c;编写表示现实世界中的事物和情景的类&#xff08;class&#xff09;&#xff0c;并基于这些类来创建对象&#xff08;object&#xff09;。根据类来创建对象称为实例化&#xff0c;这样就可以使用类的实例&#xff08;instance&#xff09; 一、创建…

开发基于提示工程的大语言模型(LLM)应用——学习笔记

本文是学习笔记。学习通过提示工程与大语言模型进行程序化的交互。 将从最基本的开始&#xff0c;比如使用哪些模型&#xff0c;以及如何向它们发送提示词并查看响应。 将逐步构建更复杂的提示词&#xff0c;并学习 LangChain 为我们提供的、用于与大语言模型交互的丰富工具。…

使用优化版的编辑距离算法替代ES默认的评分算法

优化版编辑距离算法 public static int minDistance(String str, String str1) {int len str.length(), len1 str1.length();// 预处理char[] arr1 str.toCharArray();char[] arr2 str1.toCharArray();// 动态规划数组int[] dp new int[len1 1];for (int j 0; j < le…

Rust ~ Vec<u8>和[u8]

Vec<u8> 和 &[u8] 是两种不同的数据类型&#xff0c;它们都与字节序列相关&#xff0c;但在所有权、内存管理、使用场景等方面存在明显区别 类型本质 Vec<u8>&#xff1a;Rust 中的动态数组类型&#xff0c;即向量&#xff08;vector&#xff09;。它是一个拥…