python基础入门:7.3并发编程初探

ops/2025/2/12 8:11:46/

Python并发编程全面解析:解锁程序性能的新维度

python"># 并发执行模板
import concurrent.futures
import timedef task(n):"""模拟耗时任务"""print(f"开始执行任务 {n}")time.sleep(2 if n % 2 == 0 else 1)return f"任务 {n} 完成"# 多线程执行
with concurrent.futures.ThreadPoolExecutor() as executor:results = [executor.submit(task, i) for i in range(5)]for f in concurrent.futures.as_completed(results):print(f.result())# 多进程执行
with concurrent.futures.ProcessPoolExecutor() as executor:results = executor.map(task, range(5))for res in results:print(res)
一、多线程编程实战
  1. 线程池基础用法
python">import threading
from queue import Queuedef worker(q):"""线程工作函数"""while True:item = q.get()if item is None:breakprint(f"处理 {item}")q.task_done()# 创建线程池
q = Queue()
threads = []
for i in range(3):t = threading.Thread(target=worker, args=(q,))t.start()threads.append(t)# 提交任务
for item in ['A', 'B', 'C', 'D', 'E']:q.put(item)# 等待完成
q.join()
for _ in range(3):q.put(None)
for t in threads:t.join()
  1. 线程安全与锁机制
python">class BankAccount:def __init__(self):self.balance = 1000self.lock = threading.Lock()def transfer(self, amount):with self.lock:new_balance = self.balance + amounttime.sleep(0.1)  # 模拟处理延迟self.balance = new_balancedef test_transfer(account):for _ in range(100):account.transfer(1)account = BankAccount()
threads = []for _ in range(10):t = threading.Thread(target=test_transfer, args=(account,))threads.append(t)t.start()for t in threads:t.join()print(f"最终余额: {account.balance}")  # 正确应为2000
二、多进程编程深入
  1. 进程间通信(IPC)
