协程(下) 5个月前

编程语言
738
协程(下)

挂起函数的组合执行

介绍挂起函数组合的各种方法。

按默认顺序执行

假设我们有两个在别处定义的挂起函数:

suspend fun doJob1(): Int {
    println("Doing Job1 ...")
    delay(1000L) // 此处模拟我们的工作代码
    println("Job1 Done")
    return 10
}

suspend fun doJob2(): Int {
    println("Doing Job2 ...")
    delay(1000L) // 此处模拟我们的工作代码
    println("Job2 Done")
    return 20
}

如果需要依次调用它们, 我们只需要使用正常的顺序调用, 因为协程中的代码 (就像在常规代码中一样) 是默认的顺序执行。下面的示例通过测量执行两个挂起函数所需的总时间来演示:

fun testSequential() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doJob1()
        val two = doJob2()
        println("[testSequential] 最终结果: ${one + two}")
    }
    println("[testSequential] Completed in $time ms")
}

执行上面的代码,我们将得到输出:

Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最终结果: 30
[testSequential] Completed in 6023 ms

可以看出,我们的代码是跟普通的代码一样顺序执行下去。

使用async异步并发执行

上面的例子中,如果在调用 doJob1 和 doJob2 之间没有时序上的依赖关系, 并且我们希望通过同时并发地执行这两个函数来更快地得到答案, 那该怎么办呢?这个时候,我们就可以使用async来实现异步。代码示例如下:

fun testAsync() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool) { doJob1() }
        val two = async(CommonPool) { doJob2() }
        println("最终结果: ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

如果跟上面同步的代码一起执行对比,我们可以看到如下输出:

Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最终结果: 30
[testSequential] Completed in 6023 ms
Doing Job1 ...
Doing Job2 ...
Job1 Done
Job2 Done
[testAsync] 最终结果: 30
[testAsync] Completed in 3032 ms

我们可以看出,使用async函数,我们的两个Job并发的执行了,并发花的时间要比顺序的执行的要快将近两倍。因为,我们有两个任务在并发的执行。

从概念上讲, async跟launch类似, 它启动一个协程, 它与其他协程并发地执行。

不同之处在于, launch返回一个任务Job对象, 不带任何结果值;而async返回一个延迟任务对象Deferred,一种轻量级的非阻塞性future, 它表示后面会提供结果。

在上面的示例代码中,我们使用Deferred调用 await() 函数来获得其最终结果。另外,延迟任务Deferred也是Job类型, 它继承自Job,所以它也有isActive、isCompleted属性,也有join()、cancel()函数,因此我们也可以在需要时取消它。

Deferred接口定义如下:

public interface Deferred<out T> : Job {
    val isCompletedExceptionally: Boolean
    val isCancelled: Boolean
    public suspend fun await(): T
    public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
    public fun getCompleted(): T
    @Deprecated(message = "Use `isActive`", replaceWith = ReplaceWith("isActive"))
    public val isComputing: Boolean get() = isActive
}

其中,常用的属性和函数说明如下:

名称 说明
isCompletedExceptionally 当协程在计算过程中有异常 failed 或被取消,返回true。 这也意味着isActive等于 false ,同时 isCompleted等于 true
isCancelled 如果当前延迟任务被取消,返回true
suspend fun await() 等待此延迟任务完成,而不阻塞线程;如果延迟任务完成, 则返回结果值或引发相应的异常。

延迟任务对象Deferred的状态与对应的属性值如下表所示:

状态 isActive isCompleted isCompletedExceptionally isCancelled
New (可选初始状态) false false false false
Active (默认初始状态) true false false false
Resolved (最终状态) false true false false
Failed (最终状态) false true true false
Cancelled (最终状态) false true true true

协程上下文与调度器

到这里,我们已经看到了下面这些启动协程的方式:

launch(CommonPool) {...}
async(CommonPool) {...}
run(NonCancellable) {...}

这里的 CommonPool 和 NonCancellable 是协程上下文(coroutine contexts)。本小节我们简单介绍一下自定义协程上下文。

调度和线程

协程上下文包括一个协程调度程序, 它可以指定由哪个线程来执行协程。调度器可以将协程的执行调度到一个线程池,限制在特定的线程中;也可以不作任何限制,让它无约束地运行。请看下面的示例:

fun testDispatchersAndThreads() = runBlocking {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) {
        // 未作限制 -- 将会在 main thread 中执行
        println("Unconfined: I'm working in thread ${Thread.currentThread()}")
    }
    jobs += launch(coroutineContext) {
        // 父协程的上下文 : runBlocking coroutine
        println("coroutineContext: I'm working in thread ${Thread.currentThread()}")
    }
    jobs += launch(CommonPool) {
        // 调度指派给 ForkJoinPool.commonPool
        println("CommonPool: I'm working in thread ${Thread.currentThread()}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) {
        // 将会在这个协程自己的新线程中执行
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread()}")
    }
    jobs.forEach { it.join() }
}

