banner
Hi my new friend!

python_异步编程笔记

Scroll down

异步编程

参考视频: https://www.bilibili.com/video/BV157mFYEEkH
python官方文档: https://docs.python.org/3.12/library/asyncio-task.html

前提

  1. 执行协程函数,必须使用事件循环
  2. 如果没有创建asyncio任务也可以执行协程函数,但是不是并发运行,通过await激发,但是这样的操作没有并发
  3. 协程中提到的loop是指任务循环
  4. 在任务循环中,实际上是依次执行各个任务,由于存在的GIL,python实际上不存在真正的多核并发,只是疯狂转换, 那么如果没有碰到await标记的可挂起步骤, 是会一直在CPU上机处理, 从而阻塞整个任务循环, 之所以print这样的步骤不需要await,是因为这样的操作速度足够快,不会阻塞整个任务循环,所以不需要挂起

意义

  1. 避免耗时IO阻塞耗费时间,可以同时执行多个任务,提升性能

协程的特点是可以暂停运行也可以继续运行
当出现关键词await的时候就会让出控制权,暂停运行,直到后面东西完成了才回来执行后面的内容

将协程包装成任务
函数前面加上async,需要暂停的地方进行改写,例如

1
sleep(1)  --> await asyncio.sleep(1)

await的第二个作用就是将后面的协程包装成任务,让事件循环调度
定义完协程函数就要定义协程并且包装为任务
在main函数汇总调用协程函数得到协程,并且包装为任务

1
task1 =  async.create_task(coro(url))

然后使用await来获取结果(需要注意的是,这里的await必须在协程函数中,不能在main函数中),也就是说:
async def main():pass
def main():pass

1
result = await task1

建立事件循环
asyncio.create_task() 函数,用于将协程作为 asyncio 任务并发运行。
需要注意的是,如果没有创建asyncio任务也可以执行,但是不是并发运行

1
asyncio.run(main())

await表明当前的协程要暂停运行,当协程中的await被执行时,事件循环会暂停当前协程,并开始执行下一个协程,当执行完毕时,事件循环会恢复当前协程继续执行
如果await后面是一个协程,就将协程包装为一个任务,
但是如果已经是一个任务了,就将任务加入到事件循环中,等待执行,等到任务执行完毕,await会获取这个协程执行的结果result = await task1,所以result会接受这个结果

建立的分类:手动和自动

手动:

手动创建然后触发,获取结果,优点是可以控制,缺点是麻烦,需要自己管理

1
2
task1 =  async.create_task(coro(url))
result = await task1

参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def main():
url = 'http://httpbin.org'
file_path = 'file.txt'

task1 = asyncio.create_task(fetch_url(url))
task2 = asyncio.create_task(read_file(file_path))

fetch_result = await task1
fetch_file = await task2


if __name__ == '__main__':
start_time = time.time()
asyncio.run(main()) # 手动运行

end_time = time.time()
print("the time cost is ", end_time - start_time)

自动:

有两个方法:
asyncio.gather()asyncio.as_completed()

  1. asyncio.gather() 等待所有任务完成返回
    可以传入多个任务,并且等待所有任务执行完毕,然后返回一个列表(按照传入顺序给结果)

    1
    2
    import asyncio    
    results = await asyncio.gather(fun1(),fun2())

    实例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    async def main():
    url = 'http://httpbin.org'
    file_path = 'file.txt'
    results = await asyncio.gather(read_file(file_path), fetch_url(url))
    print(results)

    if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(main()) # 手动运行

    end_time = time.time()
    print("the time cost is ", end_time - start_time)
  2. asyncio.as_completed()不会等待所有任务完成,而是返回一个迭代器,迭代器中包含的是任务,
    当任务完成时,迭代器会返回这个任务,并且可以获取这个任务的结果,但是不能保证顺序
    也就是说谁先完成谁先执行接下来的逻辑
    需要注意的是,这个方法传入的是一个列表

    1
    2
    3
    results = asyncio.as_completed([fun1(),fun2()])
    for result in results:
    print(await result)

    示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    async def main():
    url = 'http://httpbin.org'
    file_path = 'file.txt'
    results = asyncio.as_completed([ read_file(file_path),fetch_url(url)])
    for result in results:
    print(await result) # 返回的是一个对象!
    print(results)

    if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(main()) # 手动运行

    end_time = time.time()
    print("the time cost is ", end_time - start_time)

