生活中的Design.

Kotlin协程相关

字数统计: 1.7k阅读时长: 7 min
2021/12/21 Share

基础概念

关于kotlin中的协程,kotlin的标准库里只包含了协程的抽象,放在kotlin.coroutines包下
比如:

  • Continuation
  • CoroutineContext

具体的实现则需要另外引入依赖kotlinx.coroutines,里面会有关于协程的高级原语(High-level Primitives):

  • launch:启动一个协程,他将会与其他剩余代码同时(concurrently)执行,返回一个Job
  • async:类似launch,不过返回一个Defer

结构化并发

Structured concurrency
协程遵循结构化并发的原则,所有的协程必须在限定生命周期的特定 CoroutineScope 中启动。结构化并发确保它们不会丢失,不会泄漏。在其所有子协程完成之前,外部作用域无法完成。结构化并发还确保正确报告代码中的任何错误并且永远不会丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
try {
failedConcurrentSum()
} catch(e: ArithmeticException) {
println("Computation failed with ArithmeticException")
}
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
val one = async<Int> {
try {
delay(Long.MAX_VALUE) // Emulates very long computation
42
} finally {
println("First child was cancelled")
}
}
val two = async<Int> {
println("Second child throws an exception")
throw ArithmeticException()
}
one.await() + two.await()
}
1
2
3
Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException

默认顺序执行

2个suspend函数默认是以此执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
suspend fun doSomethingUsefulOne(): Int {
delay(1000L)
return 13
}

suspend fun doSomethingUsefulTwo(): Int {
delay(1000L)
return 29
}

val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
1
2
The answer is 42
Completed in 2017 ms

使用async并发执行

1
2
3
4
5
6
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
1
2
The answer is 42
Completed in 1017 ms

上下文与调度器

Coroutine context and dispatchers

上下文CoroutineContext

协程的上下文定义在标准库中,CoroutineContext,它是一个介于Set和Map之间的类型,内部定义了Key和Element,所有的协程必须在指定的上下文中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface CoroutineContext {
public abstract fun <R> fold(initial: R, operation: (R, kotlin.coroutines.CoroutineContext.Element) -> R): R

public abstract operator fun <E : kotlin.coroutines.CoroutineContext.Element> get(key: kotlin.coroutines.CoroutineContext.Key<E>): E?

public abstract fun minusKey(key: kotlin.coroutines.CoroutineContext.Key<*>): kotlin.coroutines.CoroutineContext

public open operator fun plus(context: kotlin.coroutines.CoroutineContext): kotlin.coroutines.CoroutineContext { /* compiled code */ }

public interface Element : kotlin.coroutines.CoroutineContext {
public abstract val key: kotlin.coroutines.CoroutineContext.Key<*>

public open fun <R> fold(initial: R, operation: (R, kotlin.coroutines.CoroutineContext.Element) -> R): R { /* compiled code */ }

public open operator fun <E : kotlin.coroutines.CoroutineContext.Element> get(key: kotlin.coroutines.CoroutineContext.Key<E>): E? { /* compiled code */ }

public open fun minusKey(key: kotlin.coroutines.CoroutineContext.Key<*>): kotlin.coroutines.CoroutineContext { /* compiled code */ }
}

public interface Key<E : kotlin.coroutines.CoroutineContext.Element> {
}
}

看一个典型的Element

1
2
3
4
5
6
7
8
9
10
11
12
public interface CoroutineExceptionHandler : CoroutineContext.Element {
/**
* Key for [CoroutineExceptionHandler] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

/**
* Handles uncaught [exception] in the given [context]. It is invoked
* if coroutine has an uncaught exception.
*/
public fun handleException(context: CoroutineContext, exception: Throwable)
}

我们发现CoroutineExceptionHandler实现了Element,并且它的伴生对象(companion object)实现了Key,所以可以写出下面这样的语法👇:

1
coroutineContext[Job]
1
2
override val coroutineContext: CoroutineContext
get() = Job() + Dispatchers.IO + CoroutineExceptionHandler

