平凡之路——asyncio 的演进

如今,地球上最发达、规模最庞大的计算机程序,莫过于因特网。而从 CPU 的时间观中可知,网络 I/O 是最大的 I/O 瓶颈,除了宕机没有比它更慢的。所以,诸多异步框架都对准的是网络 I/O。

我们从一个爬虫例子说起,从因特网上下载 10 篇网页。

同步阻塞方式

最容易想到的解决方案就是依次下载,从建立 socket 连接到发送网络请求再到读取响应数据,顺序进行。

#!/usr/bin/env python
# encoding: utf-8

import socket
import time
from concurrent import futures


def blocking_way():
"""
建立 socket 连接,发送HTTP请求,然后从 socket读取HTTP响应并返回数据。示例中我们请求 example.com 的首页。
"""
sock = socket.socket()
# 以blocking的方式向指定网址80端口发送网络连接请求
sock.connect(('example.com', 80))
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
sock.send(request.encode('ascii'))
response = b''
# 从socket上读取4K字节数据
chunk = sock.recv(4096)
while chunk:
response += chunk
# blocking
chunk = sock.recv(4096)
return response


def sync_way():
"""
执行10次,返回结果
"""
res = []
for _ in range(10):
res.append(blocking_way())
return len(res)

注:总体耗时约为 4.5 秒。(因网络波动每次测试结果有所变动,本文取多次平均值)

我们知道,创建网络连接,多久能创建完成不是客户端决定的,而是由网络状况和服务端处理能力共同决定。服务端什么时候返回了响应数据并被客户端接收到可供程序读取,也是不可预测的。所以 sock.connect()和 sock.recv()这两个调用在默认情况下是阻塞的。

注:sock.send()函数并不会阻塞太久,它只负责将请求数据拷贝到 TCP/IP 协议栈的系统缓冲区中就返回,并不等待服务端返回的应答确认。

假设网络环境很差,创建网络连接需要 1 秒钟,那么sock.connect()就得阻塞 1 秒钟,等待网络连接成功。这 1 秒钟对一颗 2.6GHz 的 CPU 来讲,仿佛过去了 83 年,然而它不能干任何事情。sock.recv()也是一样的,必须得等到服务端的响应数据已经被客户端接收。我们下载 10 篇网页,这个阻塞过程就得重复 10 次。如果一个爬虫系统每天要下载 1000 万篇网页呢?!

上面说了这么多,我们力图引出一个问题:同步阻塞的网络交互方式,效率低十分低下。特别是在网络交互频繁的程序中。这种方式根本不可能挑战 C10K/C10M。

改进方式:多进程

在一个程序内,依次执行 10 次太耗时,那开 10 个一样的程序同时执行不就行了。于是我们想到了多进程编程。为什么会先想到多进程呢?发展脉络如此。在更早的操作系统(Linux 2.4)及其以前,进程是 OS 调度任务的实体,是面向进程设计的 OS。

def process_way():
"""
进程池方式
:return:
"""
workers = 10
with futures.ProcessPoolExecutor(workers) as executor:
futs = {executor.submit(blocking_way) for _ in range(try_count)}
return len([fut.result() for fut in futs])

注:总体耗时约为 0.6 秒。

改善效果立竿见影。但仍然有问题。总体耗时并没有缩减到原来的十分之一,而是九分之一左右,还有一些时间耗到哪里去了——进程切换开销

# count = 10
(10, 'sync_way', 0.3614327907562256)
(10, 'thread_way', 0.05895590782165527)
(10, 'process_way', 0.6955974102020264)
# count = 80
(80, 'sync_way', 5.538213014602661)
(80, 'thread_way', 1.2612965106964111)
(80, 'process_way', 1.0214107036590576)

注意
经本人实测,当只尝试 10 次连接时,进程池并没有达到所说的缩短时间的目的。当连接请求次数够多时,我们才能看到进程池的效果符合预期。