运行上面的代码,我们将得到以下输出 (可能按不同的顺序):

Unconfined: I'm working in thread Thread[main,5,main]
CommonPool: I'm working in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
newSingleThreadContext: I'm working in thread Thread[MyOwnThread,5,main]
context: I'm working in thread Thread[main,5,main]

从上面的结果,我们可以看出:

  • 使用无限制的Unconfined上下文的协程运行在主线程中;
  • 继承了 runBlocking {...} 的context的协程继续在主线程中执行;
  • 而CommonPool在ForkJoinPool.commonPool中;
  • 我们使用newSingleThreadContext函数新建的协程上下文,该协程运行在自己的新线程Thread[MyOwnThread,5,main]中。

另外,我们还可以在使用 runBlocking 的时候显式指定上下文, 同时使用 run 函数来更改协程的上下文:

fun log(msg: String) = println("${Thread.currentThread()} $msg")

fun testRunBlockingWithSpecifiedContext() = runBlocking {
    log("$context")
    log("${context[Job]}")
    log("开始")

    val ctx1 = newSingleThreadContext("线程A")
    val ctx2 = newSingleThreadContext("线程B")
    runBlocking(ctx1) {
        log("Started in Context1")
        run(ctx2) {
            log("Working in Context2")
        }
        log("Back to Context1")
    }
    log("结束")
}

运行输出:

Thread[main,5,main] [BlockingCoroutine{Active}@b1bc7ed, EventLoopImpl@7cd84586]
Thread[main,5,main] BlockingCoroutine{Active}@b1bc7ed
Thread[main,5,main] 开始
Thread[线程A,5,main] Started in Context1
Thread[线程B,5,main] Working in Context2
Thread[线程A,5,main] Back to Context1
Thread[main,5,main] 结束

父子协程

当我们使用协程A的上下文启动另一个协程B时, B将成为A的子协程。当父协程A任务被取消时, B以及它的所有子协程都会被递归地取消。代码示例如下:

