Python23_asyncio并发

张开发
2026/4/16 16:55:44 15 分钟阅读

分享文章

Python23_asyncio并发
Python23_asyncio并发文章目录Python23_asyncio并发[toc]第一章核心概念1.1 什么是 asyncio1.2 核心术语对照表第二章基础语法与API2.1 定义和运行协程2.2 并发执行多个任务2.3 超时与取消第三章常见问题与解答FAQQ1: async 和 await 到底是什么Q2: 为什么我的协程没有并发执行Q3: asyncio.run() 是什么能调用多次吗Q4: 如何在协程中调用同步阻塞代码Q5: asyncio.gather() vs asyncio.wait() 区别Q6: 如何正确处理异常Q7: 什么是事件循环如何获取Q8: 如何实现生产者-消费者模式Q9: async for 和异步迭代器是什么Q10: 如何调试 asyncio 程序第四章最佳实践✅ 应该做的❌ 不应该做的第五章速查表常用 API 速查并发模式选择第六章学习路径建议第一章核心概念1.1 什么是 asyncioasyncio是 Python 3.4 引入的标准库用于编写并发代码使用async/await语法。importasyncioasyncdefmain():print(Hello)awaitasyncio.sleep(1)print(World)asyncio.run(main())关键特性单线程并发利用事件循环Event Loop实现非阻塞 I/O协程Coroutine使用async def定义的函数非抢占式协程主动让出控制权await而非被强制切换1.2 核心术语对照表术语说明类比Coroutine协程使用async def定义的函数特殊的生成器函数Task被事件循环调度的协程封装线程中的线程对象Event Loop事件循环调度执行协程心脏驱动整个系统Future异步操作最终结果占位符期货/期票Awaitable可被await的对象可等待的第二章基础语法与API2.1 定义和运行协程importasyncio# 定义协程asyncdefsay_hello(name):print(fHello,{name}!)awaitasyncio.sleep(1)# 模拟异步操作print(fGoodbye,{name}!)# 运行协程的三种方式# 方式1asyncio.run() - 推荐Python 3.7asyncio.run(say_hello(Alice))# 方式2在已有事件循环中运行asyncdefmain():awaitsay_hello(Bob)asyncio.run(main())# 方式3创建Task并发执行asyncdefmain():task1asyncio.create_task(say_hello(Alice))task2asyncio.create_task(say_hello(Bob))awaittask1awaittask2 asyncio.run(main())2.2 并发执行多个任务importasyncioasyncdeffetch_data(url,delay):print(f开始获取{url})awaitasyncio.sleep(delay)print(f完成获取{url})returnfData from{url}asyncdefmain():# 方式1asyncio.gather - 等待所有完成resultsawaitasyncio.gather(fetch_data(url1,2),fetch_data(url2,1),fetch_data(url3,3))print(f所有结果:{results})# 方式2asyncio.wait - 更灵活的控制tasks[asyncio.create_task(fetch_data(url1,2)),asyncio.create_task(fetch_data(url2,1)),]done,pendingawaitasyncio.wait(tasks,timeout3)print(f已完成:{len(done)}, 待处理:{len(pending)})# 方式3asyncio.as_completed - 按完成顺序处理tasks[fetch_data(furl{i},i)foriinrange(1,4)]forcoroinasyncio.as_completed(tasks):resultawaitcoroprint(f先完成的:{result})asyncio.run(main())2.3 超时与取消importasyncioasyncdeflong_running_task():try:awaitasyncio.sleep(10)returnCompletedexceptasyncio.CancelledError:print(任务被取消)raise# 必须重新抛出asyncdefmain():# 设置超时try:resultawaitasyncio.wait_for(long_running_task(),timeout2.0)exceptasyncio.TimeoutError:print(操作超时)# 手动取消任务taskasyncio.create_task(long_running_task())awaitasyncio.sleep(1)task.cancel()try:awaittaskexceptasyncio.CancelledError:print(成功取消任务)asyncio.run(main())第三章常见问题与解答FAQQ1:async和await到底是什么A:async def定义一个协程函数调用它返回协程对象不会立即执行await暂停当前协程等待另一个协程/任务完成期间事件循环可以运行其他任务asyncdeffoo():return42# 调用协程函数得到的是协程对象不是结果corofoo()# 这里不会执行 printprint(type(coro))# class coroutine# 必须 await 才能获取结果resultawaitfoo()# 42Q2: 为什么我的协程没有并发执行A:常见错误是顺序await没有创建 Task# ❌ 错误顺序执行总耗时 3 秒asyncdefbad():awaitasyncio.sleep(1)# 等待1秒awaitasyncio.sleep(1)# 再等待1秒awaitasyncio.sleep(1)# 再等待1秒# ✅ 正确并发执行总耗时 1 秒asyncdefgood():awaitasyncio.gather(asyncio.sleep(1),asyncio.sleep(1),asyncio.sleep(1))# ✅ 或者使用 create_taskasyncdefgood2():task1asyncio.create_task(asyncio.sleep(1))task2asyncio.create_task(asyncio.sleep(1))awaittask1awaittask2Q3:asyncio.run()是什么能调用多次吗A:asyncio.run(coro)是 Python 3.7 的简便入口函数它会创建新的事件循环运行协程关闭循环不能嵌套调用也不能在已有事件循环中调用# ❌ 错误嵌套调用asyncdefmain():asyncio.run(other_coro())# RuntimeError!# ✅ 正确在 main 中使用 awaitasyncdefmain():awaitother_coro()asyncio.run(main())Q4: 如何在协程中调用同步阻塞代码A:使用asyncio.to_thread()(Python 3.9) 或loop.run_in_executor()importasyncioimporttimedefblocking_io():模拟阻塞操作如文件读写、CPU计算time.sleep(2)# 阻塞returnDoneasyncdefmain():# 方式1Python 3.9resultawaitasyncio.to_thread(blocking_io)# 方式2通用方法loopasyncio.get_running_loop()resultawaitloop.run_in_executor(None,blocking_io)print(result)asyncio.run(main())Q5:asyncio.gather()vsasyncio.wait()区别特性gather()wait()返回值直接返回所有结果列表返回(done_set, pending_set)异常处理默认第一个异常就返回可配置return_when使用场景简单并发需要所有结果需要超时控制、部分结果# gather: 简单获取所有结果resultsawaitasyncio.gather(task1,task2,task3)# wait: 更灵活的控制done,pendingawaitasyncio.wait([task1,task2,task3],timeout5.0,return_whenasyncio.FIRST_COMPLETED# 任意一个完成就返回)Q6: 如何正确处理异常importasyncioasyncdefmay_fail():raiseValueError(出错了)asyncdefmain():# 方式1try-excepttry:awaitmay_fail()exceptValueErrorase:print(f捕获错误:{e})# 方式2gather 的异常处理resultsawaitasyncio.gather(may_fail(),asyncio.sleep(1),return_exceptionsTrue# 将异常作为结果返回不抛出)# results [ValueError(出错了), None]# 方式3任务异常处理taskasyncio.create_task(may_fail())try:awaittaskexceptValueError:print(任务失败)print(f异常详情:{task.exception()})asyncio.run(main())Q7: 什么是事件循环如何获取importasyncioasyncdefmain():# 获取当前运行的事件循环loopasyncio.get_running_loop()# 获取/设置事件循环低级API一般不直接使用# loop asyncio.get_event_loop() # 可能已废弃# asyncio.set_event_loop(loop)print(f当前循环:{loop})asyncio.run(main())注意现代 asyncio 推荐使用高层 APIasyncio.run(),asyncio.create_task()避免直接操作事件循环。Q8: 如何实现生产者-消费者模式importasyncioasyncdefproducer(queue:asyncio.Queue,n:int):foriinrange(n):awaitasyncio.sleep(0.5)# 模拟生产时间awaitqueue.put(i)print(f生产:{i})awaitqueue.put(None)# 发送结束信号asyncdefconsumer(queue:asyncio.Queue,name:str):whileTrue:itemawaitqueue.get()ifitemisNone:# 结束信号queue.task_done()breakawaitasyncio.sleep(1)# 模拟消费时间print(f消费者{name}处理:{item})queue.task_done()asyncdefmain():queueasyncio.Queue(maxsize5)# 限制队列大小producers[asyncio.create_task(producer(queue,5))]consumers[asyncio.create_task(consumer(queue,fC{i}))foriinrange(2)]awaitasyncio.gather(*producers)awaitqueue.join()# 等待所有项目被处理awaitasyncio.gather(*consumers)asyncio.run(main())Q9:async for和异步迭代器是什么importasyncioclassAsyncRange:异步迭代器示例def__init__(self,n):self.nn self.i0def__aiter__(self):returnselfasyncdef__anext__(self):ifself.iself.n:raiseStopAsyncIterationawaitasyncio.sleep(0.1)# 模拟异步操作valself.i self.i1returnvalasyncdefmain():# 使用 async forasyncforiinAsyncRange(5):print(i)# 异步推导式result[iasyncforiinAsyncRange(5)ifi%20]print(result)# [0, 2, 4]asyncio.run(main())Q10: 如何调试 asyncio 程序importasyncioimportlogging# 启用调试模式logging.basicConfig(levellogging.DEBUG)asyncdefmain():# 检测阻塞设置慢回调警告阈值loopasyncio.get_running_loop()loop.slow_callback_duration0.1# 超过100ms警告# 或者环境变量PYTHONASYNCIODEBUG1awaitasyncio.sleep(1)# 运行方式PYTHONASYNCIODEBUG1 python script.pyasyncio.run(main(),debugTrue)调试技巧使用asyncio.run(debugTrue)设置环境变量PYTHONASYNCIODEBUG1使用aiomonitor库进行运行时监控使用pytest-asyncio进行异步测试第四章最佳实践✅ 应该做的# 1. 始终使用 asyncio.run() 作为入口asyncdefmain():...if__name____main__:asyncio.run(main())# 2. 及时创建 Task 实现并发taskasyncio.create_task(coro())# 立即调度awaittask# 等待完成# 3. 使用 timeout 防止无限等待awaitasyncio.wait_for(coro(),timeout5.0)# 4. 使用 contextlib.asynccontextmanager 管理资源fromcontextlibimportasynccontextmanagerasynccontextmanagerasyncdefmanaged_resource():awaitconnect()try:yieldresourcefinally:awaitdisconnect()# 5. 使用 asyncio.gather 收集结果resultsawaitasyncio.gather(*tasks,return_exceptionsTrue)❌ 不应该做的# 1. 不要在协程中使用 time.sleep()awaitasyncio.sleep(1)# ✅time.sleep(1)# ❌ 阻塞整个事件循环# 2. 不要直接调用协程函数而不 awaitcoromy_async_func()# 只是创建协程对象不会执行# 3. 不要忽略 CancelledErrortry:awaittaskexceptasyncio.CancelledError:raise# ✅ 必须重新抛出或正确处理# 4. 不要创建过多并发任务内存爆炸# 使用 asyncio.Semaphore 限制并发数semaphoreasyncio.Semaphore(10)asyncdeflimited_task():asyncwithsemaphore:awaitactual_work()第五章速查表常用 API 速查操作代码运行协程asyncio.run(main())创建任务asyncio.create_task(coro())等待多个await asyncio.gather(*coros)超时控制await asyncio.wait_for(coro, timeout)睡眠await asyncio.sleep(delay)当前时间asyncio.get_event_loop().time()队列asyncio.Queue(maxsize)锁asyncio.Lock()信号量asyncio.Semaphore(n)事件asyncio.Event()条件变量asyncio.Condition()并发模式选择场景推荐方案多个独立任务asyncio.gather()需要超时/部分结果asyncio.wait()按完成顺序处理asyncio.as_completed()生产者-消费者asyncio.Queue限制并发数asyncio.Semaphore共享资源保护asyncio.Lock第六章学习路径建议第一阶段基础概念 └── 理解协程、事件循环、Task 的关系 └── 掌握 async/await 语法 第二阶段并发控制 └── gather/wait/as_completed 的使用 └── 超时和取消机制 第三阶段同步互斥 └── Queue、Lock、Semaphore、Event 第四阶段实战整合 └── 异步 HTTP (aiohttp) └── 异步数据库 (aiomysql/aiopg) └── 异步 Web 框架 (FastAPI/Sanic)提示asyncio 学习曲线较陡建议从简单示例开始逐步理解事件循环驱动协程切换的核心机制。遇到问题时首先检查是否有阻塞操作阻塞了事件循环。

更多文章