进程切换开销不止像“CPU 的时间观”所列的“上下文切换”那么低。CPU 从一个进程切换到另一个进程,需要把旧进程运行时的寄存器状态、内存状态全部保存好,再将另一个进程之前保存的数据恢复。对 CPU 来讲,几个小时就干等着。当进程数量大于 CPU 核心数量时,进程切换是必然需要的。

除了切换开销,多进程还有另外的缺点。一般的服务器在能够稳定运行的前提下,可以同时处理的进程数在数十个到数百个规模。如果进程数量规模更大,系统运行将不稳定,而且可用内存资源往往也会不足。

多进程解决方案在面临每天需要成百上千万次下载任务的爬虫系统,或者需要同时搞定数万并发的电商系统来说,并不适合。

除了切换开销大,以及可支持的任务规模小之外,多进程还有其他缺点,如状态共享等问题,后文会有提及,此处不再细究。

继续改进:多线程

由于线程的数据结构比进程更轻量级,同一个进程可以容纳多个线程,从进程到线程的优化由此展开。后来的 OS 也把调度单位由进程转为线程,进程只作为线程的容器,用于管理进程所需的资源。而且 OS 级别的线程是可以被分配到不同的 CPU 核心同时运行的。

def thread_way():
"""
线程池方式
"""
workers = 10
with futures.ThreadPoolExecutor(workers) as executor:
futs = {executor.submit(blocking_way) for _ in range(try_count)}
return len([fut.result() for fut in futs])

注:总体运行时间约 0.43 秒。

结果符合预期,比多进程耗时要少些。从运行时间上看,多线程似乎已经解决了切换开销大的问题。而且可支持的任务数量规模,也变成了数百个到数千个。

但是,多线程仍有问题,特别是 Python 里的多线程。首先,Python 中的多线程因为 GIL 的存在,它们并不能利用 CPU 多核优势,一个 Python 进程中,只允许有一个线程处于运行状态。那为什么结果还是如预期,耗时缩减到了十分之一?

因为在做阻塞的系统调用时,例如sock.connect()sock.recv()时,当前线程会释放 GIL,让别的线程有机会执行。但是单个线程内,在调用上还是阻塞的。

Python 中 time.sleep 是阻塞的,都知道使用它要谨慎,但在多线程编程中,time.sleep 并不会阻塞其他线程。

除了 GIL 之外,所有的多线程还有通病。它们是被 OS 调度,调度策略是抢占式的,以保证同等优先级的线程都有均等的执行机会,那带来的问题是:并不知道下一时刻是哪个线程被运行,也不知道它正要执行的代码是什么。所以就可能存在竞态条件

例如爬虫工作线程从任务队列拿待抓取 URL 的时候,如果多个爬虫线程同时来取,那这个任务到底该给谁?那就需要用到“锁”或“同步队列”来保证下载任务不会被重复执行。

而且线程支持的多任务规模,在数百到数千的数量规模。在大规模的高频网络交互系统中,仍然有些吃力。当然,多线程最主要的问题还是竞态条件

线程池与多线程的区别

  • 线程池是在程序运行开始,创建好的 n 个线程,并且这 n 个线程挂起等待任务的到来。而多线程是在任务到来得时候进行创建,然后执行任务。
  • 线程池中的线程执行完之后不会回收线程,会继续将线程放在等待队列中;多线程程序在每次任务完成之后会回收该线程。
  • 由于线程池中线程是创建好的,所以在效率上相对于多线程会高很多。
  • 线程池也在高并发的情况下有着较好的性能;不容易挂掉。多线程在创建线程数较多的情况下,很容易挂掉。

非阻塞方式

终于,我们来到了非阻塞解决方案。先来看看最原始的非阻塞如何工作的。

def non_blocking_way():
sock = socket.socket()
# https://docs.python.org/zh-cn/3/library/socket.html#socket.socket.setblocking
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
# 非阻塞连接过程中也会抛出异常
pass
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
data = request.encode('ascii')
# 不知道socket何时就绪,所以不断尝试发送
while True:
try:
sock.send(data)
# 直到send不抛异常,则发送完成
break
except OSError:
pass

