Python异步初探-协程的定义
序言
由于很多文章对协程的介绍都很精简,在探讨Python异步编程之前,我们先来明确协程的概念,避免以后混淆了异步和协程的概念。
什么是协程
协程,又称微线程,英文名为 Coroutine。
协程(Coroutine)的概念提出的很早,在操作系统层面上,与它关联的是线程(Thread), 进程(Process)。从设计理念出发,协程、线程和进程都是为了更好的分配和利用CPU和内存资源。
在早期的操作系统中,进程 是程序执行的基本实体,随着支持多线程CPU的出现,程序执行的基本实体被 线程 所取代,但资源的分配单位仍然为进程。
一般而言,进程 是操作系统进行资源分配和调度的单位,线程 是操作系统进行CPU分配和调度的单位,而 协程 则隶属于 线程 ,由用户自主控制任务的调度, 而非依赖操作系统的抢占式机制。
+---------+ +---------+ +---------+ +---------+ +---------+
|Coroutine| |Coroutine| |Coroutine| |Coroutine| .......... |Coroutine|
+----+----+ +----+----+ +----+----+ +----+----+ +----+----+
| | | | |
+---------------------------+ +------+-----------------+
| |
+------v-------+ +-------v------+
| | | |
| Thread | | Thread |
| | | |
+------+-------+ +-------+------+
| |
+-----------------+-----------------+
|
|
+----------------------------v------------------------------+
| |
| Process |
| |
+-----------------------------------------------------------+
## 协程、线程、进程 之间的关系
协程的实现
由于协程的执行需要用户自动调度,因此实现协程的主要难题是恢复和挂起, 针对恢复和挂起的实现方式,协程的实现又可以区分为两类,分别是:
- 对称式协程(symmetric coroutine)
- 非对称式协程(asymmetric coroutine)
对称式协程, 是指所有协程之间是等价的,因此对称式协程之间允许互相交换(
transfer/switch
)控制权,拥有极大的自由,但该操作与goto
类似, 虽然能让程序跳转到任何地方, 但这样的代码却十分不好维护。
非对称式协程 认为协程之间存在父子关系, 协程只支持两类控制权转移原语, 分别是yield
和resume
。非对称协程通过yield
将控制权返回到其父协程(调用者),resume
则将控制权转移至 子协程(被调用者)
由于协程可以实现恢复和挂起,因此协程(Coroutine)的生命周期与通常的函数(Function)不大一样, 函数依循堆栈式的调用,进入函数->执行函数体->函数返回,这就是函数的生命周期。但是协程不一样,协程的执行可以挂起,让出CPU的控制权执行别处的代码,当协程被唤醒并完全运行结束后,协程才真正退出。
从函数和协程的生命周期可以看出,协程的恢复和挂起本质上就是 用户态的上下文切换,也就意味着需要实现的是调用堆栈的保存和恢复。
同时,对称式协程 和 非对称式协程 对于堆栈的保存和恢复的实现方案也有所差异,前者由于支持所有协程之间相互跳转,因此切换时需要对整个堆栈进行保存,切入时需要将整个堆栈进行恢复,同时有可能需要合并现有的堆栈与缓存堆栈的状态;后者由于只支持层级关系之间的跳转,只需要保存栈顶的堆栈和恢复栈顶的堆栈即可。
协程的官方实现介绍
在历史上, Python 首先是通过生成器(generator)
实现协程(coroutine)
的 单层 虚拟机栈帧的保存和复原,属于 非对称式协程。
import random
def factorial_generator(n):
v = n
while n > 0:
n -= 1
yield v*n
return v
def random_add(num_producer, factor=0.5):
_s = 0
while True:
try:
## resume a coroutine
xc = next(num_producer)
if random.random() >= factor:
_s += xc
except StopIteration as e:
## return value of coroutine
print(e.value)
break
return _s
def main():
result = random_add(factorial_generator(10))
print(result)
从协程调用的时序图可以看出,factorial_generator
和 random_add
在主函数中均只调用了一次,但程序的执行却是在 factorial_generator
和 random_add
之间穿插着进行,这就是非对称协程之间的控制权转移。
协程的社区实现介绍
虽然 CPython 能借助生成器实现非对称式协程,但Python的虚拟机栈帧本身并不是为了协程而设计的,语言层级上通过 yield
关键字最多只能保留一层的栈帧,也就是说在语言层级上并没有提供处理栈帧的挂起与恢复的机制,社区对该实现并不满意,因此从 Stackless Python 抽离出协程的实现,并命名为 greenlet。
greenlet实现的基本原理很简单, 为了简化栈帧的保存和恢复,其主要遵循以下几点规约:
- 每个线程均有线程内唯一的 main greenlet 对象
- 除了 main greenlet 以外, 任意 greenlet 均有唯一的 parent (greenlet对象)
- 创建 greenlet 对象时, 如不指定 parent, 默认指定当前在执行的协程为 parent, 且最外层的 parent 为 main greenlet
- greenlet 执行完毕后, 返回 parent 继续执行
- 使用 switch 切换至非子协程后, 需用户 手动switch 返回该协程或该协程的子协程, 该协程才继续执行
基于以上的规约, greenlet 解决了堆栈如何保存和复原的问题,其实现的原理可以简单描述为一下几步:
- 将 function 封装为 greenlet 对象, 并返回 greenlet 句柄, 用于指定特定的协程上下文状态。
- 使用 greenlet.switch(*args, **kwargs) 进行任意协程的切换
- 切出时保存当前协程的栈帧到堆内存
- 切入时, 重新将协程的栈帧推入到堆栈, 协程即重新运行
- 当目标协程未执行时, 传入参数作为函数的标准参数
- 当目标协程已执行但被挂起,协程被唤醒,且传入参数作为上次被挂起的语句的返回值
- 当目标协程已终止,递归至其父协程
- 协程执行完毕
- 标记协程状态为
DEAD
, switch 返回父协程继续执行 - 当父协程为 main greenlet 时, 协程执行链结束
- 标记协程状态为
协程的使用场景
由于协程的设计理念强调了由用户控制任务的执行和调度,因此协程为协同任务提供了一种运行时抽象,这种抽象非常适合于协同多任务调度和数据流处理。同时,因为用户态的上下文切换代价比内核态的上下文切换要小,协程也成为了一种轻量级的多任务模型。
1. 数据流处理
所谓的数据流处理,其实可以抽象成生产者/消费者
问题,这也是典型的多线程并发合作问题。如果使用多线程模型解决生产者/消费者问题,由于竞争的存在,往往需要使用锁或信号进行同步。如果使用协程解决该问题,由于协程是基于单线程模型
设计的,协程之间无竞争条件,反而是处于协助的关系。
from greenlet import greenlet
from collections import Counter
word_counter = Counter()
def FileReader(fh):
while 1:
## 从数据流中产生数据
line = fh.readline()
for word in lines.split():
## 推送给消费者
consumer_instance.switch(word)
def WordProcessor(word):
while 1:
## 消费数据
process(word)
word = producer_instance.switch()
def process(word):
word_counter[str(word).lower()]+=1
producer_instance = greenlet(FileReader)
consumer_instance = greenlet(WordProcessor)
with open("/dev/random", mode="rb") as fh:
producer_instance.switch(fh)
2. 实现尾递归优化,避免堆栈溢出
import types
def fibonaci(n):
"""fibonaci 尾递归优化"""
def fibonaci_core(n, n1=1, n2=1):
if n == 1:
yield n2
## Tips: 其实是由于Python解释器未实现尾递归优化, 才需要这样的膜法
yield fibonaci_core(n-1, n2, n1+n2)
g = fibonaci_core(n)
while isinstance(g, types.GeneratorType):
g = next(g)
return g
3. 多任务模型
目前主流的多任务模型的解决方案是使用多线程,由于线程之间的切换是由操作系统在内核态中完成,其切换开销比在用户态进行上下文操作要大。同时,CPython的实现上具有GIL, 全局解释器锁,导致CPython上的多线程实际上同时只运行单个线程,因此使用协程的开销比使用多线程要低,而对CPU的利用程度却几乎是一样的。
同时,由于协程是单线程运行的关系,如果使用协程执行CPU密集型的应用,那么协程特有的挂起和恢复就毫无用处,无法通过切换协程来换取更有效的CPU执行效率,这意味着协程的最佳使用场景是IO密集型的应用,典型的应用场景如解决网络IO问题。
from greenlet import greenlet
import socket
from urllib.parse import urlparse
#使用非阻塞io完成http请求
def get_url(url):
#通过socket请求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"
#建立socket连接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False) ## 这里设置成非阻塞
try:
client.connect((host, 80)) ## 阻塞不会消耗cpu
except BlockingIOError as e:
pass
while True:
try:
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
break
except OSError as e:
## 数据在发送, 主动让出cpu
scheduler_instance.switch()
continue
data = b""
while True:
try:
d = client.recv(1024)
except BlockingIOError as e:
## 数据未接收, 主动让出cpu
scheduler_instance.switch()
continue
if d:
data += d
else:
break
data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
results.append({url: html_data})
client.close()
def scheduler():
while not all([job.dead for job in jobs]):
for i, job in enumerate(jobs):
job.switch(urls[i])
scheduler_instance = greenlet(scheduler)
urls = ["http://www.baidu.com/", "http://www.qq.com/"]
jobs = [greenlet(get_url, scheduler_instance) for i in range(len(urls))]
results = []
scheduler_instance.switch()
总结: 协程的本质, 与异步编程的关系
在这篇文章里,我们首先介绍了协程的定义和具体的实现方案,从本质上来看,协程就是一套在用户态进行上下文切换的解决方案。
随后,我们介绍了协程的使用场景,通过对协程调度的合理安排,我们可以实现无竞争的生产者消费者模型以及复用函数堆栈避免堆栈溢出的问题,最后借助操作系统的支持,我们甚至可以实现并行的IO操作。
至于协程与异步编程的关系,这里明确告诉大家一点,协程本身并未解决异步编程的问题,而是提供了一套由用户主动分配CPU时钟的可行方案。
既然协程并不能实现异步编程,那为何异步编程与协程有着千丝万缕的关系呢?
其实答案很简单,虽然协程不是解决异步问题,但是可以通过合理的调度算法
和适当的编程规约
,从而实现异步编程。
至于协程与异步编程的关系,有请下回分解 😃