【Rust自学】16.2. 使用消息传递来跨线程传递数据

server/2025/2/3 12:01:12/

喜欢的话别忘了点赞、收藏加关注哦,对接下来的教程有兴趣的可以关注专栏。谢谢喵!(=・ω・=)
请添加图片描述

16.2.1. 消息传递

有一种很流行而且能保证安全并发的技术(或者叫机制)叫做消息传递。在这种机制里,线程(或Actor)通过彼此间发送消息(数据)来进行通讯。

Go语言有一句名言是这么说的:Do not communicate by sharing memory; instead, share memory by communicating.(不要用共享内存来通信,要用通信来共享内存)

Go语言的并发机制体现了这种思想。Rust也提供了机遇消息传递的一种并发方式,具体的就是在Rust里实现就是使用Channel(标准库提供)。Go语言里也有Channel,思路差不多。

16.2.2. 理解Channel

可以将编程中的Channle想象为定向水道,例如小溪或河流。如果你把橡皮鸭之类的东西放入河中,它会顺流而下,到达水道的尽头。

通道有两部分:发送端接收端。发射端是将橡皮鸭放入河中的上游位置,接收端是橡皮鸭最终到达下游的位置。代码的一部分使用要发送的数据调用发送端上的方法,另一部分检查接收端是否有到达的消息。如果发送端或接收端其一掉线,则称通道已关闭。

具体的步骤:

  • 调用发送端的方法,发送数据
  • 接收端会检查和接收到达的数据
  • 如果发送端、接收端中的任意一端被丢弃了,那么Channel关闭了。

16.2.3. 创建channel

使用mpsc::channel函数来创建Channelmpsc表示multiple producer, single consumer(多个生产者、一个消费者),表示可以有多个发送端,但是只能有一个接收端。

调用这个函数返回一个元组(tuple,详见 3.3. 数据类型:复合类型),有两个元素,分别是发送端和接收端。

看个例子:

rust">use std::sync::mpsc;
use std::thread;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let val = String::from("hi");tx.send(val).unwrap();});let received = rx.recv().unwrap();println!("Got: {received}");
}
  • 首先使用mpsc::channel函数来创建Channel,返回的元组使用模式匹配进行解构,分别用txrx表示发送端和接收端。

  • 接下来创建了一个线程,使用move关键字表示发送端tx的所有权被移至分线程内,线程必须拥有通道发送端的所有权才能往通道里发消息。
    使用send方法来发送消息,返回类型是Result类型,如果接收端被丢弃了那么返回值就是Err,反之就是Ok。在这里面就简单地使用unwrap进行错误处理即可,这样如果接收端被丢弃就会恐慌。

  • 接收端有两个方法来获取消息,这里使用了recv方法(recieve的简写)。它会一直阻塞这个线程,直到有消息被传入为止。
    消息被包裹在Result类型中,有消息就返回Ok,反之就是Err,一样使用unwrap简单地处理错误即可。

输出:

Got: hi

发送端send方法

send方法的参数是想要发送的数据,返回Result类型。如果有问题(例如接收端已经被丢弃)就会返回Err

接收端的方法

  • recv方法:阻止当前线程执行,直到Channel中有值传来,一旦收到值,就返回Result类型,如果发送端关闭了,就会收到Err

  • try_recv方法:不会阻塞当前线程执行,立即返回Result类型,有数据到达就是OK变体包裹着传过来的数据;否则就返回错误。
    通常是使用循环调用来检查try_recv的结果。一旦有消息来了就开始处理,如果没来,那么这时候也可以执行其他指令。

16.2.4. channel和所有权转移

所有权在消息传递中非常重要,它能帮你编写安全、并发的代码。

看个例子:

rust">use std::sync::mpsc;
use std::thread;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let val = String::from("hi");tx.send(val).unwrap();println!("val is {val}");});let received = rx.recv().unwrap();println!("Got: {received}");
}

在刚才的代码上加了println!("val is {val}");这句话。把值传入send函数后想继续在线程里使用值。

输出:

