异步编程
参考视频: https://www.bilibili.com/video/BV157mFYEEkH
python官方文档: https://docs.python.org/3.12/library/asyncio-task.html
前提
- 执行协程函数,必须使用事件循环
- 如果没有创建asyncio任务也可以执行协程函数,但是不是并发运行,通过
await
激发,但是这样的操作没有并发 - 协程中提到的
loop
是指任务循环 - 在任务循环中,实际上是依次执行各个任务,由于存在的GIL,python实际上不存在真正的多核并发,只是疯狂转换, 那么如果没有碰到
await
标记的可挂起步骤, 是会一直在CPU上机处理, 从而阻塞整个任务循环, 之所以print
这样的步骤不需要await
,是因为这样的操作速度足够快,不会阻塞整个任务循环,所以不需要挂起
意义
- 避免耗时IO阻塞耗费时间,可以同时执行多个任务,提升性能
协程的特点是可以暂停运行也可以继续运行
当出现关键词await
的时候就会让出控制权,暂停运行,直到后面东西完成了才回来执行后面的内容
将协程包装成任务
函数前面加上async
,需要暂停的地方进行改写,例如
1 | sleep(1) --> await asyncio.sleep(1) |
await的第二个作用就是将后面的协程包装成任务,让事件循环调度
定义完协程函数就要定义协程并且包装为任务
在main函数汇总调用协程函数得到协程,并且包装为任务
1 | task1 = async.create_task(coro(url)) |
然后使用await
来获取结果(需要注意的是,这里的await必须在协程函数中,不能在main函数中),也就是说:
✅ async def main():pass
❌ def main():pass
1 | result = await task1 |
建立事件循环asyncio.create_task()
函数,用于将协程作为 asyncio 任务并发运行。
需要注意的是,如果没有创建asyncio任务也可以执行,但是不是并发运行
1 | asyncio.run(main()) |
await
表明当前的协程要暂停运行,当协程中的await
被执行时,事件循环会暂停当前协程,并开始执行下一个协程,当执行完毕时,事件循环会恢复当前协程继续执行
如果await
后面是一个协程,就将协程包装为一个任务,
但是如果已经是一个任务了,就将任务加入到事件循环中,等待执行,等到任务执行完毕,await会获取这个协程执行的结果result = await task1
,所以result会接受这个结果
建立的分类:手动和自动
手动:
手动创建然后触发,获取结果,优点是可以控制,缺点是麻烦,需要自己管理
1 | task1 = async.create_task(coro(url)) |
参考代码:
1 | async def main(): |
自动:
有两个方法:asyncio.gather()
和asyncio.as_completed()
asyncio.gather()
等待所有任务完成返回
可以传入多个任务,并且等待所有任务执行完毕,然后返回一个列表(按照传入顺序给结果)1
2import asyncio
results = await asyncio.gather(fun1(),fun2())实例代码:
1
2
3
4
5
6
7
8
9
10
11
12async def main():
url = 'http://httpbin.org'
file_path = 'file.txt'
results = await asyncio.gather(read_file(file_path), fetch_url(url))
print(results)
if __name__ == '__main__':
start_time = time.time()
asyncio.run(main()) # 手动运行
end_time = time.time()
print("the time cost is ", end_time - start_time)asyncio.as_completed()
不会等待所有任务完成,而是返回一个迭代器,迭代器中包含的是任务,
当任务完成时,迭代器会返回这个任务,并且可以获取这个任务的结果,但是不能保证顺序
也就是说谁先完成谁先执行接下来的逻辑
需要注意的是,这个方法传入的是一个列表1
2
3results = asyncio.as_completed([fun1(),fun2()])
for result in results:
print(await result)示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14async def main():
url = 'http://httpbin.org'
file_path = 'file.txt'
results = asyncio.as_completed([ read_file(file_path),fetch_url(url)])
for result in results:
print(await result) # 返回的是一个对象!
print(results)
if __name__ == '__main__':
start_time = time.time()
asyncio.run(main()) # 手动运行
end_time = time.time()
print("the time cost is ", end_time - start_time)
异步http请求
AI实例源码:
aiohttp.ClientSession()
内部维护了一个tcp连接池,可以使用tcp复用
也可以手动管理,但是如果忘记session.close()
就有可能导致连接泄露,并且代码冗余
1 | session = aiohttp.ClientSession() # 手动创建会话 |
1 | import asyncio |
多线程实现异步asyncio.to_thread()
与多线程对比
需要注意的是,这个并不是真正意义上的多线程, 实际上仍是单核,处理CPU-bound的时候会相当吃力,如果是 CPU 密集型任务 → 用多进程(multiprocessing
)绕过 GIL。
协程是单线程的
asyncio
的协程运行在 单个线程 中,由 事件循环(loop
) 调度。- 并发原理:通过
await
挂起 I/O 等待,让事件循环切换到其他任务,实现 协作式并发。模拟CPU并发,耗时就下CPU一边凉快去,需要计算就上CPU计算 - 优势:
- 比多线程 更轻量(无线程切换开销)。
- 适合 高并发 I/O 操作(如网络请求、文件读写)。
对比多线程
asyncio (协程) |
多线程 | |
---|---|---|
并发模型 | 单线程 + 事件循环 | 多线程 + OS 调度 |
适用场景 | I/O 密集型(如 HTTP 请求) | CPU 密集型 或 混合型任务 |
性能开销 | 极低(无线程切换) | 高(线程切换、锁竞争) |
GIL 影响 | 无影响(单线程) | 受 GIL 限制(CPU 任务无法并行) |
设计目标不同
asyncio.to_thread() |
多线程(threading ) |
---|---|
主要用于 在协程中运行同步阻塞代码(如 time.sleep() 、requests.get() ),避免阻塞事件循环。 |
用于 真正的并行执行(受 GIL 限制,适合 I/O 密集型任务)。 |
底层仍然依赖线程池,但 由 asyncio 管理,对开发者透明。 |
开发者需要手动管理线程的生命周期、同步机制(如 Lock 、Queue )。 |
适用于 async/await 代码,与协程无缝集成。 |
适用于同步代码,与 asyncio 不直接兼容。 |
执行方式不同
asyncio.to_thread()
运行方式:
- 将同步函数 提交到线程池(默认使用
concurrent.futures.ThreadPoolExecutor
)。 - 线程池执行完成后,结果返回给事件循环,协程继续执行。
1
2
3
4
5
6
7
8
9
10
11
12import asyncio
import time
def blocking_task():
time.sleep(2) # 同步阻塞操作
return "Done"
async def main():
result = await asyncio.to_thread(blocking_task) # 在子线程中运行
print(result) # 输出 "Done"
asyncio.run(main())- 将同步函数 提交到线程池(默认使用
特点:
- 不会阻塞事件循环(因为
time.sleep(2)
在子线程中运行)。 - 适合偶尔的同步阻塞调用(如数据库查询、文件读写)。
- 不会阻塞事件循环(因为
多线程(threading
)
运行方式:
- 直接创建和管理线程,每个线程独立执行任务。
- 需要手动处理线程同步(如
Lock
、Event
)。
示例:
1
2
3
4
5
6
7
8
9
10import threading
import time
def blocking_task():
time.sleep(2)
print("Done")
thread = threading.Thread(target=blocking_task)
thread.start() # 启动线程
thread.join() # 等待线程结束特点:
- 真正的并行执行(受 GIL 限制,I/O 密集型任务可以并发)。
- 适合长期运行的并发任务(如 Web 服务器处理多个请求)。
性能对比
场景 | asyncio.to_thread() |
多线程(threading ) |
---|---|---|
I/O 密集型任务(如 HTTP 请求) | ✅ 高效(协程 + 线程池结合) | ✅ 高效(线程可以并发等待 I/O) |
CPU 密集型任务(如计算) | ❌ 受 GIL 限制(仍然单核) | ⚠️ 受 GIL 限制(多线程无法真正并行计算) |
适用场景 | 适合 asyncio 代码中 偶尔的同步阻塞调用 |
适合 纯同步代码的并发执行 |
适用场景
asyncio.to_thread()
更适合:
在
async/await
代码中 临时调用同步阻塞函数(如requests.get()
、time.sleep()
)。希望 保持
asyncio
的简洁性,避免手动管理线程。示例:
1
2
3async def fetch_data():
data = await asyncio.to_thread(requests.get, "https://example.com") # 不阻塞事件循环
return data.json()
多线程(threading
)更适合:
纯同步代码,需要并发执行多个任务。
长期运行的并发任务(如 Web 服务器、爬虫)。
示例:
1
2
3
4
5
6
7
8
9def worker():
while True:
task = queue.get() # 从队列取任务
process(task) # 处理任务
queue.task_done() # 标记完成
# 启动 4 个线程
for _ in range(4):
threading.Thread(target=worker, daemon=True).start()
5. 关键总结
对比维度 | asyncio.to_thread() |
多线程(threading ) |
---|---|---|
设计目标 | 在协程中运行同步阻塞代码 | 实现真正的并发(受 GIL 限制) |
阻塞问题 | 不会阻塞事件循环 | 可能阻塞主线程(需手动管理) |
适用场景 | async/await 代码中的临时同步调用 |
纯同步代码的并发执行 |
性能 | 适合少量阻塞调用 | 适合高并发 I/O 任务 |
复杂度 | 简单(自动管理线程池) | 较高(需处理线程同步) |
选择建议
- 如果已经在用
asyncio
→ 优先用asyncio.to_thread()
处理同步阻塞代码。 - 如果是纯同步代码 → 用多线程(
threading
)或concurrent.futures
。 - 如果是 CPU 密集型任务 → 用多进程(
multiprocessing
)绕过 GIL。
最终结论
asyncio.to_thread()
是 协程与线程的桥梁,让async/await
代码能兼容同步阻塞操作。- 多线程 仍然是 传统并发编程 的主要方式,适合非
asyncio
场景。 - 最佳实践:在
asyncio
生态中尽量用异步库(如aiohttp
),只有必要时才用to_thread
。
协程的其他操作
asyncio.to_thread(fun(1),1,2,3)
这个方法是将一个非协程函数转换成协程函数,并且传入参数,并且返回一个协程对象,这个对象可以await