【Kotlin】Channel简介

news/2024/9/23 2:56:41/

1 前言

        Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

        当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

        Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

        说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

Kotlin">fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

        说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者var iterator = channel.iterator()while (iterator.hasNext()) {var element = iterator.next()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channelproduceactor

        produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannelactor 函数用于构造一个消费者协程,并返回一个 SendChannel

4.1 produce

Kotlin">fun main() {var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者repeat(3) {println("send: $it")send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in receiveChannel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

Kotlin">fun main() {var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者repeat(3) {var element = receive()println("receive: $element")}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {println("send: $it")sendChannel.send(it)}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

Channel 的关闭

        对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

Kotlin">fun main() {var channel = Channel<Int>(3)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}channel.close()println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {var element = channel.receive()println("receive: $element")}println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

        Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

Kotlin">fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")channel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

        说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

Kotlin">fun main() {var broadcastChannel = BroadcastChannel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

        说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

Kotlin">fun main() {var channel = Channel<Int>()var broadcastChannel = channel.broadcast(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

http://www.ppmy.cn/news/1446105.html

相关文章

Windows Server 2019/2022 开启

我的环境是Windows Serve 2022 Datacenter, 21H2, 需要使用Hyper-V 与 VMWare Workstation共存&#xff0c;但是在服务器管理器中找不到Windows Hypervisor Platform安装选项。在Win10中是启动关闭Windows功能中&#xff0c;而server中找不到。 解决方法&#xff1a; 以管理员…

机器学习理论基础—集成学习(1)

机器学习理论基础—集成学习 个体与集成 集成学习通过构建并结合多个学习器来完成学习任务&#xff0c;有时也称为多分类系统等。 分类&#xff1a; 根据集成学习中的个体学习器的不同可以分为同质集成&#xff08;集成的学习器相同例如全部是决策树&#xff09;&#xff0c…

基于SSM的“航空机票预订系统”的设计与实现(源码+数据库+文档+PPT)

基于SSM的“航空机票预订系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SSM 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统首页 公告管理 用户注册 留言评论 会员管理 航班管理 订…

界面组件DevExpress Blazor UI v23.2 - 网格、工具栏功能全新升级

DevExpress Blazor UI组件使用了C#为Blazor Server和Blazor WebAssembly创建高影响力的用户体验&#xff0c;这个UI自建库提供了一套全面的原生Blazor UI组件&#xff08;包括Pivot Grid、调度程序、图表、数据编辑器和报表等&#xff09;。 DevExpress Blazor控件目前已经升级…

Linux基础——Linux开发工具(下)_make/makefile

前言&#xff1a;在经过前面两篇学习&#xff0c;大家对Linux开发工具都有一定的了解&#xff0c;而在此之前最重要的两个工具就是vim&#xff0c;gcc。 如果对这两个工具不太了解&#xff0c;可以先阅读这两篇文章&#xff1a; Linux开发工具 (vim) Linux开发工具 (gcc/g) 首先…

linux 搭建知识库文档系统 mm-wiki

目录 一、前言 二、常用的知识库文档工具 2.1 PingCode 2.2 语雀 2.3 Tettra 2.4 Zoho Wiki 2.5 Helpjuice 2.6 SlimWiki 2.7 Document360 2.8 MM-Wiki 2.9 其他工具补充 三、MM-Wiki 介绍 3.1 什么是MM-Wiki 3.2 MM-Wiki 特点 四、搭建MM-Wiki前置准备 4.1 前置…

mysql基础知识汇总

本文自行整理&#xff0c;只做学习记忆之用&#xff0c;若有不当之处请指出 一、数据库三层结构 &#xff08;1&#xff09;所谓安装Mysql数据库&#xff0c;就是在主机安装一个数据库管理系统(DBMS),这个管理程序可以管理多个数据库。DBMS(database manage system) &#xf…

Tomcat基本使用与控制台乱码解决方式

目录 Tomcat简单介绍 Tomcat基本使用 Tomcat控制台乱码解决方式 Tomcat简单介绍 tomcat是apache开源绿色版本的服务器。 Tomcat基本使用 安装&#xff1a;下载zip包解压即可。 卸载&#xff1a;删除解压的tomcat文件夹即可。 启动&#xff1a;双击bin/startup.bat 停止…