fun testChildrenCoroutine()= runBlocking<Unit> {
    val request = launch(CommonPool) {
        log("ContextA1: ${context}")

        val job1 = launch(CommonPool) {
            println("job1: 独立的协程上下文!")
            delay(1000)
            println("job1: 不会受到request.cancel()的影响")
        }
        // 继承父上下文:request的context
        val job2 = launch(context) {
            log("ContextA2: ${context}")
            println("job2: 是request coroutine的子协程")
            delay(1000)
            println("job2: 当request.cancel(),job2也会被取消")
        }
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel()
    delay(1000)
    println("main: Who has survived request cancellation?")
}

运行输出:

Thread[ForkJoinPool.commonPool-worker-1,5,main] ContextA1: [StandaloneCoroutine{Active}@5b646af2, CommonPool]
job1: 独立的协程上下文!
Thread[ForkJoinPool.commonPool-worker-3,5,main] ContextA2: [StandaloneCoroutine{Active}@75152aa4, CommonPool]
job2: 是request coroutine的子协程
job1: 不会受到request.cancel()的影响
main: Who has survived request cancellation?

通道

延迟对象提供了一种在协程之间传输单个值的方法。而通道(Channel)提供了一种传输数据流的方法。通道是使用 SendChannel 和使用 ReceiveChannel 之间的非阻塞通信。

通道 vs 阻塞队列

通道的概念类似于 阻塞队列(BlockingQueue)。在Java的Concurrent包中,BlockingQueue很好的解决了多线程中如何高效安全“传输”数据的问题。它有两个常用的方法如下:

  • E take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空, 阻塞进入等待状态直到BlockingQueue有新的数据被加入;

  • put(E e): 把对象 e 加到BlockingQueue里, 如果BlockQueue没有空间,则调用此方法的线程被阻塞,直到BlockingQueue里面有空间再继续。

通道跟阻塞队列一个关键的区别是:通道有挂起的操作, 而不是阻塞的, 同时它可以关闭。

代码示例:

package com.easy.kotlin

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking

class ChannelsDemo {
    fun testChannel() = runBlocking<Unit> {
        val channel = Channel<Int>()
        launch(CommonPool) {
            for (x in 1..10) channel.send(x * x)
        }
        println("channel = ${channel}")
        // here we print five received integers:
        repeat(10) { println(channel.receive()) }
        println("Done!")
    }
}


fun main(args: Array<String>) {
    val cd = ChannelsDemo()
    cd.testChannel()
}

运行输出:

channel = kotlinx.coroutines.experimental.channels.RendezvousChannel@2e817b38
1
4
9
16
25
36
49
64
81
100
Done!

我们可以看出使用Channel<Int>()背后调用的是会合通道RendezvousChannel(),会合通道中没有任何缓冲区。send函数被挂起直到另外一个协程调用receive函数, 然后receive函数挂起直到另外一个协程调用send函数。它是一个完全无锁的实现。

关闭通道和迭代遍历元素

与队列不同, 通道可以关闭, 以指示没有更多的元素。在接收端, 可以使用 for 循环从通道接收元素。代码示例:

fun testClosingAndIterationChannels() = runBlocking {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) channel.send(x * x)
        channel.close() // 我们结束 sending
    }
    // 打印通道中的值,直到通道关闭
    for (x in channel) println(x)
    println("Done!")
}

其中, close函数在这个通道上发送一个特殊的 "关闭令牌"。这是一个幂等运算:对此函数的重复调用不起作用, 并返回 "false"。此函数执行后,isClosedForSend返回 "true"。但是, ReceiveChannelisClosedForReceive 在所有之前发送的元素收到之后才返回 "true"。

我们把上面的代码加入打印日志:

fun testClosingAndIterationChannels() = runBlocking {
    val channel = Channel<Int>()
    launch(CommonPool) {
        for (x in 1..5) {
            channel.send(x * x)
        }
        println("Before Close => isClosedForSend = ${channel.isClosedForSend}")
        channel.close() // 我们结束 sending
        println("After Close => isClosedForSend = ${channel.isClosedForSend}")
    }
    // 打印通道中的值,直到通道关闭
    for (x in channel) {
        println("${x} => isClosedForReceive = ${channel.isClosedForReceive}")
    }
    println("Done!  => isClosedForReceive = ${channel.isClosedForReceive}")
}

运行输出:

1 => isClosedForReceive = false
4 => isClosedForReceive = false
9 => isClosedForReceive = false
16 => isClosedForReceive = false
25 => isClosedForReceive = false
Before Close => isClosedForSend = false
After Close => isClosedForSend = true
Done!  => isClosedForReceive = true

生产者-消费者模式

使用协程生成元素序列的模式非常常见。这是在并发代码中经常有的生产者-消费者模式。代码示例:

fun produceSquares() = produce<Int>(CommonPool) {
    for (x in 1..7) send(x * x)
}