异步http请求

AI实例源码:

aiohttp.ClientSession()内部维护了一个tcp连接池,可以使用tcp复用

也可以手动管理,但是如果忘记session.close()就有可能导致连接泄露,并且代码冗余

1
2
3
4
5
6
session = aiohttp.ClientSession()  # 手动创建会话
try:
response = await session.get("https://example.com")
data = await response.text()
finally:
await session.close() # 必须手动关闭!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import requests # ❌ 同步库
import aiohttp # ✅ 异步库

async def bad_coroutine():
print("Start sync request")
requests.get("https://httpbin.org/delay/2") # ❌ 阻塞 2 秒
print("Sync request done")

async def good_coroutine():
print("Start async request")
async with aiohttp.ClientSession() as session:
await session.get("https://httpbin.org/delay/2") # ✅ 非阻塞
print("Async request done")

async def main():
await asyncio.gather(bad_coroutine(), good_coroutine())

asyncio.run(main())

多线程实现异步asyncio.to_thread()与多线程对比

需要注意的是,这个并不是真正意义上的多线程, 实际上仍是单核,处理CPU-bound的时候会相当吃力,如果是 CPU 密集型任务 → 用多进程(multiprocessing)绕过 GIL。

协程是单线程的

  • asyncio 的协程运行在 单个线程 中,由 事件循环(loop 调度。
  • 并发原理:通过 await 挂起 I/O 等待,让事件循环切换到其他任务,实现 协作式并发。模拟CPU并发,耗时就下CPU一边凉快去,需要计算就上CPU计算
  • 优势:
    • 比多线程 更轻量(无线程切换开销)。
    • 适合 高并发 I/O 操作(如网络请求、文件读写)。

对比多线程

asyncio(协程) 多线程
并发模型 单线程 + 事件循环 多线程 + OS 调度
适用场景 I/O 密集型(如 HTTP 请求) CPU 密集型 或 混合型任务
性能开销 极低(无线程切换) 高(线程切换、锁竞争)
GIL 影响 无影响(单线程) 受 GIL 限制(CPU 任务无法并行)

设计目标不同

asyncio.to_thread() 多线程(threading
主要用于 在协程中运行同步阻塞代码(如 time.sleep()requests.get()),避免阻塞事件循环。 用于 真正的并行执行(受 GIL 限制,适合 I/O 密集型任务)。
底层仍然依赖线程池,但 asyncio 管理,对开发者透明。 开发者需要手动管理线程的生命周期、同步机制(如 LockQueue)。
适用于 async/await 代码,与协程无缝集成。 适用于同步代码,与 asyncio 不直接兼容。

执行方式不同

asyncio.to_thread()

  • 运行方式:

    • 将同步函数 提交到线程池(默认使用 concurrent.futures.ThreadPoolExecutor)。
    • 线程池执行完成后,结果返回给事件循环,协程继续执行。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import asyncio
    import time

    def blocking_task():
    time.sleep(2) # 同步阻塞操作
    return "Done"

    async def main():
    result = await asyncio.to_thread(blocking_task) # 在子线程中运行
    print(result) # 输出 "Done"

    asyncio.run(main())
  • 特点:

    • 不会阻塞事件循环(因为 time.sleep(2) 在子线程中运行)。
    • 适合偶尔的同步阻塞调用(如数据库查询、文件读写)。

多线程(threading

  • 运行方式:

    • 直接创建和管理线程,每个线程独立执行任务。
    • 需要手动处理线程同步(如 LockEvent)。
  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import threading
    import time

    def blocking_task():
    time.sleep(2)
    print("Done")

    thread = threading.Thread(target=blocking_task)
    thread.start() # 启动线程
    thread.join() # 等待线程结束
  • 特点:

    • 真正的并行执行(受 GIL 限制,I/O 密集型任务可以并发)。
    • 适合长期运行的并发任务(如 Web 服务器处理多个请求)。

性能对比

场景 asyncio.to_thread() 多线程(threading
I/O 密集型任务(如 HTTP 请求) ✅ 高效(协程 + 线程池结合) ✅ 高效(线程可以并发等待 I/O)
CPU 密集型任务(如计算) ❌ 受 GIL 限制(仍然单核) ⚠️ 受 GIL 限制(多线程无法真正并行计算)
适用场景 适合 asyncio 代码中 偶尔的同步阻塞调用 适合 纯同步代码的并发执行

适用场景

asyncio.to_thread() 更适合:

  • async/await 代码中 临时调用同步阻塞函数(如 requests.get()time.sleep())。

  • 希望 保持 asyncio 的简洁性,避免手动管理线程。

  • 示例:

    1
    2
    3
    async def fetch_data():
    data = await asyncio.to_thread(requests.get, "https://example.com") # 不阻塞事件循环
    return data.json()

多线程(threading)更适合:

  • 纯同步代码,需要并发执行多个任务。

  • 长期运行的并发任务(如 Web 服务器、爬虫)。

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def worker():
    while True:
    task = queue.get() # 从队列取任务
    process(task) # 处理任务
    queue.task_done() # 标记完成

    # 启动 4 个线程
    for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

5. 关键总结

对比维度 asyncio.to_thread() 多线程(threading
设计目标 在协程中运行同步阻塞代码 实现真正的并发(受 GIL 限制)
阻塞问题 不会阻塞事件循环 可能阻塞主线程(需手动管理)
适用场景 async/await 代码中的临时同步调用 纯同步代码的并发执行
性能 适合少量阻塞调用 适合高并发 I/O 任务
复杂度 简单(自动管理线程池) 较高(需处理线程同步)

选择建议

  1. 如果已经在用 asyncio → 优先用 asyncio.to_thread() 处理同步阻塞代码。
  2. 如果是纯同步代码 → 用多线程(threading)或 concurrent.futures
  3. 如果是 CPU 密集型任务 → 用多进程(multiprocessing)绕过 GIL。

最终结论

  • asyncio.to_thread()协程与线程的桥梁,让 async/await 代码能兼容同步阻塞操作。
  • 多线程 仍然是 传统并发编程 的主要方式,适合非 asyncio 场景。
  • 最佳实践:在 asyncio 生态中尽量用异步库(如 aiohttp),只有必要时才用 to_thread

协程的其他操作

  1. asyncio.to_thread(fun(1),1,2,3)这个方法是将一个非协程函数转换成协程函数,并且传入参数,并且返回一个协程对象,这个对象可以await
其他文章
cover
Hello World
  • 25/05/02
  • 21:31
  • 未分类
cover
ev3_basic_wireshark解析器
  • 25/04/30
  • 00:00
  • 打靶日记
目录导航 置顶
  1. 1. 异步编程
    1. 1.1. 前提
    2. 1.2. 意义
    3. 1.3. 建立的分类:手动和自动
      1. 1.3.1. 手动:
      2. 1.3.2. 自动:
    4. 1.4. 异步http请求
    5. 1.5. 多线程实现异步asyncio.to_thread()与多线程对比
      1. 1.5.1. 协程是单线程的
      2. 1.5.2. 对比多线程
      3. 1.5.3. 设计目标不同
      4. 1.5.4. 执行方式不同
        1. 1.5.4.1. asyncio.to_thread()
        2. 1.5.4.2. 多线程(threading)
      5. 1.5.5. 性能对比
      6. 1.5.6. 适用场景
        1. 1.5.6.1. asyncio.to_thread() 更适合:
        2. 1.5.6.2. 多线程(threading)更适合:
      7. 1.5.7. 5. 关键总结
        1. 1.5.7.1. 选择建议
        2. 1.5.7.2. 最终结论
    6. 1.6. 协程的其他操作
请输入关键词进行搜索