使用FastApi模拟网络IO接口
import timefrom fastapi import FastAPIapp = FastAPI()# 文件名 api.py
# 运行: uvicorn api:app --reload@app.get("/sleep/{times}")
def sleep(times: int):# 模拟接口耗时time.sleep(times)return {"sleep": times}
asyncio协程
协程: 使用 async def
语法定义的函数总是为协程函数,即使它们不包含 await
或 async
关键字。
引自: 8. 复合语句 — Python 3.12.3 文档
对比 并发运行 的效率
# _*_ coding : UTF-8 _*_
# @Time : 2024/4/22 下午7:12
# @Auther : Tiam
# @File : 异步
# @Project : play-python
# @Desc :import asyncio
import threading
import timeimport aiohttpdef get_run_time(func):"""获取 async函数运行时间:param func::return:"""async def wrapper(*args, **kwargs):start_time = time.time()await func(*args, **kwargs)end_time = time.time()print(f"函数 {func.__name__} 运行时间: {end_time - start_time} 秒")return wrapperasync def req(second):print(threading.current_thread().name, second)url = f'http://127.0.0.1:8000/sleep/{second}'async with aiohttp.ClientSession() as session:async with session.get(url) as response:print("Status:", response.status)return url, response.statuscounts = 5@get_run_time
async def main():for i in range(counts):await req(i)# [await req(i) for i in range(counts)] # 等同于以上操作@get_run_time
async def main_gather():# https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.gather # 新版本推荐使用 asyncio.TaskGroupresult = await asyncio.gather(*[req(i) for i in range(counts)])print(result)@get_run_time
async def main_task_group():# https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.TaskGroupasync with asyncio.TaskGroup() as tg:# 重点: create_task 会将 协程(用async修饰的函数) 转换成 一个可并行调度的任务task-object# https://docs.python.org/zh-cn/3/library/asyncio-task.html#task-objecttasks = [tg.create_task(req(i)) for i in range(counts)]for task in tasks:print(task.result())if __name__ == '__main__':# 同步顺序执行, 耗时 0+1+2+3+4 = 10# asyncio.run(main()) # 函数 main 运行时间: 10.017327308654785 秒# 并发运行, 只耗时最长的一个IO# asyncio.run(main_gather()) # 函数 main_gather 运行时间: 4.008605718612671 秒# 3.11 +asyncio.run(main_task_group()) # 函数 main_task_group 运行时间: 4.16048264503479 秒
Process进程/Thread线程
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorimport requestsdef get_run_time(func):def wrapper(*args, **kwargs):start_time = time.time()func(*args, **kwargs)end_time = time.time()print(f"函数 {func.__name__} 运行时间: {end_time - start_time} 秒")return wrapperdef req():with requests.session() as session:with session.get('http://localhost:8000/sleep/1') as response:print("Status:", response.status_code)counts = 5@get_run_time
def main1():with ThreadPoolExecutor(max_workers=counts) as executor:for i in range(counts):executor.submit(req)@get_run_time
def main2():req()@get_run_time
def main3():with ProcessPoolExecutor(max_workers=counts) as executor:for i in range(counts):executor.submit(req)if __name__ == '__main__':main1() # 函数 main1 运行时间: 3.0591320991516113 秒main2() # 函数 main1 运行时间: 3.0591320991516113 秒main3() # 函数 main3 运行时间: 4.668190956115723 秒, 多进程增加了 进程切换开销 时间
总结:
多进程和多线程是两种常见的并发执行方式,它们各自有不同的特点和适用场景:
多进程(Multi-Process)
优点:
- 隔离性:每个进程都有独立的内存空间和系统资源,一个进程的崩溃不会直接影响到其他进程。
- 资源利用:可以更好地利用多核CPU,通过创建多个进程来执行CPU密集型任务。
- 简化编程:编程相对容易,通常不需要考虑锁和同步资源的问题。
- 容错性:进程间相互独立,具有更强的容错性。
缺点:
- 资源消耗:进程的创建和销毁需要较多的系统资源和时间。
- 通信开销:进程间通信(IPC)机制比线程间通信复杂,可能会影响性能。
- 上下文切换:进程间的上下文切换开销较大。
多线程(Multi-Threading)
优点:
- 资源共享:同一进程下的线程共享进程的内存和资源,数据共享和通信更容易。
- 执行开销:线程的创建和销毁比进程更快,资源消耗较小。
- 上下文切换:线程间的上下文切换比进程间快,因为它们共享相同的地址空间。
- 响应性:适合需要快速响应的应用程序,如用户界面程序。
缺点:
- 同步问题:线程之间需要同步和互斥机制来避免竞态条件和数据冲突。
- GIL限制:在某些语言(如Python)中,全局解释器锁(GIL)限制了线程的并行执行。
- 稳定性风险:一个线程的不稳定可能影响整个进程的稳定性。
选择多进程还是多线程?
- CPU密集型任务:如果任务主要是计算密集型的,并且需要充分利用多核CPU,多进程可能是更好的选择。
- IO密集型任务:对于IO密集型任务,多线程可能更合适,因为线程可以在等待IO操作时被操作系统挂起,让出CPU给其他线程使用。
- 并发数要求:需要处理大量并发请求时,多线程可以更高效地利用资源。
- 安全性和稳定性:如果程序需要高安全性和稳定性,多进程提供的隔离性可能更合适。
- 开发和维护难度:如果程序逻辑较为简单,或者开发者对并发编程不够熟悉,多进程可能更容易开发和维护。
在实际应用中,两种模型也可以结合使用,例如,可以使用多进程模型来处理多个并行的任务,而每个进程内部使用多线程来进一步提高并发度。
关系:
每个进程可以有多个线程, 每个线程下又可以存在多个协程