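This article explains coroutines mostly through Python code. If you do not yet know what a coroutine is, read https://www.zhihu.com/question/294188439/answer/555273313 first.
It also serves as my notes on the talk Fear and Awaiting in Async: A Savage Journey to the Heart of the Coroutine Dream.
Let's start with classic socket programming. A traditional single-threaded echo server can only handle one connection at a time: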
from socket import *

def echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
    sock.bind(address)
    sock.listen(1)
    while True:
        client, addr = sock.accept()
        echo_handler(client, addr)

def echo_handler(client, addr):
    print('Connection from ', addr)
    with client:
        while True:
            data = client.recv(100000)
            if not data:
                break
            client.sendall(data)
    print('Connection closed')

if __name__ == '__main__':
    echo_server(('', 25000))
That clearly does not meet our needs. For lightweight workloads, threads are a reasonable fix:
from threading import Thread

def threaded_echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
    sock.bind(address)
    sock.listen(1)
    while True:
        client, addr = sock.accept()
        Thread(target=echo_handler, args=(client, addr)).start()
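However, because of CPython's GIL, threads cannot run Python code in parallel across multiple cores. Processes are therefore another option, although switching between processes on a single core carries extra context-switch overhead: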
from multiprocessing import Process

def process_echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
    sock.bind(address)
    sock.listen(1)
    while True:
        client, addr = sock.accept()
        Process(target=echo_handler, args=(client, addr)).start()
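If you are willing to try a different programming model, a callback-based approach is a good solution. Twisted is an event-driven networking framework written in Python, and it can hand blocking work off to a thread pool. We implement the callback by subclassing protocol.Protocol and overriding its dataReceived method: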
from twisted.internet import reactor, protocol

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)

def callback_main(port):
    factory = protocol.ServerFactory()
    factory.protocol = Echo
    reactor.listenTCP(port, factory)
    reactor.run()
Of course, if you would rather not depend on Twisted, the standard library's asyncio.Protocol supports a similar approach:
import asyncio

class AsyncEcho(asyncio.Protocol):
    def connection_made(self, transport) -> None:
        self.transport = transport

    def connection_lost(self, exc) -> None:
        self.transport = None

    def data_received(self, data: bytes) -> None:
        self.transport.write(data)

def async_main(port):
    # new_event_loop() avoids relying on an implicitly created loop,
    # which asyncio.get_event_loop() no longer provides on recent Python versions
    loop = asyncio.new_event_loop()
    coro = loop.create_server(AsyncEcho, '', port)
    srv = loop.run_until_complete(coro)
    loop.run_forever()
In fact, we can take the originally single-threaded echo_server and add the async/await keywords introduced in Python 3.5 to give it coroutine behavior:
import asyncio, socket

async def handle_client(client):
    with client:
        while True:
            data = await loop.sock_recv(client, 255)
            if not data:  # empty bytes: the peer closed the connection
                break
            request = data.decode('utf8')
            response = str(request)
            await loop.sock_sendall(client, response.encode('utf8'))

async def run_server():
    while True:
        client, _ = await loop.sock_accept(server)
        loop.create_task(handle_client(client))

if __name__ == '__main__':
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('localhost', 15555))
    server.listen(8)
    server.setblocking(False)
    loop = asyncio.new_event_loop()
    loop.run_until_complete(run_server())
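For comparison, the same echo logic can also be written with asyncio's higher-level streams API. The following is only a minimal sketch and not part of the original talk: the names handle and demo are made up for illustration, and the server binds to port 0 so that the operating system picks a free port.

```python
import asyncio

async def handle(reader, writer):
    # Echo everything back until the client closes its side
    while True:
        data = await reader.read(255)
        if not data:
            break
        writer.write(data)
        await writer.drain()
    writer.close()

async def demo():
    # Port 0 asks the OS for any free port (an assumption for this demo)
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(b'ping')
    await writer.drain()
    reply = await reader.readexactly(4)  # wait for the 4 echoed bytes

    writer.close()
    await writer.wait_closed()
    server.close()
    return reply

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Here the stream reader and writer hide the raw socket calls, so the handler reads much like the blocking echo_handler from the first example.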
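- Why use coroutines?
Compared with threads (or processes), coroutine-based concurrency largely frees you from guarding critical sections, because the coroutines in one event loop all run on a single thread and only switch at explicit await points. You also avoid the cost of thread (or process) context switches.
- Why is switching between coroutines cheaper?
For processes and threads alike, every block or switch traps into the kernel through a system call, and the operating system's scheduler decides which process (thread) runs next. To the operating system, a coroutine program is just a single thread, so in principle the entire scheduling policy can be decided in user code.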
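To make "scheduling decided in user code" concrete, here is a minimal sketch of a round-robin scheduler that drives coroutines by hand. The names Switch, worker and run_all are invented for this illustration; a real event loop such as asyncio's is far more elaborate.

```python
from collections import deque

class Switch:
    """Awaitable that suspends the current coroutine exactly once."""
    def __await__(self):
        yield  # hand control back to whoever called send()

async def worker(name, steps, log):
    for i in range(steps):
        log.append(f'{name}:{i}')
        await Switch()  # voluntary switch point

def run_all(coros):
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)   # resume until the next await Switch()
        except StopIteration:
            continue          # this coroutine has finished
        ready.append(coro)    # round-robin: go to the back of the queue

log = []
run_all([worker('a', 2, log), worker('b', 3, log)])
print(log)  # ['a:0', 'b:0', 'a:1', 'b:1', 'b:2']
```

Every switch here is an ordinary Python method call on a single thread, with no kernel involvement, and the policy (round-robin) is whatever run_all chooses to implement.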
A systematic introduction to async/await
Before Python 3.5, we were already familiar with ordinary functions and generator functions:
def function():
    return 1

def generator():
    yield 1
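Putting the async modifier in front of def turns these into an asynchronous function (coroutine) and an asynchronous generator (the latter requires Python 3.6+). Calling them produces objects of type types.CoroutineType and types.AsyncGeneratorType respectively: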
async def async_function():
    return 1

async def async_generator():
    yield 1
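As a quick check of those two types, the following sketch calls both functions and inspects the results with isinstance. The variable names are chosen only for illustration.

```python
import types

async def async_function():
    return 1

async def async_generator():
    yield 1

coro = async_function()
agen = async_generator()

is_coro = isinstance(coro, types.CoroutineType)
is_agen = isinstance(agen, types.AsyncGeneratorType)
print(is_coro, is_agen)  # True True

coro.close()  # close the unused coroutine to avoid a "never awaited" warning
```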
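Calling an async function directly returns a coroutine object. Its send method passes a value into the coroutine and, just as with generators, the first value sent must be None (as discussed later, async is roughly Future + Generator):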
>>> coro = async_function()
>>> coro.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 1
However, calling send directly raises StopIteration. This is how a coroutine exits normally; we can catch the exception with try-except and return its value attribute:
def run(coro):
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value
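A coroutine function can suspend itself with await and wait for the awaited coroutine to finish and return its result.
As the following example shows, an async function can call other async functions through await: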
async def async_function():
    return 1

async def await_coroutine():
    result = await async_function()  # the awaited object must be awaitable, e.g. implement __await__
    print(result)

if __name__ == '__main__':
    run(await_coroutine())
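The comment about `__await__` can be illustrated with a hand-written awaitable. This is a minimal sketch: the class name Ready and the function add_one are hypothetical, and run is the same helper as above, repeated so the block is self-contained.

```python
class Ready:
    """An awaitable that completes immediately with a fixed value."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # __await__ must return an iterator. The unreachable yield below turns
        # this method into a generator function; its return value becomes
        # the result of the await expression.
        return self.value
        yield

async def add_one():
    return (await Ready(41)) + 1

def run(coro):
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value

result = run(add_one())
print(result)  # 42
```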
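For functions, async can go in front of almost any def, including ordinary functions, generator functions, and a class's class methods, static methods and instance methods, marking it as an async function or async generator. The exceptions are:
Some magic methods of an ordinary class cannot be marked async, such as __init__ and __del__:
class TestMagic(object):
    async def __init__(self):
        pass

    async def __del__(self):
        pass

if __name__ == '__main__':
    a = TestMagic()  # TypeError: __init__() should return None, not 'coroutine'
This is easy to understand: these magic methods are tightly bound to how Python manages instances. Python requires __init__ to return None, and the coroutine object returned by an async __del__ would simply never be awaited.
(Not recommended) Some magic methods do still accept async; most of them are getter-like (__getitem__, __getattr__):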
class TestMagic(object):
    def __init__(self):
        self.prop = 1

    @property
    async def value(self):
        return self.prop

    async def __getitem__(self, item):
        print('getitem: ', item)

    async def __getattr__(self, item):
        print('getattr: ', item)

async def main():
    a = TestMagic()
    print(await a['hello'])
    print(await a.value)

if __name__ == '__main__':
    run(main())
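This style is strongly discouraged, though. For @property in particular, asking for a value and getting back a coroutine object is semantically counterintuitive.
async allows plenty of other nasty tricks. For example, the following implements asynchronous object initialization: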
class AsyncInit(type):
    async def __call__(cls, *args, **kwargs):
        self = cls.__new__(cls, *args, **kwargs)
        await self.__init__(*args, **kwargs)
        return self

class TestMagic(metaclass=AsyncInit):
    async def __init__(self):
        print('I got inited!')

async def main():
    a = await TestMagic()

if __name__ == '__main__':
    run(main())
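await can appear in front of almost any async function call, except in the following cases:
- await cannot be used at the top level of the standard interactive interpreter (IPython 7.0+ and `python -m asyncio` do accept top-level await):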
>>> async def test():
...     return 1
...
>>> await test()
  File "<stdin>", line 1
SyntaxError: 'await' outside function
>>> def run(coro):
...     try:
...         coro.send(None)
...     except StopIteration as e:
...         return e.value
...
>>> run(test())
1
- await cannot appear outside an async function, which also means you cannot await inside a lambda;
- (Before Python 3.6) await could not be used in list or dict comprehensions; PEP 530 added this to Python 3.6+:
>>> async def test_list():
...     return [await test() for _ in range(10)]
...
>>> run(test_list())
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
>>>
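At this point, await may look like nothing more than a more verbose function call. It is not: with the asyncio library we can run several coroutines concurrently, all managed and scheduled by a single event loop object: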
import asyncio
import time

async def do_homework():
    print('I am doing homework...')
    await asyncio.sleep(2)
    # time.sleep(2)
    print('Homework finished...')

async def play():
    print('I am playing...')
    await asyncio.sleep(1.2)
    # time.sleep(1.2)
    print('Playing finished...')

async def main():
    # asyncio.wait() no longer accepts bare coroutine objects since Python 3.11;
    # asyncio.gather() schedules both coroutines on the event loop
    await asyncio.gather(do_homework(), play())

if __name__ == '__main__':
    asyncio.run(main())  # creates the event loop, runs main(), then closes the loop
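From the code above we can see that the event loop starts both coroutines at the same time, achieving an effect similar to multithreading.
Do not use time.sleep() to simulate work inside a coroutine: time.sleep() blocks the entire thread.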
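The concurrency can be made visible by recording the order of events and the total elapsed time. This is a minimal sketch with shortened delays; job, main and the log list are names chosen only for this example.

```python
import asyncio
import time

async def job(name, delay, log):
    log.append(name + ' start')
    await asyncio.sleep(delay)  # yields to the event loop instead of blocking
    log.append(name + ' done')
    return name

async def main():
    log = []
    started = time.perf_counter()
    results = await asyncio.gather(
        job('homework', 0.3, log),
        job('play', 0.1, log),
    )
    elapsed = time.perf_counter() - started
    return log, results, elapsed

log, results, elapsed = asyncio.run(main())
print(log)      # ['homework start', 'play start', 'play done', 'homework done']
print(results)  # ['homework', 'play']
```

The elapsed time should be close to the longest delay (about 0.3 s) rather than the sum of both. Replacing asyncio.sleep with time.sleep would make the two jobs run strictly one after the other.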
async beyond functions
async for
for is used all the time in synchronous Python. Below is a side-by-side comparison of async for and the plain for:
class Countdown(object):
    def __init__(self, n = 10):
        self.n = n

    def __iter__(self):
        print('called __iter__')
        return self

    def __next__(self):
        if self.n < 1:
            raise StopIteration
        print('called __next__')
        self.n -= 1
        return self.n + 1

class ACountDown(object):
    def __init__(self, n = 10):
        self.n = n

    async def __anext__(self):
        if self.n < 1:
            raise StopAsyncIteration  # Difference 1: a different exception signals the end of iteration
        print('called __anext__')
        self.n -= 1
        return self.n + 1

    def __aiter__(self):
        print('called __aiter__')
        return self

async def main():
    aCounter = ACountDown(10)
    async for ac in aCounter:  # Difference 2: async iteration must itself happen inside an async function
        print(ac)

if __name__ == '__main__':
    counter = Countdown(10)
    for c in counter:
        print(c)
    run(main())
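An async generator gives the same behavior with much less code, since Python supplies `__aiter__` and `__anext__` for it. A minimal sketch, assuming Python 3.6+; the names countdown and collect are illustrative, and run is the helper defined earlier, repeated here for completeness.

```python
async def countdown(n):
    # An async generator: async def plus yield
    while n > 0:
        yield n
        n -= 1

async def collect():
    # Async comprehension (PEP 530), equivalent to an async for loop
    return [x async for x in countdown(3)]

def run(coro):
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value

result = run(collect())
print(result)  # [3, 2, 1]
```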
async with
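with is the core of Python's context manager mechanism; a plain with relies on the class's __enter__ and __exit__ methods. If you followed async for closely, you can easily guess which methods async with requires: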
async def coro_job():
    print('coro job')

class Manager(object):
    def __enter__(self):
        print('Entering...')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Exiting...')

class AManager(object):
    async def __aenter__(self):
        print('Entering...')
        await coro_job()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print('Exiting...')
        await coro_job()

async def main():
    m = AManager()
    async with m:
        print('I am in!')

if __name__ == '__main__':
    m = Manager()
    with m:
        print('I am in!')
    run(main())
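For simple cases, the standard library's contextlib.asynccontextmanager (Python 3.7+) builds `__aenter__` and `__aexit__` from an async generator. The following is a minimal sketch; managed, main and the log list are names made up for the example.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed(log):
    log.append('enter')          # runs as __aenter__
    try:
        yield 'resource'         # value bound by "as"
    finally:
        log.append('exit')       # runs as __aexit__, even on error

async def main(log):
    async with managed(log) as res:
        log.append('using ' + res)

log = []
asyncio.run(main(log))
print(log)  # ['enter', 'using resource', 'exit']
```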
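Useful tricks
Detecting whether you are being called from a coroutine
In an event loop, many coroutines share one thread, so a long blocking call inside any coroutine stalls the whole loop (like the time.sleep above). If we know in advance that certain functions block heavily, we can make them impossible to call from a coroutine.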
import sys

def from_coro(n) -> bool:
    # Frame hack: look at the code flags of the frame n levels up the call stack.
    # 0x80 is CO_COROUTINE, which is set on functions defined with async def.
    return bool(sys._getframe(n).f_code.co_flags & 0x80)

Usage:

async def main():
    def normal():
        print(from_coro(1))
    print(from_coro(1))
    normal()

if __name__ == '__main__':
    run(main())
# Output: True
#         False
A decorator built on this technique can restrict a function to being called only from synchronous code:
from functools import wraps

def sync_only(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if from_coro(2):  # frame 2 is the caller of wrapper
            raise RuntimeError("Sorry, this function can't be called from an async def...")
        return func(*args, **kwargs)
    return wrapper

@sync_only
def target():
    print('I am target!')

async def main():
    target()

if __name__ == '__main__':
    run(main())
# RuntimeError: Sorry, this function can't be called from an async def...
Likewise, a decorator can make a call dispatch to function A in an async context and to function B in a synchronous context (omitted).
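One possible shape for such a dispatching decorator is sketched below. This is an illustration built on the same frame hack, not the omitted original: the names awaitable, greet_sync and greet are hypothetical, and from_coro and run are repeated so the block runs on its own.

```python
import sys
from functools import wraps

def from_coro(n) -> bool:
    # 0x80 is CO_COROUTINE: true if the frame n levels up belongs to an async def
    return bool(sys._getframe(n).f_code.co_flags & 0x80)

def awaitable(sync_func):
    """Pair an async implementation with a synchronous fallback."""
    def decorate(async_func):
        @wraps(async_func)
        def wrapper(*args, **kwargs):
            if from_coro(2):  # frame 2 is the caller of wrapper
                return async_func(*args, **kwargs)  # returns a coroutine to await
            return sync_func(*args, **kwargs)
        return wrapper
    return decorate

def greet_sync(name):
    return 'sync hello ' + name

@awaitable(greet_sync)
async def greet(name):
    return 'async hello ' + name

async def main():
    return await greet('world')

def run(coro):
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value

sync_result = greet('world')
async_result = run(main())
print(sync_result)   # sync hello world
print(async_result)  # async hello world
```

Because the check inspects only the immediate caller's frame, a call made from a plain helper function inside a coroutine would still take the synchronous path.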