关于我
一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。
Github:https://github.com/hylinux1024
微信公众号:终身开发者(angrycode)

在前一篇《一文彻底搞懂Python可迭代(Iterable)、迭代器(Iterator)和生成器(Generator)的概念》 的文中,知道生成器( Generator )可由以下两种方式定义:

  • 列表生成器
  • 使用 yield 定义的函数

Python 早期的版本中协程也是通过生成器来实现的,也就是基于生成器的协程(Generator-based Coroutines)。在前一篇介绍生成器的文章末尾举了一个生产者-消费者的例子,就是基于生成器的协程来实现的。

            
              def producer(c):
    n = 0
    while n < 5:
        n += 1
        print('producer {}'.format(n))
        r = c.send(n)
        print('consumer return {}'.format(r))

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('consumer {} '.format(n))
        r = 'ok'

if __name__ == '__main__':
    c = consumer()
    next(c)  # 启动consumer
    producer(c)
            
          

看了这段代码,相信很多初学者和我一样对基于生成器的协程实现其实很难马上就能够根据业务写出自己的协程代码。 Python 实现者们也注意到这个问题,因为它太不 Pythonic 了。而基于生成器的协程也将被废弃,因此本文将重点介绍 asyncio 包的使用,以及涉及到的一些相关类概念。
注:我使用的 Python 环境是3.7。

0x00 何为协程(Coroutine)

协程( Coroutine )是在线程中执行的,可理解为微线程,但协程的切换没有上下文的消耗,它比线程更加轻量些。一个协程可以随时中断自己让另一个协程开始执行,也可以从中断处恢复并继续执行,它们之间的调度是由程序员来控制的(可以看本文开篇处生产者-消费者的代码)。

定义一个协程

Python 3.5+版本新增了 aysnc await 关键字,这两个语法糖让我们非常方便地定义和使用协程。
在函数定义时用 async 声明就定义了一个协程。

            
              import asyncio

# 定义了一个简单的协程
async def simple_async():
    print('hello')
    await asyncio.sleep(1) # 休眠1秒
    print('python')

# 使用asynio中run方法运行一个协程
asyncio.run(simple_async())

# 执行结果为
# hello
# python
            
          

在协程中如果要调用另一个协程就使用 await 要注意 await 关键字要在 async 定义的函数中使用,而反过来 async 函数可以不出现 await

            
              # 定义了一个简单的协程
async def simple_async():
    print('hello')

asyncio.run(simple_async())

# 执行结果
# hello
            
          

asyncio.run() 将运行传入的协程,负责管理 asyncio 事件循环。
除了 run() 方法可直接执行协程外,还可以使用事件循环 loop

            
              async def do_something(index):
    print(f'start {time.strftime("%X")}', index)
    await asyncio.sleep(1)
    print(f'finished at {time.strftime("%X")}', index)

def test_do_something():
    # 生成器产生多个协程对象
    task = [do_something(i) for i in range(5)]

    # 获取一个事件循环对象
    loop = asyncio.get_event_loop()
    # 在事件循环中执行task列表
    loop.run_until_complete(asyncio.wait(task))
    loop.close()

test_do_something()

# 运行结果
# start 00:04:03 3
# start 00:04:03 4
# start 00:04:03 1
# start 00:04:03 2
# start 00:04:03 0
# finished at 00:04:04 3
# finished at 00:04:04 4
# finished at 00:04:04 1
# finished at 00:04:04 2
# finished at 00:04:04 0
            
          

可以看出几乎同时启动了所有的协程。
其实翻阅源码可知 asyncio.run() 的实现也是封装了 loop 对象及其调用。而 asyncio.run() 每次都会创建一个新的事件循环对象用于执行协程。

0x01 Awaitable对象

Python 中可等待( Awaitable )对象有:协程( corountine )、任务( Task )、 Future 。即这些对象可以使用 await 关键字进行调用

            
              await awaitable_object
            
          
1. 协程(Coroutine)

协程由 async def 声明定义,一个协程可由另一个协程使用 await 进行调用

            
              async def nested():
    print('in nested func')
    return 13

async def outer():

    # 要使用await 关键字 才会执行一个协程函数返回的协程对象
    print(await nested())

asyncio.run(outer())

# 执行结果
# in nested func
# 13
            
          

如果在 outer() 方法中直接调用 nested() 而不使用 await ,将抛出一个 RuntimeWarning

            
              async def outer():
    # 直接调用协程函数不会发生执行,只是返回一个 coroutine 对象
    nested()

asyncio.run(outer())
            
          

运行程序,控制台将输出以下信息

            
              RuntimeWarning: coroutine 'nested' was never awaited
  nested()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
            
          
2. 任务(Task)

任务( Task )是可以用来 并发地 执行协程。可以使用 asyncio.create_task() 将一个协程对象封装成任务,该任务将很快被排入调度队列并执行。

            
              async def nested():
    print('in nested func')
    return 13

async def create_task():
    # create_task 将一个协程对象打包成一个 任务时,该协程就会被自动调度运行
    task = asyncio.create_task(nested())
    # 如果要看到task的执行结果
    # 可以使用await等待协程执行完成,并返回结果
    ret = await task
    print(f'nested return {ret}')

asyncio.run(create_task())

# 运行结果
# in nested func
# nested return 13
            
          

注:关于并发下文还会详细说明。

3. Future

Future 是一种特殊的低层级( low-level )对象,它是异步操作的最终结果( eventual result )。
当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

通常在应用层代码不会直接创建 Future 对象。在某些库和 asyncio 模块中的会使用到该对象。

            
              async def used_future_func():
    await function_that_returns_a_future_object()
            
          

0x02 并发

1. Task

前面我们知道 Task 可以并发地执行。 asyncio.create_task() 就是一个把协程封装成 Task 的方法。

            
              async def do_after(what, delay):
    await asyncio.sleep(delay)
    print(what)

# 利用asyncio.create_task创建并行任务
async def corun():
    task1 = asyncio.create_task(do_after('hello', 1)) # 模拟执行1秒的任务
    task2 = asyncio.create_task(do_after('python', 2)) # 模拟执行2秒的任务

    print(f'started at {time.strftime("%X")}')
    # 等待两个任务都完成,两个任务是并行的,所以总时间两个任务中最大的执行时间
    await task1
    await task2

    print(f'finished at {time.strftime("%X")}')

asyncio.run(corun())

# 运行结果
# started at 23:41:08
# hello
# python
# finished at 23:41:10
            
          

task1 是一个执行1秒的任务, task2 是一个执行2秒的任务,两个任务并发的执行,总共消耗2秒。

2. gather

除了使用 asyncio.create_task() 外还可以使用 asyncio.gather() ,这个方法接收协程参数列表

            
              async def do_after(what, delay):
    await asyncio.sleep(delay)
    print(what)

async def gather():
    print(f'started at {time.strftime("%X")}')
    # 使用gather可将多个协程传入
    await asyncio.gather(
        do_after('hello', 1),
        do_after('python', 2),
    )
    print(f'finished at {time.strftime("%X")}')

asyncio.run(gather())

# 运行结果
# started at 23:47:50
# hello
# python
# finished at 23:47:52
            
          

两个任务消耗的时间为其中消耗时间最长的任务。

0x03 引用

  1. https://docs.python.org/3/library/asyncio-task.html