python">from multiprocessing import Process, Pipedef worker(conn):"""子进程处理函数"""while True:msg = conn.recv()if msg == 'exit':breakprint(f"处理消息: {msg}")conn.send(msg.upper())conn.close()# 创建管道
parent_conn, child_conn = Pipe()# 启动子进程
p = Process(target=worker, args=(child_conn,))
p.start()# 主进程通信
messages = ['hello', 'world', 'python', 'exit']
for msg in messages:parent_conn.send(msg)if msg != 'exit':print(f"收到回复: {parent_conn.recv()}")p.join()
  1. 共享内存与性能对比
python">import multiprocessingdef cpu_bound(n):"""计算密集型任务"""return sum(i*i for i in range(n))# 顺序执行
start = time.time()
[cpu_bound(10**6) for _ in range(4)]
print(f"顺序执行: {time.time()-start:.2f}s")# 多进程执行
start = time.time()
with multiprocessing.Pool() as pool:pool.map(cpu_bound, [10**6]*4)
print(f"多进程执行: {time.time()-start:.2f}s")
三、协程与异步编程
  1. asyncio基础架构
python">import asyncioasync def fetch_data(url):"""模拟异步请求"""print(f"开始请求 {url}")await asyncio.sleep(2)  # 模拟IO等待print(f"完成请求 {url}")return f"{url} 响应"async def main():tasks = [asyncio.create_task(fetch_data(f"https://api/data/{i}"))for i in range(5)]results = await asyncio.gather(*tasks)print("所有请求完成:", results)# 执行事件循环
asyncio.run(main())
  1. 生产级异步HTTP客户端
python">import aiohttp
import async_timeoutasync def async_fetch(session, url):"""带超时的异步请求"""try:async with async_timeout.timeout(5):async with session.get(url) as response:return await response.text()except asyncio.TimeoutError:print(f"请求超时: {url}")return Noneasync def batch_fetch(urls):"""批量异步请求"""async with aiohttp.ClientSession() as session:tasks = [async_fetch(session, url) for url in urls]return await asyncio.gather(*tasks)# 使用示例
urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/3','https://httpbin.org/delay/2'
]results = asyncio.run(batch_fetch(urls))
print("获取到", len([r for r in results if r]), "个有效响应")
四、并发模型对比与选择
  1. 并发方式对比矩阵
特性多线程多进程协程
执行模型抢占式切换独立内存空间协作式切换
最佳适用场景I/O密集型任务CPU密集型任务高并发I/O操作
内存占用共享内存,较低独立内存,较高极低
启动开销较小较大最小
数据共享通过共享内存需要IPC机制通过事件循环
Python实现限制受GIL限制无GIL限制需要Python 3.7+
  1. 性能测试对比(处理1000个HTTP请求)
方式耗时CPU使用率内存占用
同步顺序执行82.3s12%45MB
多线程(50线程)4.2s85%210MB
多进程(4进程)6.8s95%580MB
协程(500并发)2.1s78%65MB
混合型
问题类型
CPU密集型?
多进程
I/O密集型?
高并发?
协程
多线程
进程+线程/协程

最佳实践指南

  1. 优先使用concurrent.futures高层API
  2. 避免在多线程中执行CPU密集型任务
  3. 使用队列进行线程/进程间通信
  4. 协程中避免阻塞操作
  5. 设置合理的并发数量(线程/进程数)
  6. 使用threading.local管理线程局部存储
  7. 多进程编程时使用Manager共享状态
  8. 异步编程注意异常处理
  9. 使用性能分析工具(cProfile, line_profiler)
  10. 考虑使用第三方库(celery, ray)
python"># 生产环境配置示例
MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4)  # 线程池公式def safe_execute(func):"""带异常处理的执行装饰器"""@functools.wraps(func)def wrapper(*args, **kwargs):try:return func(*args, **kwargs)except Exception as e:logging.error(f"执行失败: {str(e)}")raisereturn wrapper@safe_execute
def critical_task():"""关键任务函数"""# 业务逻辑...

http://www.ppmy.cn/ops/157727.html

相关文章

云原生周刊:DeepSeek 颠覆人工智能

开源项目推荐 Ollama Ollama 是一个开源的 AI 工具,旨在为用户提供简单而强大的本地部署语言模型解决方案。它支持直接在本地计算机上运行多个预训练的语言模型,能够提供与云端类似的体验,但无需依赖外部服务器或网络连接。 Ollama 的主要…

Unity-Mirror网络框架-从入门到精通之MultipleMatches示例

文章目录 前言MultipleMatchesLobbyViewRoomViewMatchGUIPlayerGUI总结前言 在现代游戏开发中,网络功能日益成为提升游戏体验的关键组成部分。本系列文章将为读者提供对Mirror网络框架的深入了解,涵盖从基础到高级的多个主题。Mirror是一个用于Unity的开源网络框架,专为多人…

【MySQL — 数据库基础】深入解析MySQL的聚合查询

1. 聚合查询 1.1 聚合函数 函数说明COUNT ( [DISTINCT] expr)返回查询到的数据的数量( 行数 )SUM ( [DISTINCT] expr)返回查询到的数据的总和,不是数字没有意义AVG ( [DISTINCT] expr)返回查询到的数据的平均值,不是数字没有意义MAX( [DISTINCT] expr)…

安卓开发,底部导航栏

1、创建导航栏图标 使用系统自带的矢量图库文件,鼠标右键点击res->New->Vector Asset 修改 Name , Clip art 和 Color 再创建一个 同样的方法再创建四个按钮 2、添加百分比布局依赖 app\build.gradle.kts 中添加百分比布局依赖,并点击Sync Now …

hi3516cv610用海思arm-v01c02-linux-musleabi-strip工具,对库进行瘦身

hi3516cv610用海思arm-v01c02-linux-musleabi-strip工具,对库进行瘦身 执行 arm-v01c02-linux-musleabi-strip libcrypto.so.1.1 应用该工具,对程序裁剪很实用

如何利用DeepSeek开源模型打造OA系统专属AI助手

利用DeepSeek开源模型打造OA系统专属AI助手,可以显著提升办公效率,增强信息检索和管理能力。 注册与登录DeepSeek平台 访问DeepSeek官网 访问DeepSeek的官方网站DeepSeek。使用电子邮件或手机号码注册账号并登录。 获取API Key 登录DeepSeek平台&am…

Task02:Ollama 自定义导入模型

2月13日-14日Task02:Ollama 自定义导入模型[Ollama自定义导入模型](https://datawhalechina.github.io/handy-ollama/#/C3/1. 自定义导入模型)截止时间 02月15日03:00 [Ollama 自定义导入模型](https://datawhalechina.github.io/handy-ollama/#/C3/1. 自定义导入模…

Vue项目能进行哪些性能优化?

一、前言 在新公司开发项目的时候,我发现总结了一个问题就是,当我开发完某个需求。但是当测试完开发完产品验收的时候可能会从性能方面验收的时候会出现一些你意想不到的结果。为此我整理一下项目中优化可以从哪些方面来入手。首先可以从三个方面来进行…