协程
协程的本质是用户态线程,由程序自行控制切换时机,无需操作系统介入。与线程相比,协程的三大核心优势:
- 资源占用极低:一个协程仅需KB级内存,可轻松创建数万个
- 切换效率惊人:上下文切换在用户空间完成,效率高
- 无锁编程体验:单线程内串行执行,天然规避竞态条件
协程(Coroutine) 是Python实现单线程并发的核心机制,通过async/await语法实现非阻塞I/O操作,特别适合处理网络请求密集型任务(如Web服务器、爬虫等)
协程生态的四大核心
- 事件循环(Event Loop)
协程的调度中心,如同机场的塔台,负责监控和执行所有协程任务。 - 可等待对象(Awaitables)
包括协程、Task、Future三类对象,均可被await表达式挂起。 - Task
包装协程的执行单位,用于并发调度: - Future
底层的异步操作结果容器,通常由框架内部使用。
代码演示案例
python">#### 演示发执行的demo
import asyncio
import time
# 定义一个协程
async def coro(x):print(f"{time.time()} 协程 {x} run coro ...")await asyncio.sleep(x) # 阻塞协程print(f"{time.time()} 协程 {x} run coro end ...")return xif __name__ == "__main__":start_time = time.time()# 创建协程tasks = []# 创建多个协程,并转为task[tasks.append(asyncio.ensure_future(coro(i))) for i in range(1,5)]# 创建一个事件循环对象loop = asyncio.get_event_loop()# 将task注册到事件循环中执行loop.run_until_complete(asyncio.wait(tasks))for task in tasks:# 获取结果print(task.get_name(),task.result())print(f"所有协程任务执行完成共耗时:{time.time() - start_time}")
输入结果:
python">1740897721.200705 协程 1 run coro ...
1740897721.2007282 协程 2 run coro ...
1740897721.200736 协程 3 run coro ...
1740897721.2007449 协程 4 run coro ...
1740897722.202246 协程 1 run coro end ...
1740897723.202154 协程 2 run coro end ...
1740897724.2020962 协程 3 run coro end ...
1740897725.202158 协程 4 run coro end ...
Task-1 1
Task-2 2
Task-3 3
Task-4 4
所有协程任务执行完成共耗时:4.002019882202148进程已结束,退出代码为 0
根据控制台输出的内容。在主线程中创建的4个协程同时执行。并相继完成,而主线程也才共耗时4秒,是协程耗时更久的那个协程执行的时间。
以上是在执行玩成可以在主线程中获取到结果集,在主线程中对结果集可以进行处理。那如果我不想在主线程中获取,当一个协程完成后直接执行一个固定的逻辑。比如数据保存。要怎么实现呢?
其实在Task 对象中呢,有个add_done_callback方法。其作用就是在任务完成前将回调注册的方法。但是这个回调方法要注意,只能有个参数,这个参数就是future对象
下面将上面的demo 修改一下,不在主线程中获取,当协程完成后,去回调callback方法进行数据打印。
示例代码:
python">import asyncio
import time
# 定义一个协程
async def coro(x):print(f"{time.time()} 协程 {x} run coro ...")await asyncio.sleep(x) # 阻塞协程print(f"{time.time()} 协程 {x} run coro end ...")return xdef callback(fun):print(f"{time.time()} 协程 {fun.result()} 完成打印")if __name__ == "__main__":start_time = time.time()# 创建协程tasks = []# 创建多个协程,并转为task[tasks.append(asyncio.ensure_future(coro(i))) for i in range(1,5)][task.add_done_callback(callback) for task in tasks]# 创建一个事件循环对象loop = asyncio.get_event_loop()# 将task注册到事件循环中执行loop.run_until_complete(asyncio.wait(tasks))print(f"所有协程任务执行完成共耗时:{time.time() - start_time}")
控制台输出:
python">1740900137.698991 协程 1 run coro ...
1740900137.699011 协程 2 run coro ...
1740900137.699018 协程 3 run coro ...
1740900137.699022 协程 4 run coro ...
1740900138.700324 协程 1 run coro end ...
1740900138.700427 协程 1 完成打印
1740900139.699353 协程 2 run coro end ...
1740900139.69946 协程 2 完成打印
1740900140.698559 协程 3 run coro end ...
1740900140.698708 协程 3 完成打印
1740900141.6983051 协程 4 run coro end ...
1740900141.698412 协程 4 完成打印
所有协程任务执行完成共耗时:3.999724864959717进程已结束,退出代码为 0
这里可以看到,回调方法已经执行了。
代码优化
在python 3.7+ 提供了一个asyncio.run() 方法。如果是一个协程可以直接使用asyncio.run(协程对象) 进行执行。但是如果像上面那样。有多个协程该怎么优化呢。
既然asyncio.run() 只接受一个协程对象,那就创建一个另外的协程,这个协程里await 所有协程不就解决这个弊端了吗
优化代码:
python">#### 演示发执行的demo
import asyncio
import time
# 定义一个协程
async def coro(x):print(f"{time.time()} 协程 {x} run coro ...")await asyncio.sleep(x) # 阻塞协程print(f"{time.time()} 协程 {x} run coro end ...")return x## 主协程
async def main():# 创建多个协程,并转为tasktasks =[asyncio.ensure_future(coro(i)) for i in range(1,5)][task.add_done_callback(callback) for task in tasks]done, pending = await asyncio.wait(tasks)return [t.result() for t in done]## 回调函数
def callback(fun):print(f"{time.time()} 协程 {fun.result()} 完成打印")if __name__ == "__main__":start_time = time.time()asyncio.run(main())print(f"所有协程任务执行完成共耗时:{time.time() - start_time}")
避坑指南
下面是我在学习中容易犯的错误,在这里总结一下
- 事件循环陷阱
python"># 错误示例 ❌
# 这里是获取一个运行的loop,第一次创建肯定是没有的。
#源码中,如果没有运行的loop 就排除异常。
loop = asyncio.get_running_loop() # 正确修复 ✅
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
- 阻塞操作污染
python">sync def bad_case():time.sleep(5) # ❌ 同步阻塞代码破坏事件循环async def good_case():await asyncio.sleep(5) # ✅ 异步非阻塞
- 资源泄漏防范
python">async def db_operation():conn = await get_connection()try:# 数据库操作...finally:await conn.close() # 必须显式释放资源
注意:以上是我自己学习理解记录的笔记,如果有不对的地方,还请各位大佬指出。 谢谢!!