【Kotlin】Channel简介

embedded/2024/9/24 5:25:04/

1 前言

        Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

        当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

        Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

        说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

Kotlin">fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

        说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者var iterator = channel.iterator()while (iterator.hasNext()) {var element = iterator.next()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channelproduceactor

        produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannelactor 函数用于构造一个消费者协程,并返回一个 SendChannel

4.1 produce

Kotlin">fun main() {var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者repeat(3) {println("send: $it")send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in receiveChannel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

Kotlin">fun main() {var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者repeat(3) {var element = receive()println("receive: $element")}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {println("send: $it")sendChannel.send(it)}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

Channel 的关闭

        对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

Kotlin">fun main() {var channel = Channel<Int>(3)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}channel.close()println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {var element = channel.receive()println("receive: $element")}println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

        Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

Kotlin">fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")channel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

        说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

Kotlin">fun main() {var broadcastChannel = BroadcastChannel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

        说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

Kotlin">fun main() {var channel = Channel<Int>()var broadcastChannel = channel.broadcast(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

http://www.ppmy.cn/embedded/26110.html

相关文章

CLIP论文笔记:Learning Transferable Visual Models From Natural Language Supervision

导语 会议&#xff1a;ICML 2021链接&#xff1a;https://proceedings.mlr.press/v139/radford21a/radford21a.pdf 当前的计算机视觉系统通常只能识别预先设定的对象类别&#xff0c;这限制了它们的广泛应用。为了突破这一局限&#xff0c;本文探索了一种新的学习方法&#x…

Java图搜索算法详解:探索图论中的奥秘

图搜索算法是图论领域的重要内容&#xff0c;它在解决各种实际问题中起着关键作用。本文将详细介绍几种常见的Java图搜索算法&#xff0c;包括深度优先搜索&#xff08;DFS&#xff09;、广度优先搜索&#xff08;BFS&#xff09;以及Dijkstra算法&#xff0c;帮助读者深入理解…

ES6之rest参数、扩展运算符

文章目录 前言一、rest参数二、扩展运算符 1.将数组转化为逗号分隔的参数序列2.应用总结 前言 rest参数与arguments变量相似。ES6引入rest参数代替arguments&#xff0c;获取函数实参。扩展运算符能将数组转化为参数序列。 一、rest参数 function namelist1() {console.log(ar…

Nginx从入门到精通

第一章> 1、概述 2、正反向代理 3、负载均衡 4、Nginx安装 第二章> 5、常用命令 6、实战总结 7、前端部署 ***************************************…

cartographer问题处理

问题1 : CMake Error: The following variables are used in this project, but they are set to NOTFOUND. Please set them or make sure they are set and tested correctly in the CMake files: GMOCK_LIBRARY (ADVANCED)linked by target "time_conversion_test&quo…

Android Studio的settings.gradle配置

pluginManagement { repositories { maven { url ‘https://maven.aliyun.com/nexus/content/groups/public/’} maven { url ‘https://maven.aliyun.com/nexus/content/repositories/gradle-plugin’} maven { url ‘https://maven.aliyun.com/nexus/content/repositories/ap…

Rust中的并发性:Sync 和 Send Traits

在并发的世界中&#xff0c;最常见的并发安全问题就是数据竞争&#xff0c;也就是两个线程同时对一个变量进行读写操作。但当你在 Safe Rust 中写出有数据竞争的代码时&#xff0c;编译器会直接拒绝编译。那么它是靠什么魔法做到的呢&#xff1f; 这就不得不谈 Send 和 Sync 这…

Flutter笔记:Widgets Easier组件库(3)使用按钮组件

Flutter笔记 Widgets Easier组件库&#xff08;3&#xff09;&#xff1a;使用按钮组件 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddre…