Python异步编程-探讨协程实现异步的细节

shabbywu大约 12 分钟python

序言

在上回 Python异步初探-协程的定义 里,我们探讨了协程的本质,明确了协程本身并未解决异步编程的问题,而是提供了一套由用户主动分配CPU时钟的可行方案。 接下来,本文将会从任务调度入手,探讨 Python 实现协程异步编程的细节。

多线程的任务调度

每当我们需要进行异步编程,首先想到的必然是多线程模型,毕竟只需要几行代码,就能启动线程执行异步任务。

import threading
import time


def task(task_id):
    start_clock = time.process_time()
    print(f"task<{task_id}> start")
    ## 模拟阻塞
    time.sleep(10)
    print(f"task<{task_id}> end, cost cpu time: {time.process_time() - start_clock}")

for i in range(5):
    threading.Thread(target=task, args=(i, )).start()

在探讨协程调度之前, 我们先以上面的多线程异步编程的样例,分析多线程任务的执行顺序。

操作系统调度

当启动一个线程时,线程并非立即执行,而是在等待操作系统的资源调度。当线程被分配到CPU时钟时,线程才真正开始执行。

CPU能执行那个线程,是通过操作系统设置的调度策略而决定的,如果感兴趣可以阅读操作系统相关的书籍,这里就不展开了。

继续讨论回我们上面给的样例,如果我们重复执行几次这个样例, 就会发现, 多线程任务的调度策略和启动顺序是无关的,在重复执行多次还能发现,每次任务调度的顺序也是无关的。

## 第一次执行的输出结果
task<0> start
task<1> start
task<2> start
task<3> start
task<4> start
### 这里等待了 10 秒
task<0> end, cost cpu time: 0.000682000000000002
task<4> end, cost cpu time: 0.0003599999999999992
task<3> end, cost cpu time: 0.0005119999999999986
task<2> end, cost cpu time: 0.0005790000000000031
task<1> end, cost cpu time: 0.0006910000000000041

## 第二次执行的输出结果
task<0> start
task<1> start
task<2> start
task<3> start
task<4> start
### 这里等待了 10 秒
task<0> end, cost cpu time: 0.0005179999999999976
task<2> end, cost cpu time: 0.0005100000000000035
task<3> end, cost cpu time: 0.00044700000000000295
task<1> end, cost cpu time: 0.0006960000000000022
task<4> end, cost cpu time: 0.0003809999999999994

由于多线程的任务调度是由操作系统底层的调度算法决定的,这导致用户无法感知和控制各线程被分配的CPU时间片大小以及执行顺序,而协程恰好就提供了一套由用户主动分配CPU时钟的可行方案。
换句话说, 如果我们能合理地控制协程的执行顺序, 那是不是就等同于实现了一个基于协程的异步框架呢?!

协程的任务调度

任务队列-控制线程任务执行顺序的方案

所谓的控制协程执行顺序,实际上就是要在用户态中模拟操作系统针对线程的调度算法,最直观的实现方案就是用优先队列将异步任务缓冲起来,再由用户程序按任务优先级排序,调度器只需依序执行即可。这种类型的异步任务执行模型如下:

虽然这个模型在设计上满足需求的,但是在编程实现时又引入了新问题: 何时创建协程执行异步任务?
如果是在创建异步任务时,马上创建新的协程执行任务,那么这个模型和原来的原地创建协程并未任何本质差异,因此不可能同时创建任务和创建协程。
通过分析这个模型可以发现,这是典型的生产者消费者模式,只要生产者(推送任务到队列)和消费者(创建新协程执行任务)分别在两个不同的协程运行,那就可以“避免”同时创建任务和创建协程的问题了,调整后的模型如下:

对应的实现代码也很简单:

def consumer(queue):
    while True:
        task, args, kwargs = queue.get()
        task.run(*args, **kwargs)
        yield


def producer(queue):
    while True:
        queue.put(task, args, kwargs)
        yield

通过将原来的异步任务执行模型改造成上图所示的生产者消费者模式后,看似解决了创建协程的问题,但是再仔细分析新的模型,可以发现: 如果消费者执行完一个任务就马上让出CPU让生产者创建任务,那么本质上与同时创建和执行协程没有任何区别。

虽然引入队列解决了任务执行优先级的问题,但是并未解决调度协程的时机,我们不妨从竞品(线程)中寻找实现调度算法的灵感,了解业界做法总不会错

调度算法的核心职责

首先观察一下线程的生命周期,一个线程的生命周期可能会在5个状态中轮转, 分别是创建,就绪,运行,阻塞,终止,这几种关系的状态转移关系入下图所示:

协程调度算法的本质其实就是解决一个问题:如何调度协程的执行?如果从任务(协程)的角度看待这个问题,可以将其拆解为以下几点:

  • 何时让出 CPU?
  • CPU 让给了谁?
  • 何时继续执行?

何时让出 CPU?-- 控制权转移的时机

协程的最佳使用场景是IO密集型的应用,当需要等待 IO 操作执行时,协程可以主动让出 CPU 的使用权,避免浪费 CPU 资源。因此,协程应当在需要等待其他资源执行的地方主动让出 CPU,用伪代码描述如下:

def co():
    result = yield one_blocking()
    handle(result)
    result = yield the_other_blocking()
    handle(result)

CPU 让给了谁?-- 任务的调度器

在上回 Python异步初探-协程的定义 里,我们讲解了非对称式协程的执行顺序:非对称式协程通过 send 原语将控制权转移至子协程(被调用者),再通过 yield 原语将控制权返回到其父协程(调用者)。
因此,协程的异步系统需要具备一个调度器负责接收子协程让渡出去的控制权,再从任务队列中挑选合适的协程继续执行。与此同时,这个调度器还需要保证只有当需要等待的 IO 操作执行完毕后,才继续执行上一个协程,用伪代码描述如下:

def scheduler():
    while True:
        ## 获取协程任务列表
        cos = list(task_queue)
        for co in cos:
            ret = co.send()
            if ret is some_blocking:
                ret.callback = co
                task_queue.put(ret)
            elif hasattr(co, "callback"):
                task_queue.put(co.callback)
            else:
                task_queue.pop(co)

调度器既是生产者, 也是消费者。

何时继续执行? -- 其一: 基于回调的事件处理机制

对于多线程而言,由于任务调度是委托至操作系统完成的,程序无需关心线程被阻塞后的执行情况。
但是对于协程调度系统而言,如果其中一个协程被阻塞,就意味着整个系统都被阻塞,只有当阻塞操作完成后,协程系统才会继续运行下去。 阻塞对整个协程系统是毁灭性的,所有协程都应该只允许使用非阻塞 IO 操作,同时在调用 IO 操作后,需要马上让渡出 CPU 的使用权给调度器,当调度器判断 IO操作完成后,再继续执行该协程。 单个协程的调度流程可以参考以下的时序图:

对于网络IO、磁盘IO等调用,操作系统均提供了异步读写的系统调用(system call)。一般而言,异步 IO 调用会返回文件描述符(句柄),用户程序只需要调用操作系统提供的 API,就可以知道 IO 操作是否完成。
简而言之,只需要在调度器中增加判断 IO 操作是否执行完毕的操作即可,用伪代码描述如下:

def scheduler():
    while True:
        ## 获取协程任务列表
        cos = list(task_queue)
        for co in cos:
            ret = co.send()
            if ret is some_blocking:
                ret.callback = co
                task_queue.put(ret)
            elif ret is io_operation:
                ret.callback = co
                register_io_operation(ret)
            elif hasattr(co, "callback"):
                task_queue.put(co.callback)
            else:
                task_queue.pop(co)
        
        ## 获取已完成的 IO 操作
        ops = get_completed_io_operations()
        for op in ops:
            task_queue.put(op.callback)

何时继续执行? -- 其二: 处理与操作系统无关的阻塞

除了由于IO导致的阻塞以外, 很多时候我们只希望等待几秒钟再执行, 例如在标准库 asyncio 中提供了以下的语法。

import time
import asyncio

async def co():
    start = time.time()
    print("It's going to sleep 1s.")
    await asyncio.sleep(1)
    end = time.time()
    print(f"{end - start}s passed.")


if __name__ == '__main__':
    ## 输出: 
    ## It's going to sleep 1s.
    ## 1.0028209686279297s passed.
    asyncio.run(co())

上述例子中关键的代码是: await asyncio.sleep(1),其所描述的含义是: 等待1秒, 随后继续执行。 参考对于 io_operation 的处理, 这类型的阻塞可以抽象成 timeout_operation, 当满足继续执行的条件后, 再将相应的操作注册会任务队列之中,用伪代码描述如下:

def scheduler():
    ...
    ret = co.send()
    if ret is timeout_operation:
        ret.callback = co
        register_timeout_operation(ret)
    
    ...

    ops = get_timeout_operations()
    for op in ops:
        task_queue.put(op.callback)
    ...

至此, 我们已经具备实现一个简易的基于协程的异步系统的基本理论知识, 但仍然有一点是我们尚未讨论的, 那就是如何解决阻塞任务的返回值传递问题

除了调度之外, 你还需要了解的一些细节

非对称式协程 中, send 原语不仅仅是控制权转移至子协程(被调用者),还充当了参数传递的作用。而 yield 原语也不仅仅是将控制权返回到其父协程(调用者),还可以将返回值带回父协程,参考以下的代码:

def coroutine():
    arg = yield 1
    print("参数: ", arg)

if __name__ == "__main__":
    ## 输出:
    ## 返回值:  1
    ## 参数:  2
    co = coroutine()
    print("返回值: ", co.send(None))
    co.send(2)

将该逻辑整合至单个协程的调度流程时序图:

我们可以发现, 对于返回值的处理是属于调度器的任务, 因此想要解决阻塞任务的返回值传递问题,只需要调度器在处理回调时,将对应的返回值一并带上即可,用伪代码描述如下:

提示

以下代码仅在于描述核心逻辑。 在实现异步系统返回值处理时, 常见的机制为: Promise/Future。

def scheduler():
    while True:
        cos = list(task_queue)
        for co, args in cos:
            ret = co.send(args)
            if ret is some_blocking:
                ret.callback = co
                task_queue.put(ret)
            elif ret is io_operation:
                ret.callback = co
                register_io_operation(ret)
            elif ret is timeout_operation:
                ret.callback = co
                register_timeout_operation(ret)
            elif hasattr(co, "callback"):
                task_queue.put(co.callback)
            else:
                task_queue.pop(co)
        
        ops = get_completed_io_operations()
        for op, result in ops:
            task_queue.put((op.callback, result))

        ops = get_timeout_operations()
        for op in ops:
            task_queue.put((op.callback, None))

总结: 协程异步系统的基本要素

在这篇文章中, 我们首先从异步编程这一话题展开, 通过分析操作系统调度多线程的原理, 引申出实现协程异步编程无非就是实现控制协程执行顺序是调度算法。
随后, 我们分别从任务执行顺序任务生命周期两个维度探讨应该如何实现协程任务调度系统, 总结出实现协程任务调度系统所需的基本要素为以下几点:

  • 任务队列
  • 操作系统IO回调处理机制
  • 延迟执行机制
  • 返回值传递机制

协程异步系统依赖于操作系统提供的异步IO回调(system call), 因此同一门编程语言的协程异步系统, 在不同的操作系统上可能会提供不同的实现方式。同时,由于协程异步系统的本质是在任务(协程)的调度算法, 因此不同的编程语言也有可能实现不同的调度算法。
如果想了解Python, JavaScript 和 Golang 在协程实现上有哪些差异, 不妨继续关注我未来可能会更新的文章。