fun consumeSquares() = runBlocking{
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

这里的produce函数定义如下:

public fun <E> produce(
    context: CoroutineContext,
    capacity: Int = 0,
    block: suspend ProducerScope<E>.() -> Unit
): ProducerJob<E> {
    val channel = Channel<E>(capacity)
    return ProducerCoroutine(newCoroutineContext(context), channel).apply {
        initParentJob(context[Job])
        block.startCoroutine(this, this)
    }
}

其中,参数说明如下:

参数名 说明
context 协程上下文
capacity 通道缓存容量大小 (默认没有缓存)
block 协程代码块

produce函数会启动一个新的协程, 协程中发送数据到通道来生成数据流,并以 ProducerJob对象返回对协程的引用。ProducerJob继承了Job, ReceiveChannel类型。

管道

生产无限序列

管道(Pipeline)是一种模式, 我们可以用一个协程生产无限序列:

fun produceNumbers() = produce<Long>(CommonPool) {
    var x = 1L
    while (true) send(x++) // infinite stream of integers starting from 1
}

我们的消费序列的函数如下:

fun square(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
    for (x in numbers) send(x * x)
}

主代码启动并连接整个管线:

fun testPipeline() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = consumeNumbers(numbers) // squares integers
    //for (i in 1..6) println(squares.receive())
    while (true) {
        println(squares.receive())
    }
    println("Done!")
    squares.cancel()
    numbers.cancel()
}

运行上面的代码,我们将会发现控制台在打印一个无限序列,完全没有停止的意思。

管道与无穷质数序列

我们使用协程管道来生成一个无穷质数序列。

我们从无穷大的自然数序列开始:

fun numbersProducer(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var n = start
    while (true) send(n++) // infinite stream of integers from start
}

这次我们引入一个显式上下文参数context, 以便调用方可以控制我们的协程运行的位置。

下面的管道将筛选传入的数字流, 过滤掉可以被当前质数整除的所有数字:

fun filterPrimes(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
    for (x in numbers) if (x % prime != 0) send(x)
}

现在我们通过从2开始, 从当前通道中取一个质数, 并为找到的每个质数启动新的管道阶段, 从而构建出我们的管道:

numbersFrom(2) -> filterPrimes(2) -> filterPrimes(3) -> filterPrimes(5) -> filterPrimes(7) ...

测试无穷质数序列:

fun producePrimesSequences() = runBlocking {
    var producerJob = numbersProducer(context, 2)

    while (true) {
        val prime = producerJob.receive()
        print("${prime} \t")
        producerJob = filterPrimes(context, producerJob, prime)
    }
}

运行上面的代码,我们将会看到控制台一直在无限打印出质数序列:

image

通道缓冲区

我们可以给通道设置一个缓冲区:

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // 创建一个缓冲区容量为4的通道
    launch(context) {
        repeat(10) {
            println("Sending $it")
            channel.send(it) // 当缓冲区已满的时候, send将会挂起
        }
    }
    delay(1000)
}

输出:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

构建无穷惰性序列

我们可以使用 buildSequence 序列生成器 ,构建一个无穷惰性序列。

val fibonacci = buildSequence {
    yield(1L)
    var current = 1L
    var next = 1L
    while (true) {
        yield(next)
        val tmp = current + next
        current = next
        next = tmp
    }
}

我们通过buildSequence创建一个协程,生成一个惰性的无穷斐波那契数列。该协程通过调用 yield() 函数来产生连续的斐波纳契数。

我们可以从该序列中取出任何有限的数字列表,例如

println(fibonacci.take(16).toList())

的结果是:

[1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]

协程与线程比较

直接先说区别,协程是编译器级的,而线程是操作系统级的。

协程通常是由编译器来实现的机制。线程看起来也在语言层次,但是内在原理却是操作系统先有这个东西,然后通过一定的API暴露给用户使用,两者在这里有不同。

协程就是用户空间下的线程。用协程来做的东西,用线程或进程通常也是一样可以做的,但往往多了许多加锁和通信的操作。

线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。

协程并不是取代线程, 而是抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程.。

线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。

协程的好处

与多线程、多进程等并发模型不同,协程依靠user-space调度,而线程、进程则是依靠kernel来进行调度。线程、进程间切换都需要从用户态进入内核态,而协程的切换完全是在用户态完成,且不像线程进行抢占式调度,协程是非抢占式的调度。

通常多个运行在同一调度器中的协程运行在一个线程内,这也消除掉了多线程同步等带来的编程复杂性。同一时刻同一调度器中的协程只有一个会处于运行状态。

我们使用协程,程序只在用户空间内切换上下文,不再陷入内核来做线程切换,这样可以避免大量的用户空间和内核空间之间的数据拷贝,降低了CPU的消耗,从而大大减缓高并发场景时CPU瓶颈的窘境。