response = b''
while True:
try:
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
break
except OSError:
pass

return response

注:总体耗时约 4.3 秒。

首先注意到两点,就感觉被骗了。一是耗时与同步阻塞相当,二是代码更复杂。坑爹呢?要非阻塞何用!且慢。

在我的实验中,非阻塞消耗明显大于阻塞消耗。

blocking_way (10, 'sync_way', 0.1429271697998047)
non_blocking_way (10, 'sync_way', 8.619038343429565)

上图第 9 行代码sock.setblocking(False)告诉 OS,让 socket 上阻塞调用都改为非阻塞的方式。之前我们说到,非阻塞就是在做一件事的时候,不阻碍调用它的程序做别的事情。上述代码在执行完 sock.connect()sock.recv() 后的确不再阻塞,可以继续往下执行请求准备的代码或者是执行下一次读取。

代码变得更复杂也是上述原因所致。第 11 行要放在try语句内,是因为socket在发送非阻塞连接请求过程中,系统底层也会抛出异常。connect()被调用之后,立即可以往下执行第 15 和 16 行的代码。

需要 while 循环不断尝试 send(),是因为connect()已经非阻塞,在send()之时并不知道 socket 的连接是否就绪,只有不断尝试,尝试成功为止,即发送数据成功了。recv()调用也是同理。

虽然 connect() 和 recv() 不再阻塞主程序,空出来的时间段 CPU 没有空闲着,但并没有利用好这空闲去做其他有意义的事情,而是在循环尝试读写 socket (不停判断非阻塞调用的状态是否就绪)。还得处理来自底层的可忽略的异常。也不能同时处理多个 socket 。

然后 10 次下载任务仍然按序进行。所以总体执行时间和同步阻塞相当。如果非得这样子,那还不如同步阻塞算了。

非阻塞改进

epoll

判断非阻塞调用是否就绪如果 OS 能做,是不是应用程序就可以不用自己去等待和判断了,就可以利用这个空闲去做其他事情以提高效率。

所以OS 将 I/O 状态的变化都封装成了事件,如可读事件、可写事件。并且提供了专门的系统模块让应用程序可以接收事件通知。这个模块就是select。让应用程序可以通过select注册文件描述符和回调函数。当文件描述符的状态发生变化时,select 就调用事先注册的回调函数。

select因其算法效率比较低,后来改进成了poll,再后来又有进一步改进,BSD 内核改进成了kqueue模块,而 Linux 内核改进成了epoll模块。这四个模块的作用都相同,暴露给程序员使用的 API 也几乎一致,区别在于kqueueepoll 在处理大量文件描述符时效率更高。

鉴于 Linux 服务器的普遍性,以及为了追求更高效率,所以我们常常听闻被探讨的模块都是 epoll

回调(Callback)

把 I/O 事件的等待和监听任务交给了 OS,那 OS 在知道 I/O 状态发生改变后(例如 socket 连接已建立成功可发送数据),它又怎么知道接下来该干嘛呢?只能回调

需要我们将发送数据与读取数据封装成独立的函数,让epoll代替应用程序监听socket状态时,得告诉epoll:“如果socket状态变为可以往里写数据(连接建立成功了),请调用 HTTP 请求发送函数。如果socket 变为可以读数据了(客户端已收到响应),请调用响应处理函数。”

于是我们利用epoll结合回调机制重构爬虫代码:

pass

此处和前面稍有不同的是,我们将下载不同的 10 个页面,相对 URL 路径存放于 urls_todo 集合中。现在看看改进在哪。

首先,不断尝试send()recv() 的两个循环被消灭掉了。

其次,导入了selectors模块,并创建了一个DefaultSelector实例。Python 标准库提供的selectors模块是对底层select/poll/epoll/kqueue的封装。DefaultSelector类会根据 OS 环境自动选择最佳的模块,那在 Linux 2.5.44 及更新的版本上都是epoll了。

然后,在第 25 行和第 31 行分别注册了socket可写事件(EVENT_WRITE)和可读事件(EVENT_READ)发生后应该采取的回调函数。