调度器Dispatcher

1
2
3
4
5
6
7
8
9
10
11
12
launch { // context of the parent, main runBlocking coroutine
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
1
2
3
4
Unconfined            : I'm working in thread main
Default : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking : I'm working in thread main

无参数的launch会继承CoroutineScope内的上下文,因此也会使用父Scope的调度器(因为调度器也是Element,包含在CoroutineContext内),父协程被取消时子协程也会被取消,除了一下几种情况:

  • 启动协程时额外指定不同的CoroutineScope,比如GlobalScope.launch
  • 启动时传入不同的Job对象:launch(Job()) {}

亲类责任

Parental responsibilities

父协程总是等待子协程的完成,父级不必显式跟踪它启动的所有子级,也不必使用 Job.join 在协程代码块最后等待它们

协程作用域

CoroutineScope

现在我们把上下文,父子协程,Job相关的知识联系起来,CoroutineScope是包含CoroutineContext的抽象,CoroutineContext又是一个Job、Dispatcher等的集合,一个Scope启动一个协程时,会把子协程的Context添加到父协程,生成新的协程上下文,如果父子协程使用的是同一个Job,父协程的取消也会取消子协程


异步流 Flow

TODO


Channel

TODO


协程异常处理

协程构造器的异常传播有两种类型:

  • 自动传播:launch和actor,如果用户未设置
    CoroutineExceptionHandler(在当前上下文或者任意父上下文),会在控制台打印
  • 暴露给用户: async和produce,需要用户显式地捕捉异常

异常聚合原则: 多个子协程发生异常,先发生的异常将会得到处理,后发生的异常会被放在首个异常的Throwable.suppressed

SupervisorJob

正如我们之前研究过的,取消是一种双向关系,在整个协程层次结构中传播。我们来看看需要单向取消的情况。

这种需求的一个很好的例子是在其范围内定义了作业的 UI 组件。如果 UI 的任何子任务失败,并不总是需要取消(有效杀死)整个 UI 组件,但如果 UI 组件被销毁(并且其作业被取消),则必须取消所有子作业因为不再需要他们的结果。

另一个例子是一个服务器进程,它产生多个子作业,需要监督它们的执行,跟踪它们的失败,并且只重新启动失败的那些。

SupervisorJob与Job类似,但是SupervisorJob上下文内协程的取消只会向下传播。

Job和SupervisorJob之间的另一个重要区别是异常处理。每个子协程都应该通过异常处理机制自己处理它的异常。这种差异来自于子协程的失败不会传播给父协程。这意味着直接在 supervisorScope 内启动的协程确实使用上下文内的 CoroutineExceptionHandler,其方式与根协程相同

1
2
3
4
5
6
7
8
9
10
11
val handler = CoroutineExceptionHandler { _, exception -> 
println("CoroutineExceptionHandler got $exception")
}
supervisorScope {
val child = launch(handler) {
println("The child throws an exception")
throw AssertionError()
}
println("The scope is completing")
}
println("The scope is completed")
1
2
3
4
The scope is completing
The child throws an exception
CoroutineExceptionHandler got java.lang.AssertionError
The scope is completed

Supervision scope

我们可以使用 supervisorScope 来代替 coroutineScope 进行作用域并发。它仅在一个方向上传播取消,当且仅当它自己失败时才取消其所有子项。它还像 coroutineScope 一样在完成之前等待所有子进程。

CATALOG
  1. 1. 基础概念
    1. 1.1. 结构化并发
    2. 1.2. 默认顺序执行
    3. 1.3. 使用async并发执行
    4. 1.4. 上下文与调度器
      1. 1.4.1. 上下文CoroutineContext
      2. 1.4.2. 调度器Dispatcher
    5. 1.5. 亲类责任
    6. 1.6. 协程作用域
    7. 1.7. 异步流 Flow
    8. 1.8. Channel
    9. 1.9. 协程异常处理
    10. 1.10. SupervisorJob
    11. 1.11. Supervision scope