本文涉及源码基于kotlinx-coroutines-core-jvm:1.6.1
kotlin 协成系列文章:
你真的了解kotlin中协程的suspendCoroutine原理吗?
Kotlin Channel系列(一)之读懂Channel每一行源码
kotlin Flow系列之-SharedFlow源码解析
kotlin Flow系列之-StateFlow源码解析
看这边文章之前,最好对Channel和SharedFlow,StateFlow有深刻的理解,否则会有点困难。
文章目录
- 前言
- 目标
- 背压初探
- ChannelFlow
- ChannelFlowOperatorImpl
前言
不得不说Channel真是个好东西,在协成里面多个地方起到了关键性作用。Channel作为协成之间通信的一种工具,你可以给他设置缓存,缓存满了支持多种溢出策略。Channel作为一个通道,你可以往里面发送数据,也可以从里面接收数据,基于Channel的这种能力,Channel可以用在Flow中,即用来连接上游和下游,把上游的数据放入Channel中,下游从Channel中取数据,如果给Channel加一个缓冲区,Channel就可以解决上游和下游之间存在的发送和消费时间不一致产生的背压问题,同样给予这种能力,还可以在上游和下游之间切换线程(比如Channel作为中间件的时候,在指定的线程中去调用上游的collect函数收集上游数据存入Channel中,下游在指定的线程总从Channel中取数据)。
目标
本的目标是搞懂三个问题:
-
buffer操作符的原理
-
flowOn操作符的原理
-
FusibleFlow是干什么的? fuse 函数又是什么意思?
背压初探
背压(Back Pressure),指的是后端的压力,在Flow中是指上游emit(发送或者生产)数据的速度大于下游collect(接收或者消费)数据的速度,就好比家里的水龙头,如果把水龙头开关开小一点,就会形成水压。
在Flow中要形成背压需要两个条件,第一上游和下游需要工作在两个不同的线程或者协成里面;第二上游的速度大于下游下游的速度。
对于热流ShareFlow or StateFlow 他们是支持背压的,SharedFlow在创建时可以指定capacity(缓存容量) 和 溢出策略来解决背压问题。StateFlow遇到背压时就是DROP_OLDEST。
关于SharedFlow和StateFlowd的原理,包括是怎么处理背压问题的请看我博客里面其他文章,相应的链接在文章开头已列出
对于冷流来说,比如flow{}函数创建的一个冷流,当我们通过flowOn操作符让上游和下游工作在不同线程时,它里面也支持背压,flowOn函数会在上游和下游之间整一个默认大小64的缓存,上游的数据直接进入缓存中,下游从缓存里面取,当缓存满了后采用SUSPEND的溢出策略。同样也可以通过buffer操作符来解决背压问题,首先buffer操作符会让上游和下游运行在在不同的协成里面。运行在在不同协成里面如果上游的速度大于下游的速度,那就会形成背压,因此buffer操作符可以通过参数指定一个想要的缓存大小和溢出策略来处理背压问题。
buffer操作符
先创建一个冷流,不用buffer操作符。
fun main() = runBlocking{log("start------")val totalTime = measureTimeMillis {val coldFlow = flow<Int> {val time = measureTimeMillis {repeat(10){//模拟上游生产一个数据的时间需要1秒val value = run{Thread.sleep(1000)it}log("send $value")emit(it)}}log("上游发送完成耗时 $time" )}val time = measureTimeMillis {coldFlow.collect{log("receive $it")//模拟下游处理完一个数据需要2秒Thread.sleep(2000)}}log("下游接收完成耗时 $time")}log("总耗时 $totalTime")Unit
}//输出:
17:06:34:657[ main ] start------
17:06:35:692[ main ] send 0
17:06:35:699[ main ] receive 0
17:06:38:707[ main ] send 1
17:06:38:707[ main ] receive 1
17:06:41:713[ main ] send 2
17:06:41:714[ main ] receive 2
17:06:44:718[ main ] send 3
17:06:44:718[ main ] receive 3
17:06:47:726[ main ] send 4
17:06:47:726[ main ] receive 4
17:06:50:726[ main ] send 5
17:06:50:727[ main ] receive 5
17:06:53:732[ main ] send 6
17:06:53:732[ main ] receive 6
17:06:56:743[ main ] send 7
17:06:56:743[ main ] receive 7
17:06:59:752[ main ] send 8
17:06:59:753[ main ] receive 8
17:07:02:754[ main ] send 9
17:07:02:755[ main ] receive 9
17:07:04:755[ main ] 上游发送完成耗时 30068
17:07:04:757[ main ] 下游接收完成耗时 30073
17:07:04:757[ main ] 总耗时 30078
上游和下游工作在同一个协成里面(也是同一个线程),上游把数据交给下游,下游处理数据用时2000毫秒,下游处理完成后,回到上游,上游需要等待1000毫秒才能继续发送下一个数据,所以不存在背压问题。上游发送数据需要耗时30秒,下游消费完数据也需要耗时30秒,总耗时也是30秒。
接下来使用buffer操作符:
//这里相对于上面的代码指定了一个协成调度器,因为runBlocking默认的调度器会使用ThreadLocalEventLoop.eventLoop
//这样会导致使用buffer操作符后,上游把数据发送完成后,下游的协成才会开始取数据。
//Dispatchers.IO不会影响我们要测试的结果,即使你再上面代码的runBlocking中加入一个Dispathers.IO,结果也是一样的。fun main() = runBlocking(Dispatchers.IO) {log("start------")val totalTime = measureTimeMillis {val coldFlow = flow<Int> {val time = measureTimeMillis {repeat(10){//模拟上游生产一个数据的时间需要1秒val value = run{Thread.sleep(1000)it}log("send $value")emit(it)}}log("上游发送完成耗时 $time" )}val time = measureTimeMillis {//上游调用了buffer操作符。设置了缓存的容量为5。这里之所以选择5是因为本例中//上游的速度比下游快一倍,上游发送完10个数据需要10秒,下游10秒只能处理完5个数据,因此//缓存容量只要大于等于5就不会阻碍上游生产数据。感兴趣的可以去调节一下这个容量的大小,//把他改为小于5的任意非负数,看看上游发送完数据所需要的时间,肯定是大于10秒的//这里没有指定溢出策略,默认采用suspend.coldFlow.buffer(capacity = 5).collect{ log("receive $it")//模拟下游处理完一个数据需要2秒Thread.sleep(2000)}}log("下游接收完成耗时 $time")}log("总耗时 $totalTime")Unit
}//输出结果:
17:11:42:914[ DefaultDispatcher-worker-2 ] start------
17:11:43:955[ DefaultDispatcher-worker-3 ] send 0
17:11:43:964[ DefaultDispatcher-worker-2 ] receive 0
17:11:44:968[ DefaultDispatcher-worker-3 ] send 1
17:11:45:969[ DefaultDispatcher-worker-3 ] send 2
17:11:45:969[ DefaultDispatcher-worker-2 ] receive 1
17:11:46:972[ DefaultDispatcher-worker-3 ] send 3
17:11:47:971[ DefaultDispatcher-worker-2 ] receive 2
17:11:47:976[ DefaultDispatcher-worker-3 ] send 4
17:11:48:981[ DefaultDispatcher-worker-3 ] send 5
17:11:49:976[ DefaultDispatcher-worker-2 ] receive 3
17:11:49:985[ DefaultDispatcher-worker-3 ] send 6
17:11:50:989[ DefaultDispatcher-worker-3 ] send 7
17:11:51:977[ DefaultDispatcher-worker-2 ] receive 4
17:11:51:994[ DefaultDispatcher-worker-3 ] send 8
17:11:52:995[ DefaultDispatcher-worker-3 ] send 9
17:11:52:995[ DefaultDispatcher-worker-3 ] 上游发送完成耗时 10042
17:11:53:983[ DefaultDispatcher-worker-2 ] receive 5
17:11:55:986[ DefaultDispatcher-worker-2 ] receive 6
17:11:57:990[ DefaultDispatcher-worker-2 ] receive 7
17:11:59:993[ DefaultDispatcher-worker-2 ] receive 8
17:12:01:994[ DefaultDispatcher-worker-2 ] receive 9
17:12:03:999[ DefaultDispatcher-worker-2 ] 下游接收完成耗时 21059
17:12:04:000[ DefaultDispatcher-worker-2 ] 总耗时 21063
使用buffer操作符后,上游的emit和下游的collect就运行在两个不同的协成里面(由于最外层的协成调度器为Dispatchers.IO,所以两个协成会运行在不同的IO线程里面),如果上游的速度快于下游的速度就会产生背压问题,
这是怎么做到的呢?原理是什么呢?这里先简单透露一点,buffer操作符会返回一个新的Flow,这个Flow是ChannelFlow类型,buffer操作符的参数capacity就是这个Flow的缓存容量,参数onBufferOverflow没有指定,采用默认值SUSPEND。现在你可以把这个ChannelFlow看着是一个缓存大小为5,溢出策略为SUSPEND的Flow(然然它内部是采用Channel来实现的,但是表现形式上是这样的,具有缓存和溢出策略)。
所以在本例中上游flow,buffer操作符返回的ChannelFlow,下游FlowCollector这三者的关系就是,上游flow把数据发送到ChannelFlow中,下游FlowCollector从ChannelFlow中取数据。所以上游根本就不需要管下游处理数据的速度,上游只管把数据扔给ChannelFlow,就可以继续发送下一个数据,只有当ChannelFlow的缓存满了,上游才会挂起(因此本例采用的是SUSPEND溢出策略)。下游也不用关注上游发送数据的速度,下游只管从ChannelFlow的缓存中取数据。
flowOn操作符
flowOn操作符可以用来切线程,和buffer操作符一样,都是返回一个ChannelFlow的对象。buffer可以指定capacity 和溢出策略,flowOn不可以指定,而是采用默认的capacity = Channel.OPTIONAL_CHANNEL = 64,溢出策略为SUSPEND。
再来看一个flowOn的例子:
fun main() = runBlocking {log("start------")val totalTime = measureTimeMillis {val coldFlow = flow<Int> {val time = measureTimeMillis {repeat(200){//模拟上游生产一个数据的时间需要1秒val value = run{Thread.sleep(100)it}log("send $value")emit(it)}}log("上游发送完成耗时 $time" )}val time = measureTimeMillis {//让上游emit运行在IO线程里面,flowOn会返回一个capacity = 64,溢出策略为SUSPEND的ChannelFlow//上游发送了200个数据,当缓存满了就会形成背压,这里采用的策略是挂起等待。coldFlow.flowOn(Dispatchers.IO).collect{log("receive $it")//模拟下游处理完一个数据需要2秒Thread.sleep(200)}}log("下游接收完成耗时 $time")}log("总耗时 $totalTime")Unit
}
//输出:只截取了部分输出日子
.....
20:34:43:659[ main ] receive 64
20:34:43:670[ DefaultDispatcher-worker-1 ] send 126
20:34:43:773[ DefaultDispatcher-worker-1 ] send 127
20:34:43:860[ main ] receive 65
20:34:43:873[ DefaultDispatcher-worker-1 ] send 128
20:34:43:977[ DefaultDispatcher-worker-1 ] send 129
20:34:44:065[ main ] receive 66
20:34:44:078[ DefaultDispatcher-worker-1 ] send 130
20:34:44:180[ DefaultDispatcher-worker-1 ] send 131 //131-67 = 64,刚好缓存满,所以后面的输出日志
20:34:44:270[ main ] receive 67 //都是receive一个,send一个。
20:34:44:373[ DefaultDispatcher-worker-1 ] send 132
20:34:44:471[ main ] receive 68
20:34:44:576[ DefaultDispatcher-worker-1 ] send 133
20:34:44:677[ main ] receive 69
20:34:44:783[ DefaultDispatcher-worker-1 ] send 134
20:34:44:881[ main ] receive 70
20:34:44:982[ DefaultDispatcher-worker-1 ] send 135
......
20:34:58:095[ DefaultDispatcher-worker-1 ] 上游发送完成耗时 27537
....
20:35:11:269[ main ] 下游接收完成耗时 40732
20:35:11:269[ main ] 总耗时 40735
那如果你说,我想实现发送200个数据不让上游挂起等待,有没有办法呢?上游速度比下游快一倍,200个数据,只要缓存大于100就能实现,所以只要让ChannelFlow的capacity > 100即可,如何实现呢?通过buffer操作符,指定其capacity > 100。
//其他代码不变,buffer和flowOn的顺序没有影响,在前在后都一样,flowOn和buffer两个操作符最终只会生成
//一个capacity = 100,溢出策略为suspend的ChannelFlow,flowOn的时候会生成一个ChannelFlow,buffer的时候会把
//这个ChannelFlow 结合buffer里面的参数融合形成一个新的ChannelFlow。ChannelFlow 是一个FusibleFlow,实现了
//fuse函数,因此是可熔的。这里现提出这个概念,后面会详细讲,不懂的暂时可以不用理会。
coldFlow.flowOn(Dispatchers.IO).buffer(100).collect{xxx
}//输出结果:
.....
20:54:06:851[ DefaultDispatcher-worker-1 ] send 195
20:54:06:955[ DefaultDispatcher-worker-1 ] send 196
20:54:07:042[ main ] receive 100
20:54:07:055[ DefaultDispatcher-worker-1 ] send 197
20:54:07:158[ DefaultDispatcher-worker-1 ] send 198
20:54:07:247[ main ] receive 101
20:54:07:263[ DefaultDispatcher-worker-1 ] send 199
20:54:07:264[ DefaultDispatcher-worker-1 ] 上游发送完成耗时 20587 //比之前少了7秒
20:54:07:447[ main ] receive 102
20:54:07:649[ main ] receive 103
....
20:54:27:319[ main ] 下游接收完成耗时 40666
20:54:27:320[ main ] 总耗时 40670
从输出日志可以看出,直到上游全部发送完成,都没有出现上游需要挂起等待的现象。上游发送200个数据,每个数据耗时100毫秒,刚好总耗时20秒。如果你觉得缓存整大了,浪费内存,又不想让上游挂起等待,可以在buffer中指定一个小的缓存,指定另外两种溢出策略DROP_OLDEST或者DROP_LATEST。
再来看一个热流SharedFlow吧,热流是不需要切换线程的,emit和collect两个函数运行在什么线程上游和下游就运行在什么线程里面。就算你想切换也不行,热流调用flowOn会抛异常。
fun main() = runBlocking (Dispatchers.IO){log("start------")//内部缓存大小2,溢出策略为DROP_OLDESTval hotFlow = MutableSharedFlow<Int>(2, onBufferOverflow = BufferOverflow.DROP_OLDEST)launch {hotFlow.collect{log("receive $it")Thread.sleep(2000)}}launch {val time = measureTimeMillis {repeat(10){val value = run{Thread.sleep(1000)it}log("send $it")hotFlow.emit(value)}}log("上游发送完成耗时 $time")}Unit
}
//输出结果:
21:41:26:162[ DefaultDispatcher-worker-1 ] start------
21:41:27:199[ DefaultDispatcher-worker-3 ] send 0
21:41:27:202[ DefaultDispatcher-worker-1 ] receive 0
21:41:28:204[ DefaultDispatcher-worker-3 ] send 1
21:41:29:203[ DefaultDispatcher-worker-1 ] receive 1
21:41:29:206[ DefaultDispatcher-worker-3 ] send 2
21:41:30:208[ DefaultDispatcher-worker-3 ] send 3
21:41:31:203[ DefaultDispatcher-worker-1 ] receive 2
21:41:31:212[ DefaultDispatcher-worker-3 ] send 4
21:41:32:216[ DefaultDispatcher-worker-3 ] send 5
21:41:33:203[ DefaultDispatcher-worker-1 ] receive 4
21:41:33:219[ DefaultDispatcher-worker-3 ] send 6
21:41:34:221[ DefaultDispatcher-worker-3 ] send 7
21:41:35:208[ DefaultDispatcher-worker-1 ] receive 6
21:41:35:221[ DefaultDispatcher-worker-3 ] send 8
21:41:36:226[ DefaultDispatcher-worker-3 ] send 9
21:41:36:227[ DefaultDispatcher-worker-3 ] 上游发送完成耗时 10029
21:41:37:211[ DefaultDispatcher-worker-1 ] receive 8
21:41:39:215[ DefaultDispatcher-worker-1 ] receive 9//数据3,5,7丢失了
热流自带支持背压,从上面的输出日志可以查看出,当缓存满了后,新发送的数据会把缓存里面的数据替换掉,因此对于下游来说就会存在某些数据接收不到,上游发送完成所有数据耗时10秒。假如有这样一个需求,hotFlow是第三方库返回给你的,你也不能去改变上游的代码,但是你作为下游消费数据的速度比上游慢,又不想错过某些数据,那怎么办呢?答案是同样是一天buffer操作符:
//其他代码不变
launch {//这里为什么是2,难道不是应该3吗?因为ChannelFlow里面是用Channel来实现缓存的,//Channel缓存满了后,如果溢出策略是suspend,那么queue中还可以挂起Send对象,即实际上//可以缓存3个,hotFlow自带2个缓存,加起来就是5个,缓存5个实际上最多可以支持发送11个数据,超过11个数据//就会丢失数据//如果不懂Channel的缓存原理,可以查看我关于Channel的文章hotFlow.buffer(2).collect{log("receive $it")Thread.sleep(2000)}}
//输出日志:
23:07:15:045[ DefaultDispatcher-worker-1 ] start------
23:07:16:089[ DefaultDispatcher-worker-2 ] send 0
23:07:16:092[ DefaultDispatcher-worker-3 ] receive 0
23:07:17:094[ DefaultDispatcher-worker-2 ] send 1
23:07:18:094[ DefaultDispatcher-worker-2 ] send 2
23:07:18:094[ DefaultDispatcher-worker-3 ] receive 1
23:07:19:096[ DefaultDispatcher-worker-2 ] send 3
23:07:20:096[ DefaultDispatcher-worker-3 ] receive 2
23:07:20:097[ DefaultDispatcher-worker-2 ] send 4
23:07:21:101[ DefaultDispatcher-worker-2 ] send 5
23:07:22:096[ DefaultDispatcher-worker-3 ] receive 3
23:07:22:101[ DefaultDispatcher-worker-2 ] send 6
23:07:23:104[ DefaultDispatcher-worker-2 ] send 7
23:07:24:098[ DefaultDispatcher-worker-3 ] receive 4
23:07:24:108[ DefaultDispatcher-worker-2 ] send 8
23:07:25:108[ DefaultDispatcher-worker-2 ] send 9
23:07:25:109[ DefaultDispatcher-worker-2 ] 上游发送完成耗时 10020
23:07:26:099[ DefaultDispatcher-worker-3 ] receive 5
23:07:28:102[ DefaultDispatcher-worker-3 ] receive 6
23:07:30:106[ DefaultDispatcher-worker-3 ] receive 7
23:07:32:107[ DefaultDispatcher-worker-3 ] receive 8
23:07:34:108[ DefaultDispatcher-worker-3 ] receive 9
//上游发送的数据,下游全部接收,一个数据没丢
时间/秒 | 发送数据 | hotFlow自己的缓存 | ChannelFlow的缓存 | 消费数据 |
---|---|---|---|---|
0 | 0 | 0 | ||
1 | 1 | 1 | ||
2 | 2 | 2 | 1 | |
3 | 3 | 3,2 | ||
4 | 4 | 4,3 | 2 | |
5 | 5 | Send(5),4,3 | ||
6 | 6 | Send(6),5,4, | 3 | |
7 | 7 | 7 | Send(6),5,4, | |
8 | 8 | 8 | Send(7),6,5 | 4 |
9 | 9 | 9,8 | Send(7),6,5 | |
10 | 10 | 10,9 | Send(8),7,6 | 5 |
11 | 11 | 11,10 (数据9丢失) | Send(8),7,6 | |
12 | 12 | 12,11 | Send(10),8,7 | 6 |
13 | 13 | 13,12(数据11丢失) | Send(10),8,7 | |
14 | 14 | 14,13 | Send(12),10.8 | 7 |
15 | ||||
16 | 14 | Send(13),12,10 | 8 | |
18 | Send(14),13,12 | 10 | ||
20 | 14,13 | 12 | ||
22 | 14 | 13 | ||
24 | 14 |
这个表中记录在某个时间点,上游发送数据,缓存,下游消费数据的一个情况,Send代表了这个数据被包装成了一个Send对象被挂起在Channel的queue中。当数据超过11个时就会开始出现丢失数据。如上表所示,如果发送数据为14个,那么数据9和数据11会丢失。不信的话修改一下代码,打印一下日志:
fun main() = runBlocking (Dispatchers.IO){log("start------")val hotFlow = MutableSharedFlow<Int>(2, onBufferOverflow = BufferOverflow.DROP_OLDEST)launch {hotFlow.buffer(2, onBufferOverflow = BufferOverflow.SUSPEND).collect{log("receive $it")Thread.sleep(2000)}}launch {val time = measureTimeMillis {repeat(15){val value = run{Thread.sleep(1000)it}log("send $it")hotFlow.emit(value)}}log("上游发送完成耗时 $time")}Unit
}
//输出结果:
23:11:42:753[ DefaultDispatcher-worker-1 ] start------
23:11:43:804[ DefaultDispatcher-worker-2 ] send 0
23:11:43:807[ DefaultDispatcher-worker-3 ] receive 0
23:11:44:805[ DefaultDispatcher-worker-2 ] send 1
23:11:45:811[ DefaultDispatcher-worker-3 ] receive 1
23:11:45:811[ DefaultDispatcher-worker-2 ] send 2
23:11:46:811[ DefaultDispatcher-worker-2 ] send 3
23:11:47:812[ DefaultDispatcher-worker-2 ] send 4
23:11:47:812[ DefaultDispatcher-worker-3 ] receive 2
23:11:48:812[ DefaultDispatcher-worker-2 ] send 5
23:11:49:813[ DefaultDispatcher-worker-2 ] send 6
23:11:49:813[ DefaultDispatcher-worker-3 ] receive 3
23:11:50:817[ DefaultDispatcher-worker-2 ] send 7
23:11:51:813[ DefaultDispatcher-worker-3 ] receive 4
23:11:51:817[ DefaultDispatcher-worker-2 ] send 8
23:11:52:817[ DefaultDispatcher-worker-2 ] send 9
23:11:53:813[ DefaultDispatcher-worker-3 ] receive 5
23:11:53:821[ DefaultDispatcher-worker-2 ] send 10
23:11:54:822[ DefaultDispatcher-worker-2 ] send 11
23:11:55:818[ DefaultDispatcher-worker-3 ] receive 6
23:11:55:822[ DefaultDispatcher-worker-2 ] send 12
23:11:56:823[ DefaultDispatcher-worker-2 ] send 13
23:11:57:819[ DefaultDispatcher-worker-3 ] receive 7
23:11:57:827[ DefaultDispatcher-worker-2 ] send 14
23:11:57:827[ DefaultDispatcher-worker-2 ] 上游发送完成耗时 15027
23:11:59:824[ DefaultDispatcher-worker-3 ] receive 8
23:12:01:824[ DefaultDispatcher-worker-3 ] receive 10
23:12:03:829[ DefaultDispatcher-worker-3 ] receive 12
23:12:05:830[ DefaultDispatcher-worker-3 ] receive 13
23:12:07:833[ DefaultDispatcher-worker-3 ] receive 14
ChannelFlow
不管是上面的buffer还是flowOn操作符都会返回一个ChannelFlow对象,ChannelFlow是一个啥,到底是一个Channel还是一个Flow呢?从结构上来看ChannelFlow实现了FusibleFlow,FusibleFlow继承了Flow接口,所以ChannelFlow严格意义上来讲它是一个Flow。那为什么又叫ChannelFlow呢?Channel在那里呢?实际上在其内部会创建一个Channel,Channel支持缓存,同时也是协成之间通信的工具。当上游和下游工作在不同协成时,Channel就刚好满足这个需求。既然需要的是Channel,那直接搞一个Channel不行吗?非得整ChannelFlow。那是因为下游消费数据时需要调用Flow的collect函数,Channel是没有这个函数的,所以这就是为什么要整一个实现了Flow接口的ChannelFlow。所以实际上下游调用的是ChannelFlow的collect函数,下游取数据是从ChannelFlow中取。
ChannelFlow是一个抽象类,它有很多个子类:
不同的子类适合不同的场景,比如Flow调用buffer
or flowOn
操作符时会用到ChannelFlowOperatorImpl
;通过channelFlow
创建一个冷流Flow时会用到ChannelFlowBuilder
,通过callbackFlow
创建一个冷流Flow会用到CallbackFlowBuilder
;把一个Channel转换(通过receiveAsFlow or consumeAsFlow)成一个热流Flow时会用到ChannelAsFlow
;通过merge
合并多个流时会用到ChannelLimitedFlowMerge
;flattenMerge
会用到ChannelFlowMerge
;调用transformLatest
会用到ChannelFlowTransformLatest
。
ChannelFlow它实际上相当于一个空壳,或者说是一个代理,他是上游和下游之间的一个代理。内部也是采用Channel来实现的。你可以把当做一个可以支持缓存的流来看待,它既可以冷流也可以是热流,这取决于它的上游是冷还是热,如果上游是热流它就是热流,上游是冷流它就是冷流。
ChannelFlow有三个参数:
- context:CoroutineContext,在需要的时候会开启一个协成,让这个协成运行在这个context环境里面,这个协成做的事情就是去collect上游的数据,所以调用flowOn操作符传一个Dispatchers,就会让上游(冷流)运行在指定的线程里面了。对于非flowOn这种情况,context为EmptyCoroutineContext,这种情况开启的协成运行的context为下游所在协成的context(不如bufferc操作符)。
- capacity:ChannelFlow的缓存容量,实际上ChannelFlow是没有缓存的,他的缓存是靠创建一个Channel来实现的,因此这个值最最终代表了在需要时创建的Channel的缓存容量。对于外部使用者来说,你不用关心ChannelFlow内部到底是自己实现了一套缓存系统,还是用的Channel。这都不重要,你就把ChannelFlow当做一个具有缓存功能的流来用就行。capacity就是它的缓存容量。
- onBufferOverflow:缓存溢出策略,这个就比较好理解了,意思就是Channe缓存满了后,如何处理,
public abstract class ChannelFlow<T>(/***协成运行的context*/@JvmField public val context: CoroutineContext,// 创建Channel的buffer容量@JvmField public val capacity: Int,//Channel Buffer满了后的移除策略@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {init {//如果你想使用CONFLATED这种模式,那就需要脱糖(传capacity = 0 ,onBufferOverflow = DROP_OLDEST)//这样会创建一个capacity = 1,DROP_OLDEST的ArrayChannlassert { capacity != Channel.CONFLATED } // CONFLATED must be desugared to 0, DROP_OLDEST by callers}/*** 返回一个suspned 函数。这个suspend函数将要作为开启协成的协成体,意思就是开启一个协成后,协成* 就会执行这个函数,* 在这个函数里面调用了调用collectTo函数,collectTo在子类中实现,* 不同的子类有不同的行为,但是他们都干了类似的事情,那就是去取上游的数据。** 比如:ChannelFlowOperator里面就会去调用真正上游Flow的collect函数,把从上游取到的数据发送到Channel* 里面*///参数ProducerScope是一个Channel类型,是就是在下面produce函数中创建的Channel的一个代理。//暂且就把它看着是produce中创建的Channel。internal val collectToFun: suspend (ProducerScope<T>) -> Unit//这里面有调用了collectTo函数。//it 就是参数ProducerScopeget() = { collectTo(it) }//最终创建Channel的capacity.internal val produceCapacity: Int//Channel.BUFFERED会创建一个缓存为默认64的Channel。get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity/*** 脱离ChannelFlow,啥意思呢?在前面背压和切换线程时,都是用ChannelFlow连接了上游和下游,* 对于下游来说,下游的上游就变成了ChannelFlow,调用该函数,就是脱掉这个ChannelFlow,返回原始上游对象。* 比如在ChannelFlowOperatorImp里面从写了该函数,直接返回了原始上游flow对象。* 比如在ChannelFlowBuilder里面就没有从写该函数,那就返回一个null。因为ChannelFlowBuilder他没有上游对象* 比如在ChannelAsFlow里面,依然返回了一个ChannelAsFlow,因为对于ChannelAsFlow来说它的上游* 实际上是一个Channel对象,但是这里返回的类型是Flow,那肯定能返回一个Channel把,于是重新返回了一个* 新的ChannelAsFlow,个ChannelAsFlow就是对Channel的一个包装,把Channel包装成了一个Flow。* ChannelAsFlow只是做了一个简单的代理。** */public open fun dropChannelOperators(): Flow<T>? = null/*** fuse函数是FusibbleFlow接口里面定义的,一个实现了FusibleFlow接的的Flow就代表了这个Flow是可熔的。* 这么说有点抽象,就好比金属可熔的。把黄金熔化后就可以给其重新塑性,变成另一种形状,但是本质上还是黄金。* 只是形状变了。* fuse在计算机领域有融合的意思,对于一个实现了FusibleFlow的Flow来说,表明了这个Flow是可溶的,具体怎么熔* 取决于其fuse函数如何实现。比如有的Flow就是把自己熔了,形成一个新的Flow,自己消失了。还有的Flow是把自己融入* 到一个新的Flow,但是自己不会消失,自己充当新的Flow的上游。** 这里的ChannelFlow的fuse实现就是把自己(this)熔了,结合自己加上新指定的参数融合成一个新的ChannelFlow。* 比如说现在有一块黄金(this)重200克,现在再给你100克黄金,把两个熔化就会形成一个新的黄金300克。* 当然ChannelFlow的fuse函数不仅仅是简单把this的三个参数(context,capacity,onBfferOverFlow)和 fuse* 函数的三个参数相加这么简单。不同的场景融合方式不一样,具体看fuse怎么实现的。* * fuse函数是在flowOn 或者buffer 操作符里面被调用,上游调用flowOn或者buffer会产生一个新的下游* (ChannelFlow)。如果是buffer函数调用,buffer操作符的目的就是为了在上游和下游之间增加一个缓冲区* 这个缓冲区用来接收上游的数据,供下游消费,所以buffer操作符导致产生的ChannelFlow就需要具备缓存的能力。* 那这个ChannelFlow的缓存容量是多少呢?溢出策略是什么呢?是buffer中参数指定的吗?我这里要说的是不一定。* ChannelFlow最终的缓存容量和溢出策略跟它的上游有关系,要看它的上游是不是一个FusibleFlow类型。* 如果它的上游不是FusibleFlow类型,那ChannelFlow的缓存容量和溢出策略就是buffer参数指定的。* 如果它的上游是FusibleFlow类型,说明上游是可溶的,那就看上游的的fuse函数怎么实现了。*** 比如说,上游是一个通过flow函数创建的一个冷流,它不是FusiableFlow类型,因此不具备可溶的能力,如果调用* buffer时传的capacity = 6,溢出策略为SUSPNED。那么创建的这个ChannelFlow就的缓存容量就是6溢出策略为* SUSPEND。(内部是通过Channel来实现缓存和溢出策略的)。** 比如说还是用一个flow函数创建一个冷流,但是连续调用两次buffer,第一次buffer(2,SUSPNED),第二次* buffer(3,SUSPEND),第一次调用产生了一个ChannelFlow,capacity = 2,溢出策略为SUSPEND。第二次* 调用buffer时的接收者是第一次生成的ChannelFlow也就是说第二次buffer的上游就是一个FusibleFlow,那么就会调用* 第一次产生的ChannelFlow的fuse函数把第一个ChannelFlow熔化,形成一个新的的ChannelFlow。* 新的ChannelFlow的capacity = 2 + 3 。溢出策略为SUSPEN。在整个调用链里面,第一个ChannelFlow就会消,* 再也不会使用到它。也就是说虽然调用了两次buffer。但是整个调用链里面只有一个ChannelFlow。** 在比如说,一个热流SharedFlow or StateFlow,虽然他们都是一个FusibleFlow类型,但是他们的fuse函数里面* 只会创建一个新的ChannelFlow,不会把自己给熔化形成一个新的,而是把自己融入这个新的ChannelFlow里面,充当* 这个新的ChannelFlow的上游。** 总的来说就是不同的FusibleFlow有自己不同的fuse实现方式。* * 这里我们就来看看ChannelFlow的fuse函数是怎么把自己熔化形成一个新的。* context: flowOn函数调用会传一个具体的context,buffer调用是一个默认EmptyCoroutineContext* capacity : flowOn函数调采用默认值Channel.OPTIONAL_CHANNEL,buffer调用就自己指定了* onBufferOverflow:flowOn函数调用采用的默认值SUSPEND,buffer调用就自己指定*/public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {//capaciyt 不能是CONFLATE,需要脱糖assert { capacity != Channel.CONFLATED } //用来创建ChannelFlow的context,注意这里两个context的前后循序。//两个context相加,如果前一个context和后面context里面有相同的Element,以后面的为准。//比如说,在flowOn线程切换时,多次调用flowOn,第一次为Dispatchers.IO,第二次为Dispatchers.Main。//就相当于Dispatchers.Main + Dispatchers.IO = Dispatchers.IO。val newContext = context + this.context//用于计算创建ChannelFlow的capaticy参数val newCapacity: Int//用于计算创建ChannelFlow的溢出策略val newOverflow: BufferOverflow//下面开始根据各种不同的情况计算出newCapacity 和 newOverflow的最终结果。/*** 如果溢出策略不是SUSPEND(即是DROP_OLDEST | DROP_LATEST时),那创建的ChanelFlow的capacity和溢出策略* 就不需用融合之前的(上一个ChannelFlow)capacity和溢出策略。比如说 * flow.buffer(2,suspend).buffer(3.dorp_oldest),* 最终会生成一个capacity = 3,溢出策略为drop_oldest的ChannenelFlow。* 为什么是这样呢?我们把上面的嗲用分开来看:* 第一步:val f1 = flow.buffer(2,suspend)* 第二步:val f2 = f1.buffer(3,drop_oldest).* 假如说f1,是一个第三方库返回给你,你是不知道f1的具体细节,对于最下游来说,我现在就想永远接收到* 上游(f1)最新的三个数据。因此对于f1的fuse来说,融合形成f2,如果把f2容量整成了* f1.capacity + f2.capacity = 5,那下游就会收到来自上游最新的5个数据,就会让下游开发者很懵逼,* 明明我写的是buffer(3,drop_oldest),现在却收到了最新的五个数据。**/if (onBufferOverflow != BufferOverflow.SUSPEND) {//如果不是SUSPEND,生成的新的ChannelFlow不用考虑前一个ChannelFlow的capacity和onBufferOverflownewCapacity = capacitynewOverflow = onBufferOverflow} else {/*** 如果是suspend。那么上游在融合自己形成新的ChannelFlow时就需要考虑自己的capacity,* 但是新的ChnanelFlow的溢出策略以上游的ChannelFlow的溢出策略为准。* 这是为什么呢?你这样想,如果一个第三方库返回一个 val f =flow.buufer(3,drop_xxx)。给你* 你拿到f后,想给它再加一个更大的缓冲区,于是 val f2 = f.buffer(10,suspend)。* 这个时候,对于第三方库来说,我是一个缓存为3的,溢出策略为drop_xx的流,对于你来说* 你拿到第三方库返回给你的流,你能做的事情是在其下面在增加一个缓冲区来增加吞吐量,你是改变* 不了第三方库中流的溢出策略的。所以这就是为什么要这样。* 这里,f2返回的就是一个 capacity = 10 + 3,溢出策略为drop_xx的ChannelFlow*/newCapacity = when {/*** Channel.OPTIONAL_CHANNE的可见性是Internal,开发者不能直接调用。所以当* this.capacity == Channel.OPTIONAL_CHANNE 也就说明我们在需要一个ChannelFlow* 的时候是不能给他指定缓存容量的(Channel.OPTIONAL_CHANNE默认缓存容量为64),这时候* 调用fuse函数的时候给他指定了一个容量,那把之前的ChannelFlow熔了,形成新的ChannelFlow的* 的capacity就以指定的容量为准。* 这就好比你第一次去买一个盆,你说老板给我来个盆(没有明确要求大小),老板就随便给你了一个盆*(大小64升),你回家用的时候发现你需要一个10L或者100L的,那就跑回去重新要求老板给你一个具体* 容量的盆。* * 在我们调用flowOn 或者 把一个Channel转成Flow 时,返回的ChannelFlow的capacity就是* Channel.OPTIONAL_CHANNEL */this.capacity == Channel.OPTIONAL_CHANNEL -> capacity/*** capacity == Channel.OPTIONAL_CHANNEL代表了这一次融合没有明确要求缓存,* 比如是通过flowOn调用产生的融合,因此只需要考虑切线程的问题,那么新的ChannelFlow的* capacity就以之前的准。* * 比如说:flow.buffer(2,suspend).flowOn(Dispatchers.I0)* 第一步:flow.buffer的时候会创建一个capacity = 2 的ChannelFlow。* 第二步:ChannelFlow.flowOn时,只传了第一个参数,capacity 和 onBufferOverflow都是* 默认值,capacity 默认值为Channel.OPTIONAL_CHANNEL,onBufferOverflow为SUSPEND.* * 在第二步flowOn创建ChannelFlow就是融合了第一步buffer的capacity参数和第二步的context参数* 形成的一个新的ChannelFlow,这个新的ChannelFlow具备了buffer时生成的那个ChannelFlow的能力* (capacity = 2),同时也有flowOn时切换线程的能力。这就叫融合(fuse)。* 因此flowOn生成的ChannelFlow就取代了之前buffer时生成的ChannelFlow直接和最上游进行对接。*/capacity == Channel.OPTIONAL_CHANNEL -> this.capacity/*** Channel.BUFFERED和 Channel.OPTIONAL_CHANNEL一样,因为Channel.OPTIONAL_CHANNEL最终* 使用的也是Channel.BUFFERED。在创建Channel时,如果capacity = Channel.BUFFERED,那就* 会使用默认值64。比如flow.buffer() 什么都不传,capacity的默认值就是Channel.BUFFERED。* 比如channelFlow{} 函数返回的ChannelFlow的capacity的默认值也是Channel.BUFFERED。*** 比如:flow.buffer().buffer(2,suspend)。第一个buffer调用时没有指定capacity就是用* 默认值 Channel.BUFFERED,意思就是只想要一个缓冲区,但是没有明确指明大小(虽然默认为64)。* 第二次buffer的时候明确指明了需要一个大小为2的缓冲区,基于这种情况,那就把第一个capacity = 64* 的ChannelFlow熔成一个capacity = 2的ChannelFlow,这个capacity=2的这个ChannelFlow直接* 和最上游flow对接,原来capacity = 64的这个就丢弃了。既然第一次没有指明大小,那最终就用* 指明大小的为准。** 再比如说通过channelFow函数创建了一个冷流,这个冷流就是一个ChannelFlow类型,它的capacity = * Channel.BUFFERED(默认64):* val coldFlow = channelFlow<Int>{}* 直接对其collect,ChannelFlow内部就会创建一个缓存大小为64的Channel。* coldFlow.collect{}* 如果改成:coldFlow.buffer(4,suspend),这时候就会创建一个capacity = 4的ChannelFlow来* 取代之前capacity = Channel.BUFFERED的ChannelFlow。对这个capacity= 4的ChannelFlow* 进行collect时,就会去创建一个缓存大小为4的Channel。** */this.capacity == Channel.BUFFERED -> capacity/*** 比如说flow.buffer(2,suspend).buffer(Channel.BUFFERED,supend)* 第一次buffer时创建一个capacity = 2 的ChannelFlow,第二次仍然创建一个capacity = 2* 的ChannelFlow,第二次的ChannelFlow取代第一次的ChannelFlow。相当说把第一次的熔了,生成* 了一个新的,只是说这个新的capacity 和第一次的一样,*/capacity == Channel.BUFFERED -> this.capacity/*** 最后一种情况,比如:flow.buffer(3,onBufferOverFlow).buffer(3,suspend).* 两次调用buffer都指定了明确的capacity。那么第一个buffer生成的ChannelFlow熔于第二个* buffer生成的ChannelFlow。第二个buffer生成的ChannelFlow的capacity 为两次buffer之和。* 因为第一个生成的ChannelFlow要消失掉,所以第二个buffer生成的ChannelFow的capacity需要加上* 第一个buffer时的capacity。* */else -> {// sanity checksassert { this.capacity >= 0 }assert { capacity >= 0 }// 把两个相加val sum = this.capacity + capacity//如果相加超过了Int最大值,sum就是负数,这种情况返回一个UNLIMITED,代表了无限缓存if (sum >= 0) sum else Channel.UNLIMITED }}/**** 也就是说在本次调用fuse溢出策略为suspend时,上游ChannelFlow熔于后形成的新的ChannelFlow* capacity可能会改变,比如变成两次之和,但是溢出策略以上游的溢出策略为准。*/newOverflow = this.onBufferOverflow}//计算出将要新生成的ChanneFlow的几个参数都和之前的上游(ChannelFlow)的参数相等,//那就不用再生产一个新的,直接用之前的就行。因为我们调用buffer or flowOn都是为了把流的行为改变,//结果你改变后还是和之前一样,那还有必要再转变吗?if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)return this//只有发现要改变后的Flow和之前的是不一样的,才会创建新的去替代之前的。return create(newContext, newCapacity, newOverflow)/*** note :ChannelFlow的融合(fuse)规则还是有点不好理解的,特别在一个调用链里面多次调用buffer or flowOn* 等操作符,如果不看源码你自己都不知道最终生成的这个ChannelFlow的capacity 和 溢出策略是什么。* 所以在使用过程中,尽可能的避免这次重复多次调用。*///从这个fuse的实现可以看出,只要调用的是ChannelFlow的buffer or flowOn,都会把这个ChannelFlow熔了生产//一个新的ChannelFlow,这个新的ChannelFlow的context,capacity,onBufferOverflow 是由上一个ChannelFlow//和本次调用fuse函数传进来的context,capacity,onBufferOverflow融合形成的。新的ChannelFlow取代原来的//ChannelFlow。保证最终这个Flow的调用连里面只有一个ChannelFlow对象存在。}//在fuse函数中调用,创建一个ChannelFlow子类的实例对象,子类中实现,只有子类自己才知道如何创建自己protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T>/*** 在其子类中实现。不同的子类有不同实现方式,总的来说就是开一个协成从上游拿数据。* 比如调用coldFlow.buffer(5,suspend).collect{} * 当下游collect ChannelFlow的时候,ChannelFlow就会开启一个协成,在协成里里面回去调用collectTo函数* 在子类的collectTo里面会去collect最上游,把从最上游中取到的数据emit到ChannelFlow内部创建* 的Channell里面。最下游collect ChannelFlow时就去Channel中receive数据,把接收到的数据交给最下游。***/protected abstract suspend fun collectTo(scope: ProducerScope<T>)/** * 该函数就是去创建一个Channel对象,Channel的容量就为produceCapacity,溢出策略为onBufferOverflow,* 这两个值就是创建ChannelFlow时构造函数传入的。* * produce函数里面还会用collectToFun这个suspend函数开启一个协成,这个协成运行的context * 就是创建ChannelFlow 构造函数传入的context。协成启动后会执行collectToFun,* 在collectToFun里面取就会去调用collectTo函数,** */public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)//提供给下游调用ChannelFlow的 collect函数,有的子类会重写该函数,//这里的默认实现是也是开启一个协成,coroutineScope 内部也会开启一个协成去执行//coroutineScopel里面的代码:collector.emitAll(produceImpl(this)),//coroutineScope开启的子协成发生异常时会传播到父协成。//coroutineScope开启的子协成和其父协成运行在同一个线程里面。这样才能保证//最下游收到数据时的线程时和最下游调用cellect的线程是同一个线程。override suspend fun collect(collector: FlowCollector<T>): Unit =//因为在produceImpl 函数中调用produce函数,所以需要拿到一个CoroutineScope对象//如果只是单纯的要一个CoroutineScope,那么直接用GlobalScope也行。//那为什么不用GlobalScope呢?因为在produce函数要开启一个协成,这个协成就是要去collect上游的.//如果是用GlobalScope那就相当于开启的协成是一个独立的协成。这就回存在一个问题,如果下游collect所在的协成//被取消了,在produce里面开启的协成不会取消,也就是说上游还可以继续emit。因此在produce里面//所开启的的协成必须是下游collect所在协成的子协成。所以用到了coroutineScope。//coroutineScope能拿到外部的Scope对象。coroutineScope {collector.emitAll(produceImpl(this))}protected open fun additionalToStringProps(): String? = null// debug toStringoverride fun toString(): String {val props = ArrayList<String>(4)additionalToStringProps()?.let { props.add(it) }if (context !== EmptyCoroutineContext) props.add("context=$context")if (capacity != Channel.OPTIONAL_CHANNEL) props.add("capacity=$capacity")if (onBufferOverflow != BufferOverflow.SUSPEND) props.add("onBufferOverflow=$onBufferOverflow")return "$classSimpleName[${props.joinToString(", ")}]"}
}
我相信对于很多第一次接触ChannelFlow的朋友看到这里时肯定会云里雾里的,这源码的解释都说的是一些啥,根本就看不懂。这很正常
因为现在只是单独在解释ChannelFlow的源码,是有点抽象,这里只需要大家提前对ChannelFlow有一个大概的认知就行,接下里会以实际的子类类说明它到底是如何工作的。建议想彻底搞明白的朋友们可以写个demo,对照着多看几遍,仔细体会。相信必有收获。
ChannelFlowOperatorImpl
调用buffer 和 flowOn操作符都会生成一个ChannelFlowOperatorImpl。ChannelFlowOperatorImpl 继承至ChannelFlowOperator,ChannelFlowOperator继承至ChannelFlow。
同样我们用之前初探里面的例子为例:
fun main() = runBlocking(Dispatchers.IO) {log("start------")val totalTime = measureTimeMillis {val coldFlow = flow<Int> {val time = measureTimeMillis {repeat(10){//模拟上游生产一个数据的时间需要1秒val value = run{Thread.sleep(1000)it}log("send $value")emit(it)}}log("上游发送完成耗时 $time" )}val time = measureTimeMillis {coldFlow.buffer(capacity = 5).collect{ log("receive $it")//模拟下游处理完一个数据需要2秒Thread.sleep(2000)}}log("下游接收完成耗时 $time")}log("总耗时 $totalTime")Unit
}
coldFlow调用了buffer操作符生成了一个新流ChannelFlow,这个ChannelFlow具有缓存功能.
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {//capaciy只能是非负数,或者BUFFERED 或者 CONFLATED, 不能是其他的,比如 UNLIMITED //RENDEZVOUS = 0是可以的require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {"Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"}//如果capacity 为CONFLATED,那么onBufferOverflow只能是SUSPEND.//因为创建一个Channel时,如果capacity = CONFLATED,那这个Channel的溢出策略只能是DROP_OLDEST,//所以这里的意思就是让你不要传onBufferOverflow,使用默认值SUSPENDrequire(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {"CONFLATED capacity cannot be used with non-default onBufferOverflow"}// desugar CONFLATED capacity to (0, DROP_OLDEST)var capacity = capacityvar onBufferOverflow = onBufferOverflow//这里对capacity = CONFLATED 进行脱糖处理,让capacity = 0,溢出策略为DROP_OLDEST。//这样在1.6.1的版本里面就会创建一个capacity = 1,溢出策略为DROP_OLDEST的ArrayChannel。//在1.7.1版本里面就会创建一个capacity = 1,溢出策略为DROP_OLDEST的ConflatedBufferedChannelif (capacity == CONFLATED) {capacity = 0onBufferOverflow = BufferOverflow.DROP_OLDEST}// 创建一个ChannelFlow返回出去return when (this) {//如果上游是一个FusibleFlow,那就调用其fuse函数创建一个is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)//如果不是fusibleFlow类型,那就直接new 一个ChannelFlowOperatorImplelse -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)}
}
buffer操作符里面,最终返回了个ChannelFlow。在本例中,上游是通过flow{}函数创建的一个冷流,不是FusibleFlow类型,因此会直接new 一个ChannelFlowOperatorImpl。
ChannelFlowOperatorImpl源码
internal class ChannelFlowOperatorImpl<T>(flow: Flow<T>,//上游的flow,在本例中就是通过flow{}创建的那个coldFlow//协成运行的context。在本例子没有涉及到切换线程,为EmptyCoroutineContextcontext: CoroutineContext = EmptyCoroutineContext,//指定要创建的ChannelFlow的capacity,在本例子中为buffer操作符时传入的,如果不传默认为OPTIONAL_CHANNEL//比如flowOn操作符创建ChannelFlowOperatorImpl时就没有传。capacity: Int = Channel.OPTIONAL_CHANNEL,//缓存满了后的溢出策略,如果不传,默认为suspendonBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {//在父类ChannelFlow中的fuse函数中被调用,new 一个该实例返回。override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)//脱离ChannelFlow,比如val f = flow.buffer(5,suspend),f是一个ChannelFlowOperatorImpl,//如果val f2 = f.dropChannelOperators,那么f2 就是flow。override fun dropChannelOperators(): Flow<T> = flow//该函数实在父类ChannelFlowOperator种定义。这里做的事情就是ChannelFlow去collect它上游的数据。override suspend fun flowCollect(collector: FlowCollector<T>) =//这里的flow 就是我们案例中的coldFlowflow.collect(collector)
}
ChannelFlowOperator源码
// ChannelFlow implementation that operates on another flow before it
internal abstract class ChannelFlowOperator<S, T>(@JvmField protected val flow: Flow<S>,context: CoroutineContext,capacity: Int,onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {//子类ChannelFlowOperatorImpl中实现protected abstract suspend fun flowCollect(collector: FlowCollector<T>)/*** 这个函数,是在该类的collect函数中当capacity = Channel.OPTIONAL_CHANNEL,并且不需要切线程是被调用。* 该函数会用一个新协成(运行在newContext)去collect上游,拿到上游的数据后切换会原来的协成,把从上游* 拿到的数据交给下游的collector。上游和下游工作在不同协成,但是两个协成时同一个线程** 从这个函数命名也可以看出来,让下游collect上游协成运行在指定的context环境里面。* 但是Undispatched,意思是不使用协成调度器,即不会发生线程切换。* 也就是说最下游collect ChannelFlow 和 ChannelFlow collect最上游他们都是在同一个线程,但是是不同的协成。* * 比如说:上游和下游都运行在IO线程了面,flowOn没有切线程。 ** GlobalScope.launch (Dispatchers.IO){** val flow = flow<Int> {* println("${currentCoroutineContext()[CoroutineName]} emit 1")* emit(1)* }.flowOn( CoroutineName("上游协成")) //给上游所有运行的协成取一个“上游协成”的名字** launch (CoroutineName("下游协成")){* flow.collect{* println("${currentCoroutineContext()[CoroutineName]} collect $it")* }* }* }** 输出结果:* 18:12:45:885[ DefaultDispatcher-worker-3 ] CoroutineName(上游协成) emit 1* 18:12:45:911[ DefaultDispatcher-worker-3 ] CoroutineName(下游协成) collect 1** 虽然调用了flowOn,但是没有切换线程,所以执行效果上和不加flowOn是一样的的。*/private suspend fun collectWithContextUndispatched(collector: FlowCollector<T>, newContext: CoroutineContext) {/*** withUndispatchedContextCollector 会根据原来的collector的类型不同* 有可能还是返回原来的colector,也有可能创建一个新的FlowCollector返回,新的collector* 是UndispatchedContextCollector类型,它里面会包含* 原来的collector(最下游的),新的collector在newContext里面去collect上游的数据,取到数据后* 切换到原来的context,把数据交给原来的collector,这个过程中不会涉及到线程切换,也不会用到缓存,即* ChannelFlow内部不会创建Channel。* * 参数coroutineContext 代表了原来协成的context。*/val originalContextCollector = collector.withUndispatchedContextCollector(coroutineContext)// withContextUndispatched函数里面会开启一个协成,协成运行在newContext里面,协成会//执行block这个函数。block 函数接收一个FowCollector类型的参数,协成执行block的时候就会把//originalContextCollector传入进去,在block中调用了flowCollect函数,flowCollect函数在子类//ChannelFlowOperatorImpl中实现,用originalContextCollector去collect上游的数据。//所以这样就实现了让上游运行在newContext的协成里面,originalContextCollector取到上游的数据后//又会开启一个协成(运行在原来的context)把数据交给下游。return withContextUndispatched(newContext, block = { flowCollect(it) }, value = originalContextCollector)}// 重写父类ChannelFlow的collectTo。只有当ChannelFlow内部需要创建Channel时,用Channel//作为下游的输出,让Channel来作为上游的输入时,会调用该函数,像比如在flowOn不切线程时// 就不会用到Channel。所以不会用到这个函数,而是走collectWithContextUndispatched。///buffer操作符会用到这个函数,而不会走collectWithContextUndispatched//这里的scope 为一个Channel类型,这个scope就是在父类ChannelFlow中的//produceImpl中返回的。protected override suspend fun collectTo(scope: ProducerScope<T>) =//flowCollect在ChannelFlowOperatorImpl中实现。//创建了一个SendingCollector对象,把Channel传到里面。//SendingCollector用来collect上游的collector。//SendingCollector接收到上游的数据后,就把数据send到Channel(scope)中flowCollect(SendingCollector(scope))// 重写ChannelFlow的collect函数,当override suspend fun collect(collector: FlowCollector<T>) {// Fast-path: When channel creation is optional (flowOn/flowWith operators without buffer)//capacity == Channel.OPTIONAL_CHANNEL代表了时通过flowOn/flowWith操作符创建的ChannelFlow。if (capacity == Channel.OPTIONAL_CHANNEL) {//拿到下游collect ChannelFlow时所在协成的context。val collectContext = coroutineContext//计算出ChannelFlow collect 上游时协成需要运行的context。//context 为ChannelFlow的属性,是创建ChannelFlow时指定的,在这里也就是//调用flowOn函数传的context.val newContext = collectContext + context //如果计算出newContext 和 collectContext相等,说明不需再开一个协成,那就在现在所属的协成里面//直接collect上游,因此直接调用flowCollect函数。if (newContext == collectContext)return flowCollect(collector)//如果两个context不相等,但是协成调度器是同一个,那就调用collectWithContextUndispatched//这样让上游和下游运行在两个不同协成(但是是同一个线程里面),并且上游和下游之间不会使用到//缓存(不会创建Channel),因此上游必须在下游消费完成一个数据后才能继续发送下一个.if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])return collectWithContextUndispatched(collector, newContext)}// 如果前面都不是,那就表明需要用缓存(创建Channel)。比如调用buffer,或者flowOn时切换了线程。super.collect(collector)}// debug toStringoverride fun toString(): String = "$flow -> ${super.toString()}"
}// --------------------------------------------------
// 限于篇幅有限,以下几个函数设计到的知识是和协成知识有关的,就不在这里详细解释了,只是简单说一下
//----------------------------------------------------internal suspend fun <T, V> withContextUndispatched(newContext: CoroutineContext,value: V,countOrElement: Any = threadContextElements(newContext), // can be precomputed for speedblock: suspend (V) -> T
): T =suspendCoroutineUninterceptedOrReturn { uCont ->//在newContext里面取开启一个协成,执行block的代码。//这里的block就是{flowCollect(it)},要做的事情就是去collect上游。withCoroutineContext(newContext, countOrElement) {block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))}}private fun <T> FlowCollector<T>.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector<T> = when (this) {// SendingCollector & NopCollector do not care about the context at all and can be used as isis SendingCollector, is NopCollector -> this// Otherwise just wrap into UndispatchedContextCollector interface implementationelse -> UndispatchedContextCollector(this, emitContext)
}private class UndispatchedContextCollector<T>(downstream: FlowCollector<T>,private val emitContext: CoroutineContext
) : FlowCollector<T> {private val countOrElement = threadContextElements(emitContext) // precompute for fast withContextUndispatchedprivate val emitRef: suspend (T) -> Unit = { downstream.emit(it) } // allocate suspend function ref once on creationoverride suspend fun emit(value: T): Unit =// 从上游拿到数据后,切会原来的协成,原来的协成会执行emitRef,在其里面会把上游的拿到的数据交给//下游的collector.withContextUndispatched(emitContext, value, countOrElement, emitRef)
}
我们继续来看一个例子既有buffer,又有flowOn,以这个案例跟踪一下代码执行的流程:
fun main() = runBlocking{val coldFlow = flow<Int> {repeat(10){emit(it)delay(100)}}coldFlow.buffer(5,BufferOverflow.SUSPEND).flowOn(Dispatchers.IO).collect{log("collect $it")delay(200)}}
第一步:
coldFlow.buffer(5,BufferOverflow.SUSPEND) 会返回一个ChannelFlowOperatorImpl。
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {"Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"}require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {"CONFLATED capacity cannot be used with non-default onBufferOverflow"}// desugar CONFLATED capacity to (0, DROP_OLDEST)var capacity = capacityvar onBufferOverflow = onBufferOverflowif (capacity == CONFLATED) {capacity = 0onBufferOverflow = BufferOverflow.DROP_OLDEST}// create a flowreturn when (this) {is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)//coldFlow不是一个FusibleFlow,因此走这里,创建一个ChannelFlowOperatorImpl//注意这里的第一个参数,把coldFlow传入进去了,作为ChannelFlowOperatorImpl的上游。else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)}
}
ChannelFlowOperatorImpl的构造函数有4个参数,在buffer中只传了3个,context没有传,采用默认成参数EmptyCoroutineContext。
internal class ChannelFlowOperatorImpl<T>(flow: Flow<T>,context: CoroutineContext = EmptyCoroutineContext,capacity: Int = Channel.OPTIONAL_CHANNEL,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow)
ChannelFlowOperatorImpl继承至ChannelFlowOperator,所以在构造ChannelFlowOperatorImpl的时候会调用父类ChannelFlowOperator的构造函数,把coldFlow,EmptyCoroutineContext,5,BufferOverflow.SUSPEND四个值传入ChannelFlowOperator的构造函数里面。
internal abstract class ChannelFlowOperator<S, T>(@JvmField protected val flow: Flow<S>,context: CoroutineContext,capacity: Int,onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow)
在ChannelFlowOperator中定义了一个public的flow的字段,因此我们的上游coldFlow就存入了flow这个字段中。
ChannelFlowOperator继承至ChannelFlow,因此继续调用ChannelFlow的构造函数,把EmptyCoroutineContext,5,BufferOverflow.SUSPEND这三个值继续传入了ChannelFlow中。
public abstract class ChannelFlow<T>(// upstream context@JvmField public val context: CoroutineContext,// buffer capacity between upstream and downstream context@JvmField public val capacity: Int,// buffer overflow strategy@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T>
在ChanneFlow中定义了三个public的字段,context,capacity,onBufferOverflow。于是:
context= EmptyCoroutineContext
capacity = 5
onBufferOverflow = BufferOverflow.SUSPEND
到此buffer操作符返回这个ChannelFlowOperatorImpl的字段:
flow = coldFlow
context = EmptyCoroutineContext
capacity = 5
onBufferOverflow = BufferOverflow.SUSPEND
注意这时候ChannelFlow内部的Channel还还没有创建的。
第二步:
ChannelFlowOperatorImpl.flowOn(Dispatchers.IO) 会调用 ChannelFlow的fuse函数,返回一个新的ChannelFlowOperatorImpl。
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {checkFlowContext(context)return when {context == EmptyCoroutineContext -> this//走这里调用第一步buffer返回的ChannelFlowOperatorImpl的fusethis is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context)}
}
在ChannelFlow的fuse函数中:
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow)
由于只传了一个context = Dispatchers.IO一个参数,capacity 和 onBufferOverflow两个参数为默认参数,默认参数是定义在FusibleFlow中:
public interface FusibleFlow<T> : Flow<T> {/*** This function is called by [flowOn] (with context) and [buffer] (with capacity) operators* that are applied to this flow. Should not be used with [capacity] of [Channel.CONFLATED]* (it shall be desugared to `capacity = 0, onBufferOverflow = DROP_OLDEST`).*/public fun fuse(context: CoroutineContext = EmptyCoroutineContext,capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T>
}
因此就相当于fuse(Dispatchers.IO,Channel.OPTIONAL_CHANNEL,BufferOverflow = BufferOverflow.SUSPEND)。
进入fuse内部:
//此时,this.context = EmptyCoroutineContext
//this.capacity = 5
//this.onBufferOverflow = BufferOverflow.SUSPEND
//形参:context = Dispatchers.IO,capacity = Channel.OPTIONAL_CHANNEL,
//onBufferOverflow = BufferOverflow.SUSPEND
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {assert { capacity != Channel.CONFLATED } //step1://context = Dispatchers.IO,this.context = EmptyCoroutineContext//newContext = Dispatchers.IO + EmptyCoroutineContext = Dispatchers.IOval newContext = context + this.contextval newCapacity: Intval newOverflow: BufferOverflow//onBufferOverflow = BufferOverflow.SUSPEND,因此不会走这里if (onBufferOverflow != BufferOverflow.SUSPEND) {newCapacity = capacitynewOverflow = onBufferOverflow} else {newCapacity = when {this.capacity == Channel.OPTIONAL_CHANNEL -> capacity//step2:走这里:因此newCpacity = this.capacity = 5capacity == Channel.OPTIONAL_CHANNEL -> this.capacitythis.capacity == Channel.BUFFERED -> capacitycapacity == Channel.BUFFERED -> this.capacityelse -> {// sanity checksassert { this.capacity >= 0 }assert { capacity >= 0 }val sum = this.capacity + capacityif (sum >= 0) sum else Channel.UNLIMITED // unlimited on int overflow}}//step3: newOverflow = BufferOverflow.SUSPENDnewOverflow = this.onBufferOverflow}if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)return this//step4:创建一个新的return create(newContext, newCapacity, newOverflow)}
create函数在ChannelFlowOperatorImpl中实现:
internal class ChannelFlowOperatorImpl<T>(flow: Flow<T>,context: CoroutineContext = EmptyCoroutineContext,capacity: Int = Channel.OPTIONAL_CHANNEL,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {//step5:override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =//返回一个新的ChannelFlowOperatorImpl,flow = coldFlow,//context= Dispatchers.IO,// capacity = 5// onBufferOverflow = BufferOverflow.SUSPENDChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)override fun dropChannelOperators(): Flow<T> = flowoverride suspend fun flowCollect(collector: FlowCollector<T>) =flow.collect(collector)
}
flowOn操作符又返回了一个新的ChannelFlowOperatorImpl。这个ChannelFlowOperatorImpl的字段:
flow = coldFlow
capacity = 5
context= Dispatchers.IO
onBufferOverflow = BufferOverflow.SUSPEND
对比一下再第一步返回的ChannelFlowOperatorImpl:
bufferf返回的ChannelFlowOperatorImpl | flowOn返回的ChannelFlowOperatorImpl |
---|---|
flow = coldFlow | flow = coldFlow |
capacity = 5 | capacity = 5 |
context= EmptyCoroutineContext | context= Dispatchers.IO |
onBufferOverflow = BufferOverflow.SUSPEND | onBufferOverflow = BufferOverflow.SUSPEND |
有没有发现flowOn返回的ChannelFlowOperatorImpl就是buffer返回的ChannelFlowOperatorImpl融合(fuse)了Dispatchers.IO后形成的一个新的ChannelFlowOperatorImpl。
第三步:
对flowOn返回的ChannelFlowOperatorImpl进行collect,会调用到ChannelFlowOperator的collect函数:
/*** ChannelFlowOperator.collect*/
override suspend fun collect(collector: FlowCollector<T>) {//this.capacity = 5,不走这里if (capacity == Channel.OPTIONAL_CHANNEL) {val collectContext = coroutineContextval newContext = collectContext + context // compute resulting collect context// #1: If the resulting context happens to be the same as it was -- fallback to plain collectif (newContext == collectContext)return flowCollect(collector)// #2: If we don't need to change the dispatcher we can go without channelsif (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])return collectWithContextUndispatched(collector, newContext)}//调用父类ChannelFlow的collect函数super.collect(collector)
}
ChannelFlow的collect:
override suspend fun collect(collector: FlowCollector<T>): Unit =//因为在produceImpl 函数中调用produce函数,所以需要拿到一个CoroutineScope对象//如果只是单纯的要一个CoroutineScope,那么直接用GlobalScope也行。//那为什么不用GlobalScope呢?因为在produce函数要开启一个协成,这个协成就是要去collect上游的(coldFlow).//如果是用GlobalScope那就相当于在开启的协成是一个独立的协成。这就回存在一个问题,如果下游collect所在的协成//被取消了,在produce里面开启的协成也不会取消,也就是说coldFlow还可以 继续emit。因此在produce里面//所开启的的协成必须是下游collect所在协成的子协成。所以用到了coroutineScope。//coroutineScope能拿到外部的Scope对象。coroutineScope {//emitAll就比较简单了,就是开启一个while死循环,去接收produce里面创建的Channel里面的数据。//把从Channel里面接收到的数据,在一个一个的通过collector的emit发送给下游。collector.emitAll(produceImpl(this))}// produce 里面创建一个Channel对象,Channel的缓存为produceCapacity = 5,
// 溢出策略为onBufferOverflow = suspend。
//produce里面还会开启一个子协成,参数context 就是Dispatchers.IO,因此这个子协成会运行在IO线程里面。
//子协成执行collecToFun去collect上游coldFlow。把从上游接收到的数据
//send到Channel中。
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =//produce函数“Kotlin Channel系列(一)之读懂Channel每一行源码”这边文章里面有讲,感兴趣的可以去看看scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)// ----------------------------------------------------------------------------------------
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =emitAllImpl(channel, consume = true)private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {ensureActive()var cause: Throwable? = nulltry {while (true) {//从Channel中receive数据val result = run { channel.receiveCatching() }//如果Channel关闭了就break。抛出异常,//当coldFlow最后一个数据发送完成后,会调用channel的close,关闭Channel.//关闭的操作是发生在ProducerCoroutine的onComplete函数,//ProducerCoroutine不光是一个Channel类型,同时也是一个Continuation类型,//它作为produce函数中开启写的的completionif (result.isClosed) {//如果是coldFlow中发送完成收据后把Channel 正常close掉的,exceptionOrNull就会返回null//那就直接break带哦,走finally//如果是coldFlow中发生了异常,子协成发生异常,父协成也会收到异常,因此走catch,再走finally.result.exceptionOrNull()?.let { throw it }break // returns normally when result.closeCause == null}//政策receive到数据,交给下游的collectoremit(result.getOrThrow())//如果下游collector收到数据后,处理发数据发生了异常,先走catch,再走finally.}} catch (e: Throwable) {cause = ethrow e} finally {//emitAll调用emitAllImpl时参数consume = true.因此会把channel cancel调。//为什么要把Channel cancel掉呢?Chananl cancel后,除了不能再往里面send ,也不能receive以外//还会把Channel的缓存中未被消费的数据都移除。//所以如果是因为上游emit 还是下游消费数据时,发生了异常,导致finally执行,cancel Channe是有必要的。if (consume) channel.cancelConsumed(cause)}
}
在produce函数中开启的子协成会执行collectToFun:
//ChannelFlow.collectToFun.
//参数ProducerScope是一个Channel类型,是就是在produce函数中创建的Channel的一个代理。
//暂且就把它看着是produce中创建的Channel。
internal val collectToFun: suspend (ProducerScope<T>) -> Unit//这里面有调用了collectTo函数。//it 就是参数ProducerScopeget() = { collectTo(it) }
collectTo函数在ChannelFlowOperator里面被实现:
//scope 为一个Channel。
protected override suspend fun collectTo(scope: ProducerScope<T>) =//flowCollect在ChannelFlowOperatorImpl中实现。//创建了一个SendingCollector对象,把Channel传到里面。//SendingCollector用来collect上游coldFlow的collector。flowCollect(SendingCollector(scope))public class SendingCollector<T>(private val channel: SendChannel<T>
) : FlowCollector<T> {//上游coldFlow的数据交给SendingCollector后,直接把数据send到Channel里面。//还记得前面ChannelFlow中collect函数里面的代码:// collector.emitAll(produceImpl(this)) 就是从Channel里面receive数据,然后交给下游的collector。// 到这里,是不是就通了,上游把数据send到Channel里面,下游从Channel里面receive数据。override suspend fun emit(value: T): Unit = channel.send(value)
}
flowCollect函数在ChannelFlowOperator里面被实现
override suspend fun flowCollect(collector: FlowCollector<T>) =//flow 就是上游coldFlow,此处的collector就是SendingCollector,上游把数据交给SendingCollector//SendingCollector再把数据send到Channel里面。flow.collect(collector)
ChannelFlow的余下几个子类:
ChannelFlowBuilder
ChannelAsFlow
CallbackFlowBuilder
ChannelFlowMerge
ChannelFlowTransformLatest
ChannelLimitedFlowMerge
相关文章近期期待…关注微信公众号,第一时间获得通知