虽然代码结构清晰了,阻塞操作也交给 OS 去等待和通知了,但是,我们要抓取 10 个不同页面,就得创建 10 个Crawler实例,就有 20 个事件将要发生,那如何从selector里获取当前正发生的事件,并且得到对应的回调函数去执行呢?

事件循环(Event Loop)

为了解决上述问题,那我们只得采用老办法,写一个循环,去访问 selector 模块,等待它告诉我们当前是哪个事件发生了,应该对应哪个回调。这个等待事件通知的循环,称之为事件循环

pass

上述代码中,我们用stopped全局变量控制事件循环何时停止。当urls_todo消耗完毕后,会标记stoppedTrue

重要的是第 49 行代码,selector.select() 是一个阻塞调用,因为如果事件不发生,那应用程序就没事件可处理,所以就干脆阻塞在这里等待事件发生。那可以推断,如果只下载一篇网页,一定要connect()之后才能send()继而recv(),那它的效率和阻塞的方式是一样的。因为不在connect()/recv()上阻塞,也得在select()上阻塞。

所以,selector 机制(后文以此称呼代指 epoll/kqueue)是设计用来解决大量并发连接的。当系统中有大量非阻塞调用,能随时产生事件的时候,selector机制才能发挥最大的威力。

下面是如何启创建 10 个下载任务和启动事件循环的:

pass

注:总体耗时约 0.45 秒。

上述执行结果令人振奋。在单线程内用 事件循环+回调 搞定了 10 篇网页同时下载的问题。这,已经是异步编程了。虽然有一个 for 循环顺序地创建 Crawler 实例并调用fetch 方法,但是fetch 内仅有connect()和注册可写事件,而且从执行时间明显可以推断,多个下载任务确实在同时进行!

上述代码异步执行的过程:

  1. 创建Crawler 实例;
  2. 调用fetch方法,会创建socket连接和在selector上注册可写事件;
  3. fetch 内并无阻塞操作,该方法立即返回;
  4. 重复上述 3 个步骤,将 10 个不同的下载任务都加入事件循环;
  5. 启动事件循环,进入第 1 轮循环,阻塞在事件监听上;
  6. 当某个下载任务EVENT_WRITE被触发,回调其connected方法,第一轮事件循环结束;
  7. 进入第 2 轮事件循环,当某个下载任务有事件触发,执行其回调函数;此时已经不能推测是哪个事件发生,因为有可能是上次connected里的EVENT_READ先被触发,也可能是其他某个任务的EVENT_WRITE被触发;(此时,原来在一个下载任务上会阻塞的那段时间被利用起来执行另一个下载任务了
  8. 循环往复,直至所有下载任务被处理完成
  9. 退出事件循环,结束整个下载程序

3.5.4 总结

目前为止,我们已经从同步阻塞学习到了异步非阻塞。掌握了在单线程内同时并发执行多个网络 I/O 阻塞型任务的黑魔法。而且与多线程相比,连线程切换都没有了,执行回调函数是函数调用开销,在线程的栈内完成,因此性能也更好,单机支持的任务规模也变成了数万到数十万个。(不过我们知道:没有免费午餐,也没有银弹。)

部分编程语言中,对异步编程的支持就止步于此(不含语言官方之外的扩展)。需要程序猿直接使用 epoll 去注册事件和回调、维护一个事件循环,然后大多数时间都花在设计回调函数上。

通过本节的学习,我们应该认识到,不论什么编程语言,但凡要做异步编程,上述的“事件循环+回调”这种模式是逃不掉的,尽管它可能用的不是epoll,也可能不是while循环。如果你找到了一种不属于 “等会儿告诉你” 模型的异步方式,请立即给我打电话(注意,打电话是 Call)。

为什么我们在某些异步编程中并没有看到 CallBack 模式呢?这就是我们接下来要探讨的问题。本节是学习异步编程的一个终点,也是另一个起点。毕竟咱们讲 Python 异步编程,还没提到其主角协程的用武之地。