【Kotlin】Channel简介

devtools/2024/9/23 2:56:38/

1 前言

        Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

        当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

        Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

        说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

Kotlin">fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

        说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者var iterator = channel.iterator()while (iterator.hasNext()) {var element = iterator.next()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

Kotlin">fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channelproduceactor

        produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannelactor 函数用于构造一个消费者协程,并返回一个 SendChannel

4.1 produce

Kotlin">fun main() {var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者repeat(3) {println("send: $it")send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in receiveChannel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

Kotlin">fun main() {var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者repeat(3) {var element = receive()println("receive: $element")}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {println("send: $it")sendChannel.send(it)}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

Channel 的关闭

        对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

Kotlin">fun main() {var channel = Channel<Int>(3)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}channel.close()println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {var element = channel.receive()println("receive: $element")}println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

        Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

Kotlin">fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")channel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

        说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

Kotlin">fun main() {var broadcastChannel = BroadcastChannel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

        说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

Kotlin">fun main() {var channel = Channel<Int>()var broadcastChannel = channel.broadcast(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

Kotlin">send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

http://www.ppmy.cn/devtools/27287.html

相关文章

C++中常见容器总结Array-Vector-List-Queue-Stack-Map-Set

在 C 中&#xff0c;有许多常见的容器&#xff0c;每种都有其特定的用途和性能特征。以下是一些常见的容器类型&#xff1a;1. 数组&#xff08;Array&#xff09;&#xff1a;是一组连续存储的相同类型元素的集合。数组的大小在创建时就确定&#xff0c;并且不能动态改变。2. …

泰勒创造力达到顶峰?(上)

hello,大家好&#xff01;今天看一篇经济学人的一篇评论&#xff0c;说的是泰勒斯威夫特当前的创造力。经济学人总是语不惊人死不休&#xff0c;看看它对这位音乐天才做了怎样的评价。 事先声明哈&#xff0c;本文就是一种英语学习类讲述&#xff0c;没带任何个人色彩&#xff…

服务端不 listen 可以创建 tcp 连接吗

这个问题有三类答案。 上来就撸 linux kernel 源码&#xff0c;折腾半天&#xff0c;哦&#xff0c;终于在 tcp_rcv_state_process 里找到了 tcp_rcv_synsent_state_process 调用&#xff0c;后者包含&#xff1a; if (th->syn) {/* We see SYN without ACK. It is attemp…

linux复习

与"区别 在Linux中&#xff0c;单引号&#xff08;&#xff09;和双引号&#xff08;"&#xff09;用于定义字符串&#xff0c;但它们在处理变量扩展和转义字符时有所不同。以下是具体分析&#xff1a; 1. **变量扩展**&#xff1a;在双引号内&#xff0c;变量可以…

openGauss学习笔记-272 openGauss性能调优-实际调优案例01-调整查询重写GUC参数rewrite_rule

文章目录 openGauss学习笔记-272 openGauss性能调优-实际调优案例01-调整查询重写GUC参数rewrite_rule272.1 目标列子查询提升参数intargetlist272.2 提升无agg的子查询uniquecheck openGauss学习笔记-272 openGauss性能调优-实际调优案例01-调整查询重写GUC参数rewrite_rule …

Swiper轮播图

版本&#xff1a;“swiper”: “^6.8.4”, 处理每分钟重新请求数据后&#xff0c;播放卡顿&#xff0c;快速闪&#xff0c;没按照设置时间播放等bug 以下是直接vue2 完整的组件代码 使用&#xff1a; <SwiperV :imgList“swiperList” / <template><div class"…

R-Tree: 原理及实现代码

文章目录 R-Tree: 原理及实现代码1. R-Tree 原理1.1 R-Tree 概述1.2 R-Tree 结构1.3 R-Tree 插入与查询 2. R-Tree 实现代码示例&#xff08;Python&#xff09;结语 R-Tree: 原理及实现代码 R-Tree 是一种用于管理多维空间数据的数据结构&#xff0c;常用于数据库系统和地理信…

计算机毕业设计python基于django框架的网上拍卖系统

创新点&#xff1a;本系统采用英国式拍卖和荷兰式拍卖两种模式&#xff0c;英国式拍卖也叫升价拍卖。这是最普遍的一种拍卖方式。拍卖人设定一个底价&#xff0c;竞买人相继给出更高的价格&#xff0c;最终出价最高者胜出&#xff0c;并支付最高价。这种形式在电影中非常常见&a…