另外,使用协程,我们不再需要像异步编程时写那么一堆callback函数,代码结构不再支离破碎,整个代码逻辑上看上去和同步代码没什么区别,简单,易理解,优雅。

我们使用协程,我们可以很简单地实现一个可以随时中断随时恢复的函数。

一些 API 启动长时间运行的操作(例如网络 IO、文件 IO、CPU 或 GPU 密集型任务等),并要求调用者阻塞直到它们完成。协程提供了一种避免阻塞线程并用更廉价、更可控的操作替代线程阻塞的方法:协程挂起。

协程通过将复杂性放入库来简化异步编程。程序的逻辑可以在协程中顺序地表达,而底层库会为我们解决其异步性。该库可以将用户代码的相关部分包装为回调、订阅相关事件、在不同线程(甚至不同机器)上调度执行,而代码则保持如同顺序执行一样简单。

阻塞 vs 挂起

协程可以被挂起而无需阻塞线程。而线程阻塞的代价通常是昂贵的,尤其在高负载时,阻塞其中一个会导致一些重要的任务被延迟。

另外,协程挂起几乎是无代价的。不需要上下文切换或者 OS 的任何其他干预。

最重要的是,挂起可以在很大程度上由用户来控制,我们可以决定挂起时做些,并根据需求优化、记日志、拦截处理等。

协程的内部机制

基本原理

协程完全通过编译技术实现(不需要来自 VM 或 OS 端的支持),挂起机制是通过状态机来实现,其中的状态对应于挂起调用。

在挂起时,对应的协程状态与局部变量等一起被存储在编译器生成的类的字段中。在恢复该协程时,恢复局部变量并且状态机从挂起点接着后面的状态往后执行。

挂起的协程,是作为Continuation对象来存储和传递,Continuation中持有协程挂起状态与局部变量。

关于协程工作原理的更多细节可以在这个设计文档中找到。

标准 API

协程有三个主要组成部分:

  • 语言支持(即如上所述的挂起功能),
  • Kotlin 标准库中的底层核心 API,
  • 可以直接在用户代码中使用的高级 API。
  • 底层 API:kotlin.coroutines

底层 API 相对较小,并且除了创建更高级的库之外,不应该使用它。 它由两个主要包组成:

kotlin.coroutines.experimental 带有主要类型与下述原语:

  • createCoroutine()
  • startCoroutine()
  • suspendCoroutine()

kotlin.coroutines.experimental.intrinsics 带有甚至更底层的内在函数如 :

  • suspendCoroutineOrReturn()

大多数基于协程的应用程序级API都作为单独的库发布:kotlinx.coroutines。这个库主要包括下面几大模块:

  • 使用 kotlinx-coroutines-core 的平台无关异步编程
  • 基于 JDK 8 中的 CompletableFuture 的 API:kotlinx-coroutines-jdk8
  • 基于 JDK 7 及更高版本 API 的非阻塞 IO(NIO):kotlinx-coroutines-nio
  • 支持 Swing (kotlinx-coroutines-swing) 和 JavaFx (kotlinx-coroutines-javafx)
  • 支持 RxJava:kotlinx-coroutines-rx

这些库既作为使通用任务易用的便利的 API,也作为如何构建基于协程的库的端到端示例。关于这些 API 用法的更多细节可以参考相关文档。

本章小结

本章我通过大量实例学习了协程的用法;同时了解了作为轻量级线程的协程是怎样简化的我们的多线程并发编程的。我们看到协程通过挂起机制实现非阻塞的特性大大提升了我们并发性能。

最后,我们还简单介绍了协程的实现的原理以及标准API库。Kotlin的协程的实现大量地调用了Java中的多线程API。所以在Kotlin中,我们仍然完全可以使用Java中的多线程编程。

本章示例代码工程:https://github.com/EasyKotlin/chapter9_coroutines

image
EchoEcho官方
无论前方如何,请不要后悔与我相遇。
1377
发布数
439
关注者
2243896
累计阅读

热门教程文档

QT
33小节
C#
57小节
Djiango
17小节
Linux
51小节
Golang
23小节