$ cargo runCompiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`--> src/main.rs:10:26|
8  |         let val = String::from("hi");|             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();|                 --- value moved here
10 |         println!("val is {val}");|                          ^^^^^ value borrowed here after move|= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

错误在于借用了已经移动值val。它的所有权已经在传入send时移交出去了,所以就会报错。

下一个例子通过发送多个值来观察接受者等待的过程:

rust">use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let vals = vec![String::from("hi"),String::from("from"),String::from("the"),String::from("thread"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("Got: {received}");}
}
  • 分线程以循环的方式发送Vector里的各个元素,每次发送完之后会暂停1秒
  • 主线程接收端被当了一个迭代器来使用(因为实现了iterator trait),这样就不需要显式调用recv函数了。每收到一个值就将它打印出来。当发送端执行完毕被丢弃时,Channel就关闭了,循环就不会继续。程序退出。

输出:

Got: hi
Got: from
Got: the
Got: thread

16.2.5. 通过克隆创建多个发送者

继续在上一个代码的基础上稍作修改:

rust">use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main(){let (tx, rx) = mpsc::channel();let tx1 = tx.clone();thread::spawn(move || {let vals = vec![String::from("hi"),String::from("from"),String::from("the"),String::from("thread"),];for val in vals {tx1.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});thread::spawn(move || {let vals = vec![String::from("more"),String::from("messages"),String::from("for"),String::from("you"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("Got: {received}");}
}

这里多了一个分线程,现在有2个分线程都想要给主线程发消息,所以就需要两个发送端。针对这种情况,只需要对代表发送端的变量tx使用clone方法即可,也就是原文的let tx1 = tx.clone();这一句。

输出:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

接收端收到的数据是交替出现的。


http://www.ppmy.cn/server/164604.html

相关文章

mysql_init和mysql_real_connect的形象化认识

解析总结 1. mysql_init 的作用 mysql_init 用于初始化一个 MYSQL 结构体,为后续数据库连接和操作做准备。该结构体存储连接配置及状态信息,是 MySQL C API 的核心句柄。 示例: MYSQL *conn mysql_init(NULL); // 初始化连接句柄2. mysql_…

使用openwrt搭建ipsec隧道

背景:最近同事遇到了个ipsec问题,做的ipsec特性,ftp下载ipv6性能只有100kb, 正面定位该问题也蛮久了,项目没有用openwrt, 不过用了开源组件strongswan, 加密算法这些也是内核自带的,想着开源的不太可能有问题&#xff…

IM 即时通讯系统-45-merua0oo0 IM 分布式聊天系统

IM 开源系列 IM 即时通讯系统-41-开源 野火IM 专注于即时通讯实时音视频技术,提供优质可控的IMRTC能力 IM 即时通讯系统-42-基于netty实现的IM服务端,提供客户端jar包,可集成自己的登录系统 IM 即时通讯系统-43-简单的仿QQ聊天安卓APP IM 即时通讯系统-44-仿QQ即…

验证二叉搜索树(力扣98)

根据二叉搜索树的特性,我们使用中序遍历,保证节点按从小到大的顺序遍历。既然要验证,就是看在中序遍历的条件下,各个节点的大小关系是否符合二叉搜索树的特性。双指针法和适合解决这个问题,一个指针指向当前节点&#…

C++泛型编程指南04-(对默认调用参数的类型推断)

文章目录 问题描述解决方案示例代码 关键点解释进一步改进:结合概念约束 你提到的情况确实是一个常见的问题:在C中,类型推断不适用于默认调用参数。这意味着如果你希望函数模板能够通过默认参数来实例化,你需要为模板参数提供一个…

windows蓝牙驱动开发-生成和发送蓝牙请求块 (BRB)

以下过程概述了配置文件驱动程序生成和发送蓝牙请求块 (BRB) 应遵循的一般流程。 BRB 是描述要执行的蓝牙操作的数据块。 生成和发送 BRB 分配 IRP。 分配BRB,请调用蓝牙驱动程序堆栈导出以供配置文件驱动程序使用的 BthAllocateBrb 函数。;初始化 BRB…

机器学习优化算法:从梯度下降到Adam及其变种

机器学习优化算法:从梯度下降到Adam及其变种 引言 最近deepseek的爆火已然说明,在机器学习领域,优化算法是模型训练的核心驱动力。无论是简单的线性回归还是复杂的深度神经网络,优化算法的选择直接影响模型的收敛速度、泛化性能…

记录一次Sqoop从MySQL导入数据到Hive问题的排查经过

个人博客地址:记录一次Sqoop从MySQL导入数据到Hive问题的排查经过 | 一张假钞的真实世界 问题描述 MySQL中原始数据有790W+的记录数,在Sqoop抽取作业成功的情况下在Hive中只有500W左右的记录数。 排查过程 数据导入脚本Log 通过Log可以发现以下信息: 该Sqoop任务被分解…