Most of this article explains coroutines through Python code. If you don't yet know what a coroutine is, I suggest reading https://www.zhihu.com/question/294188439/answer/555273313 first.

This article also doubles as my notes on the talk Fear and Awaiting in Async: A Savage Journey to the Heart of the Coroutine Dream.

Let's start with classic network socket programming. A traditional single-threaded echo server can handle only one connection at a time:

from socket import *

def echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
    sock.bind(address)
    sock.listen(1)
    while True:
        client, addr = sock.accept()
        echo_handler(client, addr)
        
def echo_handler(client, addr):
    print('Connection from ', addr)
    with client:
        while True:
            data = client.recv(100000)
            if not data:
                break
            client.sendall(data)
    print('Connection closed')


if __name__ == '__main__':
    echo_server(('', 25000))

This clearly doesn't meet our needs. For modest workloads, threads are a reasonable fix:

from threading import Thread

def threaded_echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
    sock.bind(address)
    sock.listen(1)
    while True:
        client, addr = sock.accept()
        Thread(target=echo_handler, args=(client, addr)).start()

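However, because of CPython's GIL, threads cannot execute Python code in parallel on multiple cores. Processes are therefore another option, even though they are more expensive to create and to switch between: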

from multiprocessing import Process

def process_echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
    sock.bind(address)
    sock.listen(1)
    while True:
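        client, addr = sock.accept()
        Process(target=echo_handler, args=(client, addr)).start()
        client.close()  # the child has its own copy; close the parent's so the connection can actually shut down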

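If you're willing to try a different programming model, callbacks are a good solution. Twisted is an event-driven networking engine written in Python (it also provides a thread pool for running blocking code next to the reactor); we implement the callback by subclassing protocol.Protocol and overriding its dataReceived method: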

from twisted.internet import reactor, protocol


class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)


def callback_main(port):
    factory = protocol.ServerFactory()
    factory.protocol = Echo
    reactor.listenTCP(port, factory)
    reactor.run()

Of course, if you'd rather not use Twisted, the standard library's asyncio.Protocol does much the same:

import asyncio


class AsyncEcho(asyncio.Protocol):
    def connection_made(self, transport) -> None:
        self.transport = transport

    def connection_lost(self, exc) -> None:
        self.transport = None

    def data_received(self, data: bytes) -> None:
        self.transport.write(data)


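def async_main(port):
    async def serve():
        loop = asyncio.get_running_loop()
        server = await loop.create_server(AsyncEcho, '', port)
        async with server:  # closes the listening socket when serving stops
            await server.serve_forever()
    asyncio.run(serve())  # creates and closes the loop; get_event_loop() here is deprecated in recent Pythons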

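Instead of threads, we can bring coroutines to the single-threaded echo server with the async/await keywords introduced in Python 3.5, letting one thread serve many connections: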

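import asyncio, socket

async def handle_client(client):
    loop = asyncio.get_running_loop()
    with client:  # close the client socket when the handler exits
        while True:
            data = await loop.sock_recv(client, 255)
            if not data:  # b'' means the peer closed the connection
                break
            # echo raw bytes; decoding 255-byte chunks as UTF-8 could split a multi-byte character
            await loop.sock_sendall(client, data)


async def run_server(server):
    loop = asyncio.get_running_loop()
    tasks = set()
    while True:
        client, _ = await loop.sock_accept(server)
        task = loop.create_task(handle_client(client))
        tasks.add(task)  # keep a strong reference so the task isn't garbage-collected
        task.add_done_callback(tasks.discard)


if __name__ == '__main__':
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    server.bind(('localhost', 15555))
    server.listen(8)
    server.setblocking(False)  # the loop.sock_* methods require a non-blocking socket
    asyncio.run(run_server(server))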
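For comparison, asyncio also ships a higher-level streams API (asyncio.start_server and asyncio.open_connection) that hides the raw socket calls. The sketch below is my own, not from the talk; handle_echo and demo are hypothetical names, and demo binds to port 0 so the OS picks a free port for a quick self-test.

```python
import asyncio

async def handle_echo(reader, writer):
    # Echo until the client closes its side (read() returns b'' at EOF)
    while data := await reader.read(255):
        writer.write(data)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def demo():
    # Port 0: let the OS choose a free port for this self-test
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        writer.write(b'hello')
        await writer.drain()
        reply = await reader.readexactly(5)  # wait for exactly the 5 echoed bytes
        writer.close()
        await writer.wait_closed()
    return reply

if __name__ == '__main__':
    print(asyncio.run(demo()))  # b'hello'
```

The streams layer manages buffering and back-pressure (drain) for you, which the sock_* version leaves to the caller.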

  • Why use coroutines?

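Compared with threads (or processes), coroutines let you write concurrent code without most of the locking around critical sections that threads require: all coroutines run in one thread and can only be switched out at an explicit await (shared state can still change across an await, though). You also avoid the overhead of thread/process context switches;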

  • Why is switching between coroutines cheaper?

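With either processes or threads, every blocking call or switch goes through the operating system: execution traps into kernel mode for a system call, and the kernel's scheduler picks the next process or thread to run. To the CPU, by contrast, coroutines are just a single thread, so in principle the scheduling policy is entirely up to the user;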
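To make the last point concrete, here is a toy sketch (not how asyncio is implemented): a few lines of ordinary Python act as the scheduler, and the "policy" is simply FIFO order in a collections.deque. switch, worker and round_robin are hypothetical names; types.coroutine turns a plain generator into a bare awaitable that hands control back to the scheduler.

```python
import types
from collections import deque

@types.coroutine
def switch():
    # A bare yield travels out of coro.send() to the scheduler; we resume on its next send()
    yield

async def worker(name, steps, log):
    for i in range(steps):
        log.append(f'{name}{i}')
        await switch()  # voluntarily give up the CPU

def round_robin(coros):
    # Toy user-space scheduler: run each coroutine until its next switch(), then requeue it
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)
        except StopIteration:
            continue  # finished; drop it
        ready.append(coro)

log = []
round_robin([worker('a', 2, log), worker('b', 3, log)])
print(log)  # ['a0', 'b0', 'a1', 'b1', 'b2']
```

Swapping the deque for a priority queue would change the policy without touching the kernel.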

A systematic introduction to async/await

Before Python 3.5 we were already well acquainted with ordinary functions and generator functions:

def function():
  return 1
def generator():
  yield 1

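Putting async in front of def turns them into an asynchronous function (a coroutine function) and an asynchronous generator respectively; calling them produces objects of type types.CoroutineType and types.AsyncGeneratorType (async generators arrived slightly later, in Python 3.6, via PEP 525);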

async def async_function():
  return 1
async def async_generator():
  yield 1
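A quick check of those types, as a minimal sketch using the standard types and inspect modules (the coroutine is closed at the end only to avoid a "never awaited" warning):

```python
import inspect
import types

async def async_function():
    return 1

async def async_generator():
    yield 1

coro = async_function()   # calling does not run the body; it creates a coroutine object
agen = async_generator()  # likewise creates an async generator object

print(type(coro) is types.CoroutineType)             # True
print(type(agen) is types.AsyncGeneratorType)        # True
print(inspect.iscoroutinefunction(async_function))   # True
print(inspect.isasyncgenfunction(async_generator))   # True
coro.close()
```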

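Calling an async function directly returns a coroutine object without running its body. Its send method passes a value into the coroutine and, as with generators, the first send must be None (as we'll see later, async is roughly Future + Generator):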

>>> coro = async_function()
>>> coro.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 1

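Calling send here raises StopIteration. That is how a coroutine normally signals that it has finished, with its return value stored on the exception, so we can catch it with try-except and return the exception's value attribute (this helper only handles coroutines that finish without ever suspending; it is not an event loop):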

def run(coro):
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value
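When the coroutine does suspend, send() returns whatever was yielded instead of raising, and the driver has to keep calling send(). A minimal sketch, assuming a hand-written generator-based awaitable built with types.coroutine; suspend, task, instant and drive are hypothetical names:

```python
import types

@types.coroutine
def suspend():
    # The yielded value travels all the way out of coro.send(),
    # and whatever the driver sends back becomes the result of `await suspend()`
    received = yield 'suspended'
    return received

async def task():
    got = await suspend()
    return got * 2

async def instant():
    return 1

def drive(coro, resume_value):
    # Keep calling send() until StopIteration; the first send must be None
    yielded = []
    value = None
    try:
        while True:
            yielded.append(coro.send(value))
            value = resume_value
    except StopIteration as e:
        return yielded, e.value

print(drive(task(), 21))    # (['suspended'], 42)
print(drive(instant(), 0))  # ([], 1)
```

This loop of "send, collect what was yielded, send again" is, in very reduced form, what an event loop does.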

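A coroutine can use await to suspend itself until the awaited coroutine finishes, and then receive its result. The example also shows that an async function calls other async functions through await: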

async def async_function():
    return 1


async def await_coroutine():
    result = await async_function()  # whatever follows await must be awaitable, i.e. implement __await__ (coroutine objects do)
    print(result)


if __name__ == '__main__':
    run(await_coroutine())
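Any object whose class implements __await__ can sit after await. A minimal sketch with a hypothetical Ready class whose __await__ finishes immediately; awaiting an int shows what happens when __await__ is missing:

```python
class Ready:
    """Hypothetical awaitable that hands back a value without suspending."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # __await__ must return an iterator; writing it as a generator function does that.
        # The bare yield is never reached, so awaiting Ready never suspends.
        return self.value
        yield

def run(coro):
    # Same helper as above: enough for coroutines that never suspend
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value

async def use_ready():
    return await Ready(10) + 1  # (await Ready(10)) + 1

async def await_int():
    return await 1  # int has no __await__

print(run(use_ready()))  # 11
try:
    run(await_int())
except TypeError as exc:
    print('TypeError:', exc)
```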

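As for functions, async can go in front of almost any def (ordinary functions, generator functions, and a class's class methods, static methods and instance methods), marking it as an async function/generator. The exceptions are: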

  • Some magic methods of ordinary classes cannot be made async, e.g. __init__ and __del__:

    class TestMagic(object):
        async def __init__(self):
            pass
    
        async def __del__(self):
            pass
    
    if __name__ == '__main__':
        a = TestMagic()
    # TypeError: __init__() should return None, not 'coroutine'

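This is easy to understand: these magic methods are bound up with how Python manages instances. __init__ must return None, and an async __del__ would only return a coroutine that nobody ever awaits, so its body would never run;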

(Not recommended) Some magic methods do still accept async, mostly getter-style ones (__getitem__, __getattr__), and the same goes for @property:

class TestMagic(object):
    def __init__(self):
        self.prop = 1

    @property
    async def value(self):
        return self.prop

    async def __getitem__(self, item):
        print('getitem: ', item)

    async def __getattr__(self, item):
        print('getattr: ', item)

async def main():
    a = TestMagic()
    print(await a['hello'])
    print(await a.value)

if __name__ == '__main__':
    run(main())

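This style is strongly discouraged, though. With @property in particular, reading what looks like a value and getting back a coroutine object is semantically counterintuitive;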

async also permits plenty of nasty tricks. For example, the following metaclass makes object initialization asynchronous:

class AsyncInit(type):
    async def __call__(cls, *args, **kwargs):
        self = cls.__new__(cls, *args, **kwargs)
        await self.__init__(*args, **kwargs)
        return self

class TestMagic(metaclass=AsyncInit):
    async def __init__(self):
        print('I got inited!')

async def main():
    a = await TestMagic()

if __name__ == '__main__':
    run(main())
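A more conventional alternative (my suggestion, not from the talk) keeps __init__ synchronous and moves the awaiting into an async factory classmethod. Connection, _connect and create are hypothetical names, and asyncio.sleep(0) stands in for real async setup work:

```python
import asyncio

class Connection:
    """Hypothetical class whose setup needs to await something."""
    def __init__(self, host):
        # __init__ stays synchronous and only stores state
        self.host = host
        self.ready = False

    async def _connect(self):
        await asyncio.sleep(0)  # stand-in for real async setup work
        self.ready = True

    @classmethod
    async def create(cls, host):
        # Async factory: build the instance normally, then await its async setup
        self = cls(host)
        await self._connect()
        return self

async def main():
    conn = await Connection.create('example.org')
    return conn.ready

if __name__ == '__main__':
    print(asyncio.run(main()))  # True
```

Callers still write a single await, but no metaclass is involved and Connection(...) keeps its usual meaning.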

await can go in front of almost any async function call, except in the following situations:

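  • await cannot be used at the top level of an interactive session such as the standard Python REPL (IPython 7+ does allow top-level await through its autoawait feature, and `python -m asyncio` on Python 3.8+ opens a REPL that supports it):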
>>> async def test():
...     return 1
>>> await test()
  File "<stdin>", line 1
SyntaxError: 'await' outside function
>>> def run(coro):
...     try:
...         coro.send(None)
...     except StopIteration as e:
...         return e.value
...
>>> run(test())
1
  • await cannot appear inside a non-async function, which also means you cannot await inside a lambda;
  • (Before Python 3.6) await could not be used inside list or dict comprehensions; PEP 530 added this in Python 3.6+:

    >>> async def test_list():
    ...     return [await test() for _ in range(10)]
    ...
    >>> run(test_list())
    [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    >>>
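PEP 530 also allows async for inside comprehensions. A minimal sketch with hypothetical ticker and double helpers, run under asyncio:

```python
import asyncio

async def ticker(n):
    # An async generator (Python 3.6+)
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def double(x):
    await asyncio.sleep(0)
    return x * 2

async def main():
    squares = [x * x async for x in ticker(4)]        # async for inside a list comprehension
    doubled = {x: await double(x) for x in range(3)}  # await inside a dict comprehension
    return squares, doubled

if __name__ == '__main__':
    print(asyncio.run(main()))  # ([0, 1, 4, 9], {0: 0, 1: 2, 2: 4})
```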

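By now, await may look like nothing more than a wordier way to call a function. Not so: with the asyncio library we can run several coroutines at the same time, all managed and scheduled by an event_loop object: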

import asyncio
import time


async def do_homework():
    print('I am doing homework...')
    await asyncio.sleep(2)
    # time.sleep(2)
    print('Homework finished...')


async def play():
    print('I am playing...')
    await asyncio.sleep(1.2)
    # time.sleep(1.2)
    print('Playing finished...')


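async def main():
    # gather wraps both coroutines in tasks that run concurrently on the same event loop
    # (passing bare coroutines to asyncio.wait is no longer allowed since Python 3.11)
    await asyncio.gather(do_homework(), play())


if __name__ == '__main__':
    asyncio.run(main())  # creates the event loop, runs main(), then closes the loop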

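Running this shows that the event loop drives both coroutines concurrently, giving a multithreading-like effect inside a single thread: the whole run takes about 2 seconds rather than 3.2;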

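Don't use time.sleep() to simulate work in a coroutine: time.sleep() blocks the whole thread, and with it the event loop and every other coroutine;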
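A small sketch to make the difference visible, assuming 0.2-second delays are long enough to measure; nonblocking, blocking and timed are hypothetical helpers:

```python
import asyncio
import time

async def nonblocking(delay):
    await asyncio.sleep(delay)  # suspends this coroutine; the loop runs others meanwhile

async def blocking(delay):
    time.sleep(delay)           # blocks the thread, so the loop can run nothing else

async def timed(worker):
    # Run two copies of the worker concurrently and measure wall-clock time
    start = time.perf_counter()
    await asyncio.gather(worker(0.2), worker(0.2))
    return time.perf_counter() - start

print(f"asyncio.sleep: {asyncio.run(timed(nonblocking)):.2f}s")  # roughly 0.2s
print(f"time.sleep:    {asyncio.run(timed(blocking)):.2f}s")     # roughly 0.4s
```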

async beyond functions

async for

for is everywhere in synchronous Python code; here is async for side by side with an ordinary for:

class Countdown(object):
    def __init__(self, n = 10):
        self.n = n

    def __iter__(self):
        print('called __iter__')
        return self

    def __next__(self):
        if self.n < 1:
            raise StopIteration
        print('called __next__')
        self.n -= 1
        return self.n + 1


class ACountDown(object):
    def __init__(self, n = 10):
        self.n = n

    async def __anext__(self):
        if self.n < 1:
            raise StopAsyncIteration  # Difference 1: ending the iteration raises a different exception
        print('called __anext__')
        self.n -= 1
        return self.n + 1

    def __aiter__(self):
        print('called __aiter__')
        return self



async def main():
    aCounter = ACountDown(10)
    async for ac in aCounter:  # Difference 2: async iteration must itself happen inside an async function
        print(ac)

if __name__ == '__main__':
    counter = Countdown(10)
    for c in counter:
        print(c)
    run(main())
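In practice an async generator usually replaces a hand-written __aiter__/__anext__ class: Python builds both methods and raises StopAsyncIteration when the function returns. A minimal sketch with a hypothetical acountdown:

```python
import asyncio

async def acountdown(n=10):
    # An async generator: __aiter__/__anext__ and StopAsyncIteration come for free
    while n > 0:
        await asyncio.sleep(0)  # an await point, which a plain generator could not contain
        yield n
        n -= 1

async def main():
    seen = []
    async for value in acountdown(3):
        seen.append(value)
    return seen

if __name__ == '__main__':
    print(asyncio.run(main()))  # [3, 2, 1]
```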

async with

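with is the core of Python's context manager mechanism: an ordinary with relies on the class's __enter__ and __exit__ methods. If you understood async for, you can easily guess which methods async with requires: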

async def coro_job():
    print('coro job')

class Manager(object):
    def __enter__(self):
        print('Entering...')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('Exiting...')

class AManager(object):
    async def __aenter__(self):
        print('Entering...')
        await coro_job()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print('Exiting...')
        await coro_job()
        
async def main():
    m = AManager()
    async with m:
        print('I am in!')

if __name__ == '__main__':
    m = Manager()
    with m:
        print('I am in!')
    run(main())
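The standard library offers a shortcut here too: contextlib.asynccontextmanager (Python 3.7+) turns an async generator into an async context manager. A minimal sketch; amanager and the log list are hypothetical:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def amanager(log):
    # Code before the yield plays the role of __aenter__, code after it __aexit__
    log.append('enter')
    await asyncio.sleep(0)
    try:
        yield 'resource'
    finally:
        await asyncio.sleep(0)
        log.append('exit')

async def main():
    log = []
    async with amanager(log) as res:
        log.append(f'inside with {res}')
    return log

if __name__ == '__main__':
    print(asyncio.run(main()))  # ['enter', 'inside with resource', 'exit']
```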

Useful tricks

Detecting whether we are being called from a coroutine

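In an event loop many coroutines share one thread, so a long blocking call in any one of them stalls the entire event loop (like time.sleep above). If we know in advance that certain functions do a lot of blocking work, we can therefore make them impossible to call from a coroutine.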

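import inspect
import sys
def from_coro(n) -> bool:
    # Frame hack: check whether the frame n levels up the call stack runs an async def (CO_COROUTINE == 0x80)
    return bool(sys._getframe(n).f_code.co_flags & inspect.CO_COROUTINE)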

Usage:

async def main():
    def normal():
        print(from_coro(1))
    print(from_coro(1))
    normal()

if __name__ == '__main__':
    run(main())
    
# Output: True
#         False

A decorator built on this can make a function callable only from ordinary (synchronous) code:

from functools import wraps

def sync_only(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if from_coro(2):
            raise RuntimeError("Sorry, this function can't be called from a async def...")
        return func(*args, **kwargs)
    return wrapper

@sync_only
def target():
    print('I am target!')
    
async def main():
    target()

if __name__ == '__main__':
    run(main())
    
    # RuntimeError: Sorry, this function can't be called from a async def...

Likewise, you can write a decorator so that function A is called in an async context and function B in a sync context (details omitted).
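The article leaves that variant out. Purely as an illustration, here is one possible sketch of my own (not the author's), built on the same from_coro frame hack; awaitable_when_async, fetch and _fetch_async are hypothetical names. It only looks at the immediate caller, so a call made from a nested sync helper inside a coroutine still gets the sync version.

```python
import inspect
import sys
from functools import wraps

def from_coro(n) -> bool:
    return bool(sys._getframe(n).f_code.co_flags & inspect.CO_COROUTINE)

def awaitable_when_async(async_impl):
    """Hypothetical decorator: sync callers get the decorated function,
    callers inside an async def get async_impl(...) (a coroutine to await)."""
    def decorate(sync_impl):
        @wraps(sync_impl)
        def wrapper(*args, **kwargs):
            if from_coro(2):  # frame 0: from_coro, 1: wrapper, 2: our caller
                return async_impl(*args, **kwargs)
            return sync_impl(*args, **kwargs)
        return wrapper
    return decorate

async def _fetch_async(key):
    return f'async:{key}'

@awaitable_when_async(_fetch_async)
def fetch(key):
    return f'sync:{key}'

async def main():
    return await fetch('a')

def run(coro):
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value

print(fetch('a'))   # sync:a
print(run(main()))  # async:a
```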

Last modification:September 17th, 2020 at 10:23 pm