asyncio与aiohttp

python3.6 版本以后,asyncio 与 aiohttp 异步模块的使用说明

asyncio

文档在此 不懂就查

概述

在Python3.6后,可以通过关键词async def来定义一个coroutine协程,协程就相当于未来需要完成的任务,多个协程就是多个需要完成的任务,多个协程可以进一步封装到一个task对象中,task就是一个储存任务的盒子。此时,装在盒子里的任务并没有真正的运行,需要把它接入到一个监视器中使它运行,同时监视器还要持续不断的盯着盒子里的任务运行到了哪一步,这个持续不断的监视器就用一个循环对象loop来实现。 原话链接 单来说在一个线程里,先后执行 AB 两个任务,但是当A遇到耗时操作(网络等待、文件读写等),这个时候 gevent 会让 A 继续执行,但是同时也会开始执行B任务,如果B在遇到耗时操作同时A又执行完了耗时操作

基础示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time

#定义第1个协程,协程就是将要具体完成的任务,该任务耗时3秒,完成后显示任务完成
async def to_do_something(i):
print('第{}个任务:任务启动...'.format(i))
#遇到耗时的操作,await就会使任务挂起,继续去完成下一个任务
await asyncio.sleep(i)
print('第{}个任务:任务完成!'.format(i))
#定义第2个协程,用于通知任务进行状态
async def mission_running():
print('任务正在执行...')

start = time.time()
#创建一个循环
loop = asyncio.get_event_loop()
#创建一个任务盒子tasks,包含了3个需要完成的任务
tasks = [asyncio.ensure_future(to_do_something(1)),
asyncio.ensure_future(to_do_something(2)),
asyncio.ensure_future(mission_running())]
#tasks接入loop中开始运行
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(end-start)

代码注释

  1. 启动入口 asyncio.run()
  2. 并发运行asyncio任务 asyncio.create_task()
  3. 并发运行asyncio任务 asyncio.gather()
  4. 等待对象 await
  5. 休眠 asyncio.sleep() 挂起当前任务,允许允许其他任务

核心解析

  • event_loop 事件循环 理解为一个循环的池,里面存放一些async关键词定义的协程函数,只有放到循环池里才能执行。
  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • future:代表将来执行或没有执行的任务的结果。它和task上没有本质的区别。
  • async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

简单使用

1
2
3
4
5
6
7
8
import asyncio

async def do_some_work(x): # 使用async关键字定义协程
print('Waiting: ', x)

coroutine = do_some_work(2) # 创建协程对象
loop = asyncio.get_event_loop() # 创建一个事件循环(池)
loop.run_until_complete(coroutine) # 将协程对象包装并注册协程对象

创建 task

协程对象不能直接运行,需要包装成任务才能运行,上面是通过run_until_complete()方法包装成task(隐式包装),还有下面两种方式进行显式包装:

  • task = asyncio.ensure_future(coroutine)
  • task = loop.create_task(coroutine)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time, asyncio

_now = lambda: time.time()

async def work(x): # 使用async关键字定义协程
print('Waiting: ', x)

cor = work('hi')
start = _now()
loop = asyncio.get_event_loop()
task = loop.create_task(cor) # 方法1
# task = asyncio.ensure_future(cor) # 方法2
loop.run_until_complete(task)
print(_now()-start)

创建 task 后

  1. task 在加入事件循环前十 pending 状态
  2. 加入 loop 后运行中是 running 状态
  3. loop 调用完是 Done 状态
  4. 运行完是 finished 状态

task 显性包装相比隐性包装有了协程函数的状态

loop.run_until_complete() 接收 future 参数,指协程函数 task是future的子类

绑定回调函数

通过 task 的 task.add_done_callback(callback) 绑定回调函数,接收一个 future 对象参数如 task,在内部通过 future.result() 获取其返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def work(x): # 使用async关键字定义协程
return x+3

def callback(y):
print(y.result())

cor = work(5)
loop = asyncio.get_event_loop()
task = loop.create_task(cor) # 方法1
# task = asyncio.ensure_future(cor) # 方法2
task.add_done_callback(callback) # 绑定回调函数
loop.run_until_complete(task) # 运行

await 挂起耗时操作

task 对象是顺序执行的, 因为在异步中没有声明哪些是耗时操作,所以会顺序执行,await作用就是提示哪些是耗时操作,可以对耗时操作进行挂起。

1
2
3
4
5
6
7
8
9
10
11
import asyncio
import time

async def test():
# time.sleep(1) # 传统顺序执行
await asyncio.sleep(1) # 开启异步执行
print(time.time())

tasks = [asyncio.ensure_future(test()) for _ in range(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

传统执行的情况下,会阻塞顺序执行。 开启await挂起耗时操作的情况下会异步执行

aiohttp 网络访问

文档在此 不懂就查

pip3 install aiohttp

使用案例,连续访问100次百度只需要一秒不到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import aiohttp, asyncio, time

async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(response)

async def request():
url = 'http://www.baidu.com'
resulit = await get(url)

_now = lambda : time.time()
start = _now()
tasks = [asyncio.ensure_future(request()) for _ in range(100)]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print(_now()-start)

并发访问

  • loop.run_until_complete(syncio.wait(tasks)) 来实现协程并发,传入 task列表
  • loop.run_until_complete(asyncio.gather(*tasks)) asyncio.gather 会将列表中不是 task 的 coro 预先封装为 future, 而 wait 则不会。

两种方法效果相同,但是 wait 核 gather 返回值不同

多进程配合

如果这也满足不了你,你可以开启多进程配合使用,asyncio、aiohttp需要配合aiomultiprocess库使用,版本要求至少3.6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
from aiohttp import request
from aiomultiprocess import Process

async def put(url, params):
async with request("PUT", url, params=params) as response:
pass

async def main():
p = Process(target=put, args=("https://jreese.sh", ))
await p

asyncio.run(main())
If you want to get results back from that coroutine, Worker makes that available:

import asyncio
from aiohttp import request
from aiomultiprocess import Worker

async def get(url):
async with request("GET", url) as response:
return await response.text("utf-8")

async def main():
p = Worker(target=get, args=("https://jreese.sh", ))
response = await p

asyncio.run(main())
If you want a managed pool of worker processes, then use Pool:

import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
async with request("GET", url) as response:
return await response.text("utf-8")

async def main():
urls = ["https://jreese.sh", ...]
async with Pool() as pool:
result = await pool.map(get, urls)

asyncio.run(main())

关闭协程

  • 关闭单个 task
  • 关闭 loop

具体涉及函数

  • asyncio.Task.all_tasks() 获取事件循环任务列表
  • KeyboardInterrupt 捕获停止异常(Ctrl+C)
  • loop.stop() 停止任务循环
  • task.cancel() 取消单个任务
  • loop.run_forever()
  • loop.close() 关闭事件循环,不然会重启

同类型 gevent 模块

python程序实现的一种单线程下的多任务执行调度器,简单来说在一个线程里,先后执行 AB 两个任务,但是当A遇到耗时操作(网络等待、文件读写等),这个时候 gevent 会让 A 继续执行,但是同时也会开始执行B任务,如果B在遇到耗时操作同时A又执行完了耗时操作,gevent 又继续执行 A。 这里是我gevent使用Demo