Kotlin协程笔记:CoroutineScope管理协程

news/2024/11/20 14:20:37/

         CoroutineScope 是实现协程结构化并发的关键。使用 CoroutineScope,可以批量管理同一个作用域下面所有的协程。

 

        CoroutineScope 与 结构化并发

        launch、async 被定义成了 CoroutineScope 扩展函数。在调用 launch 之前,必须先获取 CoroutineScope。

public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit
): Job {val newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine
}

public fun <T> CoroutineScope.async(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> T
): Deferred<T> {val newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyDeferredCoroutine(newContext, block) elseDeferredCoroutine<T>(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine
}

        为何要设计成扩展方法?

        

fun main() {testCoroutinueScope()
}private fun showScopeLog(any: Any?) {println("""$any Thread:${Thread.currentThread().name}""".trimIndent())
}private fun testCoroutinueScope() {val scope = CoroutineScope(Job())scope.launch {launch {delay(1000000L)showScopeLog("Inner")}showScopeLog("Hello")delay(1000000L)showScopeLog("World") //不执行}scope.launch {launch {delay(1000000L)showScopeLog("Inner!!!")}showScopeLog("Hello!!!")delay(1000000L)showScopeLog("World!!!") //不执行}Thread.sleep(500L)scope.cancel()
}Log:Hello Thread:DefaultDispatcher-worker-1 @coroutine#1
Hello!!! Thread:DefaultDispatcher-worker-3 @coroutine#2Process finished with exit code 0

 scope 创建了两个顶层的协程,接着,在协程的内部我们使用 launch 又创建了一个子协程。最后,在协程的外部等待了 500 毫秒,并且调用了 scope.cancel()。结果前面创建的 4 个协程就全部都取消了。

         父协程是属于 Scope 的,子协程是属于父协程的,只要调用了 scope.cancel(),这 4 个协程都会被取消。

        

        CoroutineScope 管理协程的能力,源自于 Job。

        父子协程关系如何建立?-CoroutineScope 如何通过 Job 来管理协程。

        CoroutineScope 是一个接口,为什么可以调用其构造函数,来创建 CoroutineScope 对象?不应该使用 object 关键字创建匿名内部类吗?

        调用 CoroutineScope() 并不是构造函数,而是一个顶层函数。

private fun testCoroutinueScope() {val scope = CoroutineScope(Job())scope.launch {launch {delay(1000000L)showScopeLog("Inner")}showScopeLog("Hello")delay(1000000L)showScopeLog("World") //不执行}scope.launch {launch {delay(1000000L)showScopeLog("Inner!!!")}showScopeLog("Hello!!!")delay(1000000L)showScopeLog("World!!!") //不执行}Thread.sleep(500L)scope.cancel()
}

public fun CoroutineScope(context: CoroutineContext): CoroutineScope =ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {override val coroutineContext: CoroutineContext = context// CoroutineScope is used intentionally for user-friendly representationoverride fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

         Kotlin 当中的函数名称,在大部分情况下都是遵循“驼峰命名法”的,而在一些特殊情况下则不遵循这种命名法。上面的顶层函数 CoroutineScope(),其实就属于特殊的情况,因为它虽然是一个普通的顶层函数,但它发挥的作用却是“构造函数”。类似的用法,还有 Job() 这个顶层函数。

        在 Kotlin 当中,当顶层函数作为构造函数使用的时候,首字母是要大写的。

        创建 CoroutineScope 的时候,如果传入的 Context 是包含 Job 的,那就直接用;如果是不包含 Job 的,就会创建一个新的 Job。这就意味着,每一个 CoroutineScope 对象,它的 Context 当中必定存在一个 Job 对象。代码中的 CoroutineScope(Job()),改成 CoroutineScope() 也是完全没问题的。

        

public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit
): Job {val newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine
}

private open class StandaloneCoroutine(parentContext: CoroutineContext,active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {override fun handleJobException(exception: Throwable): Boolean {handleCoroutineException(context, exception)return true}
}private class LazyStandaloneCoroutine(parentContext: CoroutineContext,block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {private val continuation = block.createCoroutineUnintercepted(this, this)override fun onStart() {continuation.startCoroutineCancellable(this)}
}

StandaloneCoroutine 是 AbstractCoroutine 的子类,AbstractCoroutine 代表了协程的抽象类。另外这里有一个 initParentJob 参数,它是 true,代表了协程创建了以后,需要初始化协程的父子关系。而 LazyStandaloneCoroutine 则是 StandaloneCoroutine 的子类,它的 active 参数是 false,代表了以懒加载的方式创建协程。

public abstract class AbstractCoroutine<in T>(parentContext: CoroutineContext,initParentJob: Boolean,active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {... ...init {/** Setup parent-child relationship between the parent in the context and the current coroutine.* It may cause this coroutine to become _cancelling_ if the parent is already cancelled.* It is dangerous to install parent-child relationship here if the coroutine class* operates its state from within onCancelled or onCancelling* (with exceptions for rx integrations that can't have any parent)*/if (initParentJob) initParentJob(parentContext[Job])}
}

AbstractCoroutine 是 JobSupport 的子类,在 init{} 代码块当中,根据 initParentJob 参数,判断是否需要初始化协程的父子关系。initParentJob 是 true,所以这里的 initParentJob() 方法一定会执行,而它的参数 parentContext[Job]取出来的 Job,其实就是在 Scope 当中的 Job。

 initParentJob() 方法,是它的父类 JobSupport 当中的方法。

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {final override val key: CoroutineContext.Key<*> get() = Jobprotected fun initParentJob(parent: Job?) {assert { parentHandle == null }if (parent == null) {parentHandle = NonDisposableHandlereturn}parent.start()@Suppress("DEPRECATION")val handle = parent.attachChild(this)parentHandle = handleif (isCompleted) {handle.dispose()parentHandle = NonDisposableHandle }}
}public interface Job : CoroutineContext.Element {public val children: Sequence<Job>   public fun attachChild(child: ChildJob): ChildHandle
}

上面的代码一共有三个地方需要注意,我们来分析一下:

首先判断传入的 parent 是否为空,如果 parent 为空,说明当前的协程不存在父 Job,就不需要创建协程父子关系。

然后确保 parent 对应的 Job 启动了。

parent.attachChild(this)它会将当前的 Job,添加为 parent 的子 Job。这里其实就是建立协程父子关系的关键代码。

 协程是如何“结构化取消”的?

 协程的结构化取消,本质上是事件的传递。

public fun CoroutineScope.cancel(cause: CancellationException? = null) {val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")job.cancel(cause)
}

         CoroutineScope 的 cancel() 方法,本质上是调用了它当中的 Job.cancel()。而这个方法的具体实现在 JobSupport 当中

public override fun cancel(cause: CancellationException?) {cancelInternal(cause ?: defaultCancellationException())
}public open fun cancelInternal(cause: Throwable) {cancelImpl(cause)
}internal fun cancelImpl(cause: Any?): Boolean {var finalState: Any? = COMPLETING_ALREADYif (onCancelComplete) {finalState = cancelMakeCompleting(cause)if (finalState === COMPLETING_WAITING_CHILDREN) return true}if (finalState === COMPLETING_ALREADY) {finalState = makeCancelling(cause)}return when {finalState === COMPLETING_ALREADY -> truefinalState === COMPLETING_WAITING_CHILDREN -> truefinalState === TOO_LATE_TO_CANCEL -> falseelse -> {afterCompletion(finalState)true}}
}
if (onCancelComplete) {        finalState = cancelMakeCompleting(cause)        if (finalState === COMPLETING_WAITING_CHILDREN) return true   
}

job.cancel() 最终会调用 JobSupport 的 cancelImpl() 方法。上面的代码中onCancelComplete 是 Boolean 类型的成员属性。代表了当前的 Job,是否有协程体需要执行。另外,由于 CoroutineScope 当中的 Job 是手动创建的,并不需要执行任何协程代码,所以,它会是 true。继续分析 cancelMakeCompleting() 方法:

private fun cancelMakeCompleting(cause: Any?): Any? {loopOnState { state ->val finalState = tryMakeCompleting(state, proposedUpdate)if (finalState !== COMPLETING_RETRY) return finalState}
}private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {if (state !is Incomplete)return COMPLETING_ALREADYreturn COMPLETING_RETRY}return tryMakeCompletingSlowPath(state, proposedUpdate)
}private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {notifyRootCause?.let { notifyCancelling(list, it) }return finalizeFinishingState(finishing, proposedUpdate)
}

 cancelMakeCompleting() 会调用 tryMakeCompleting() 方法,最终则会调用 tryMakeCompletingSlowPath() 当中的 notifyCancelling() 方法。所以,它才是最关键的代码。

private fun notifyCancelling(list: NodeList, cause: Throwable) {onCancelling(cause)notifyHandlers<JobCancellingNode>(list, cause)cancelParent(cause)
}
  •  通知子Job
notifyHandlers<JobCancellingNode>(list, cause)
  • 通知父Job
 cancelParent(cause)

 

通知子Job流程:

private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {var exception: Throwable? = nulllist.forEach<T> { node ->try {node.invoke(cause)} catch (ex: Throwable) {exception?.apply { addSuppressedThrowable(ex) } ?: run {exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)}}}exception?.let { handleOnCompletionException(it) }
}

 遍历当前 Job 的子 Job,并将取消的 cause 传递过去,这里的 invoke() 最终会调用 ChildHandleNode 的 invoke() 方法:


internal class ChildHandleNode(@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {override val parent: Job get() = joboverride fun invoke(cause: Throwable?) = childJob.parentCancelled(job)override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}public final override fun parentCancelled(parentJob: ParentJob) {cancelImpl(parentJob)
}

 ChildHandleNode 的 invoke() 方法会调用 parentCancelled() 方法,而它最终会调用 cancelImpl() 方法。 Job 取消的入口函数。这实际上就相当于在做递归调用。

通知父 Job 的流程:


private fun cancelParent(cause: Throwable): Boolean {if (isScopedCoroutine) return trueval isCancellation = cause is CancellationExceptionval parent = parentHandleif (parent === null || parent === NonDisposableHandle) {return isCancellation}return parent.childCancelled(cause) || isCancellation
}

 这个函数的返回值返回 true 代表父协程处理了异常,而返回 false,代表父协程没有处理异常。这种类似责任链的设计模式。


public open fun childCancelled(cause: Throwable): Boolean {if (cause is CancellationException) return truereturn cancelImpl(cause) && handlesException
}

 当异常是 CancellationException 的时候,协程是会进行特殊处理的。一般来说,父协程会忽略子协程的取消异常。而如果是其他的异常,那么父协程就会响应子协程的取消了。代码又会继续递归调用cancelImpl() 方法了。


http://www.ppmy.cn/news/5489.html

相关文章

设计模式之迭代器模式

Iterator design pattern 迭代器模式的概念、迭代器模式的结构、迭代器模式的优缺点、迭代器模式的使用场景、迭代器模式的实现示例、迭代器模式的源码分析 1、迭代器模式的概念 迭代器模式&#xff0c;即提供一种方法来顺序访问聚合对象内的元素&#xff0c;而不暴露聚合对象…

gRPC学习Go版(一)

文章目录微服务入门gRPC是什么proto 服务定义gRPC 优势gRPC入门简单使用一元RPC服务流RPC客户流RPC双工流RPCgRPC底层原理RPC流长度前缀的消息分帧请求消息响应信息通信模式下的消息流微服务入门 现在的软件很少是一个孤立的单体应用运行的&#xff0c;相反更多是通过互联网连接…

Pytorch/Paddle topk 与 Numpy argpartition 函数应用

前言 他们两者都在些搜索、匹配、找相关性的时候会用到。 topk 参数 torch.topk(input, k, dimNone, largestTrue, sortedTrue, *, outNone) paddle.topk(x, k, axisNone, largestTrue, sortedTrue, nameNone) input / x : 输入的多维Tensor,支持的数据类型 float32、float64、…

C语言重点解剖第17课笔记

1.预处理阶段&#xff0c;先去注释&#xff0c;再宏替换。所以宏替换不能用于去注释。 #define bsc //就变成了一个空的宏。(//在这里面本来就是注释&#xff0c;只是注释后面的内容为空) 2.宏定义并不是简单的无脑替换。 printf(" ")中&#xff0c;双引号里面的东…

Oracle --- 基础

目录 启动Oracle Oracle监听 监听服务的主要文件 listener.ora tnsnames.ora sqlnet.ora Oracle用户 创建用户 启动Oracle 四步 su - oracle # 切换用户&#xff0c;进入oracle的用户&#xff0c;读取oracle的配置文件lsnrctl start # 启…

用3Dmax优化模型的方法,让你的效果图又快又好

3DMax是一个特别强大的建模软件&#xff0c;它具有无数需要小伙伴付出时间和练习才能掌握的特性和功能。 如果你已经能足够直观的掌握基础知识&#xff0c;并且已经能创造出很优秀的作品。这必然是一件值得高兴的事。 但是&#xff0c;在设计过程的妙处就在于&#xff0c;总是…

Allegro如何翻转PCB视图操作指导

Allegro如何翻转PCB视图操作指导 Allegro可以翻转PCB的视图,利于查看和检查,如下图 翻转前:器件和走线在bottom层 翻转后:走线和器件仍然在bottom层,但是视图翻转了 具体操作如下

搜遍全网,终于找到了报表自动化的最佳工具,比Excel好用10倍

工作快十年了&#xff0c;最开始是在华为做报表&#xff0c;后来去了美团从事大数据和平台方面的工作&#xff0c;现在在国企干了快三年。辗转各大企业的IT部门&#xff0c;最明显的感知就是企业间数字化程度差距太大了。就从报表这个点来说&#xff0c;互联网公司早就实现了报…