Python 多进程解析:Multiprocessing 高效并行处理的奥秘

devtools/2024/9/29 14:01:47/

Python 多进程解析:Multiprocessing 高效并行处理的奥秘

文章目录

  • Python 多进程解析:Multiprocessing 高效并行处理的奥秘
      • 一 多进程
        • 1 导入进程标准模块
        • 2 定义调用函数
        • 3 创建和启动进程
      • 二 存储进程结果 Queue
      • 三 threading & multiprocessing 对比
        • 1 创建多进程 multiprocessing
        • 2 创建多线程 multithread
        • 3 创建普通函数
        • 4 创建对比时间函数
        • 5 运行结果
      • 四 进程池 Pool
        • 1 进程池 Pool() 和 map()
        • 2 自定义核数量
        • 3 apply_async 单结果返回
        • 4 apply_async 多结果返回
        • 5 划重点
      • 五 共享内存 shared memory
      • 六 进程锁 Lock
        • 1 不加进程锁
        • 2 加进程锁
      • 七 完整代码示例
      • 八 源码地址

在 Python 编程中,多进程(Multiprocessing)是一种提高程序执行效率的重要手段。本文深入解析了多进程的概念与应用,帮助开发者充分利用多核处理器的计算能力。我们从基本的进程创建与启动开始,讲解了如何通过 Queue 实现进程间的数据传递,并通过对比多进程与多线程的性能差异,揭示了多进程在处理 CPU 密集型任务时的显著优势。文章还详细介绍了进程池(Pool)的使用方法,包括 mapapply_async 的不同应用场景。最后,我们探讨了共享内存和进程锁的使用,确保多进程在并发操作中的数据安全性。本文为希望掌握多进程编程的读者提供了全面且易懂的实践指导。

一 多进程

Multiprocessing 是一种编程和执行模式,它允许多个进程同时运行,以此提高应用程序的效率和性能。在 Python 中,multiprocessing 模块可以帮助你创建多个进程,使得每个进程都可以并行处理任务,从而有效利用多核处理器的能力。

1 导入进程标准模块
html" title=python>python">import multiprocessing as mp
2 定义调用函数
html" title=python>python">def job(a, d):print('你好 世界')
3 创建和启动进程
html" title=python>python"># 创建进程
p1 = mp.Process(target=job, args=(1, 2))
# 启动进程
p1.start()
# 连接进程
p1.join()

二 存储进程结果 Queue

1 存入输出到 Queue

html" title=python>python"># 该函数没有返回值!!!
def job02(q):res = 0for i in range(1000):res += i + i ** 2 + i ** 3q.put(res)  #def my_result_process02():q = mp.Queue()p1 = mp.Process(target=job02, args=(q,))p2 = mp.Process(target=job02, args=(q,))p1.start()p2.start()p1.join()p2.join()res1 = q.get()res2 = q.get()print(res1)print(res2)print(res1 + res2)

三 threading & multiprocessing 对比

1 创建多进程 multiprocessing
html" title=python>python">def job03(q):res = 0for i in range(1000000):res += i + i ** 2 + i ** 3# 结果加 queueq.put(res)# 多核运算多进程
def multicore03():q = mp.Queue()p1 = mp.Process(target=job03, args=(q,))p2 = mp.Process(target=job03, args=(q,))p1.start()p2.start()p1.join()p2.join()res1 = q.get()res2 = q.get()print('multicore:', res1 + res2)
2 创建多线程 multithread
html" title=python>python"># 单核运算多线程
def multithread03():# thread可放入process同样的queue中q = mp.Queue()t1 = td.Thread(target=job03, args=(q,))t2 = td.Thread(target=job03, args=(q,))t1.start()t2.start()t1.join()t2.join()res1 = q.get()res2 = q.get()print('multithread:', res1 + res2)
3 创建普通函数
html" title=python>python">def normal03():res = 0for _ in range(2):for i in range(1000000):res += i + i ** 2 + i ** 3print('normal:', res)
4 创建对比时间函数
html" title=python>python">def time_result03():st = time.time()normal03()st1 = time.time()print('normal time:', st1 - st)multithread03()st2 = time.time()print('multithread time:', st2 - st1)multicore03()print('multicore time:', time.time() - st2)
5 运行结果
normal03: 499999666667166666000000
normal03 time: 0.6855959892272949
multithread03: 499999666667166666000000
multithread03 time: 0.6804449558258057
multicore03: 499999666667166666000000
multicore03 time: 0.38849496841430664

我运行的是 normal03 > multithread03 > multicore03normal03multithread03 相差不大,multicore03normal03multithread03 快将近一倍。

四 进程池 Pool

使用进程池 Pool ,Python 会自行解决多进程问题。

1 进程池 Pool() 和 map()

map() 返回的是多结果。

html" title=python>python">def job04(x):# Pool的函数有返回值return x * xdef multicore04():# Pool的函数有返回值pool = mp.Pool()# 自分配 CPU 计算res = pool.map(job04, range(10))print(res)
2 自定义核数量

Pool 默认大小是 CPU的核数,传入 processes 参数自定义需要的核数量。

html" title=python>python">def multicore05():# 定义CPU核数量为3pool = mp.Pool(processes=3)  res = pool.map(job04, range(10))print(res)
3 apply_async 单结果返回

apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的, 所以在传入值后需要加逗号, 同时需要用 get() 方法获取返回值。

html" title=python>python">def multicore06():pool = mp.Pool()res = pool.apply_async(job04, (2,))# 用get获得结果print(res.get())
4 apply_async 多结果返回
html" title=python>python">def multicore07():pool = mp.Pool()multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]# 用get获得结果print([res.get() for res in multi_res])
5 划重点
  • Pool 默认调用是 CPU 的核数,传入 processes 参数可自定义CPU核数。
  • map() 放入迭代参数,返回多个结果。
  • apply_async() 只能放入一组参数,并返回一个结果,如果想得到 map() 的效果需要通过迭代。

五 共享内存 shared memory

1 定义 Shared Value

html" title=python>python">value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

2 定义 Shared Array

它只能是一维数组

html" title=python>python">array = mp.Array('i', [1, 2, 3, 4])

其中 d 和 i 参数用来设置数据类型的,d 表示一个双精浮点类型,i 表示一个带符号的整型,参考数据类型如下:

Type codeC TypePython TypeMinimum size in bytesNotes
'b'signed charint1
'B'unsigned charint1
'u'wchar_tUnicode character2(1)
'h'signed shortint2
'H'unsigned shortint2
'i'signed intint2
'I'unsigned intint2
'l'signed longint4
'L'unsigned longint4
'q'signed long longint8
'Q'unsigned long longint8
'f'floatfloat4
'd'doublefloat8

具体链接:Efficient arrays of numeric values

六 进程锁 Lock

1 不加进程锁

争抢共享内存

html" title=python>python">def job08(v, num):for _ in range(5):time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显v.value += num  # v.value获取共享变量值print(v.value, end="\n")def multicore08():v = mp.Value('i', 0)  # 定义共享变量p1 = mp.Process(target=job08, args=(v, 1))p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存p1.start()p2.start()p1.join()p2.join()
2 加进程锁
html" title=python>python">def job09(v, num, l):l.acquire()  # 锁住for _ in range(5):# print(v.value, num)time.sleep(0.1)v.value = v.value + num  # 获取共享内存print(v.value)l.release()  # 释放def multicore09():l = mp.Lock()  # 定义一个进程锁v = mp.Value('i', 0)  # 定义共享内存p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入p1.start()p1.join()p2 = mp.Process(target=job09, args=(v, 3, l))p2.start()p2.join()# def multicore10():
#     l = mp.Lock()  # 定义一个进程锁
#     v = mp.Value('i', 0)  # 定义共享内存
#     p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
#     p2 = mp.Process(target=job09, args=(v, 3, l))
#     p1.start()
#     p2.start()
#     p1.join()
#     p2.join()

在这个示例中,必须先执行 p1 以达到预期效果。分别运行 multicore09multicore10 会发现一些有意思的情况。

七 完整代码示例

:建议在运行 main.py 对应的代码功能时,逐行使用注释进行操作。

html" title=python>python"># This is a sample Python script.# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.import multiprocessing as mp
import threading as td
import time as timedef print_hi(name):# Use a breakpoint in the code line below to debug your script.print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.# 创建进程p1 = mp.Process(target=job, args=(1, 2))# 启动进程p1.start()# Shared Valuevalue1 = mp.Value('i', 0)value2 = mp.Value('d', 3.14)# Shared Array,只能是一维数组array = mp.Array('i', [1, 2, 3, 4])def job(a, d):print('你好 世界')# 该函数没有返回值!!!
def job02(q):res = 0for i in range(1000):res += i + i ** 2 + i ** 3q.put(res)  #def my_result_process02():q = mp.Queue()p1 = mp.Process(target=job02, args=(q,))p2 = mp.Process(target=job02, args=(q,))p1.start()p2.start()p1.join()p2.join()res1 = q.get()res2 = q.get()print(res1)print(res2)print(res1 + res2)def job03(q):res = 0for i in range(1000000):res += i + i ** 2 + i ** 3# 结果加 queueq.put(res)# 多核运算多进程
def multicore03():q = mp.Queue()p1 = mp.Process(target=job03, args=(q,))p2 = mp.Process(target=job03, args=(q,))p1.start()p2.start()p1.join()p2.join()res1 = q.get()res2 = q.get()print('multicore03:', res1 + res2)# 单核运算多线程
def multithread03():# thread可放入process同样的queue中q = mp.Queue()t1 = td.Thread(target=job03, args=(q,))t2 = td.Thread(target=job03, args=(q,))t1.start()t2.start()t1.join()t2.join()res1 = q.get()res2 = q.get()print('multithread03:', res1 + res2)def normal03():res = 0for _ in range(2):for i in range(1000000):res += i + i ** 2 + i ** 3print('normal03:', res)def time_result03():st = time.time()normal03()st1 = time.time()print('normal03 time:', st1 - st)multithread03()st2 = time.time()print('multithread03 time:', st2 - st1)multicore03()print('multicore03 time:', time.time() - st2)def job04(x):# Pool的函数有返回值return x * xdef multicore04():# Pool的函数有返回值pool = mp.Pool()# 自分配 CPU 计算res = pool.map(job04, range(10))print(res)def multicore05():pool = mp.Pool(processes=3)  # 定义CPU核数量为3res = pool.map(job04, range(10))print(res)def multicore06():pool = mp.Pool()# apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,# 所以在传入值后需要加逗号, 同时需要用get()方法获取返回值res = pool.apply_async(job04, (2,))# 用get获得结果print(res.get())def multicore07():pool = mp.Pool()multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]# 用get获得结果print([res.get() for res in multi_res])def job08(v, num):for _ in range(5):time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显v.value += num  # v.value获取共享变量值print(v.value, end="\n")def multicore08():v = mp.Value('i', 0)  # 定义共享变量p1 = mp.Process(target=job08, args=(v, 1))p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存p1.start()p2.start()p1.join()p2.join()def job09(v, num, l):l.acquire()  # 锁住for _ in range(5):# print(v.value, num)time.sleep(0.1)v.value = v.value + num  # 获取共享内存print(v.value)l.release()  # 释放def multicore09():l = mp.Lock()  # 定义一个进程锁v = mp.Value('i', 0)  # 定义共享内存p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入p1.start()p1.join()p2 = mp.Process(target=job09, args=(v, 3, l))p2.start()p2.join()def multicore10():l = mp.Lock()  # 定义一个进程锁v = mp.Value('i', 0)  # 定义共享内存p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入p2 = mp.Process(target=job09, args=(v, 3, l))p1.start()p2.start()p1.join()p2.join()# Press the green button in the gutter to run the script.
if __name__ == '__main__':print_hi('什么是 Multiprocessing')my_result_process02()time_result03()multicore04()multicore05()multicore06()multicore07()multicore08()multicore09()# multicore10()# See PyCharm help at https://www.jetbrains.com/help/pycharm/

复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。

Hi, 什么是 Multiprocessing
你好 世界
249833583000
249833583000
499667166000
normal03: 499999666667166666000000
normal03 time: 0.7139420509338379
multithread03: 499999666667166666000000
multithread03 time: 0.6696178913116455
multicore03: 499999666667166666000000
multicore03 time: 0.3917398452758789
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
3
4
7
8
11
12
151516
19
1
2
3
4
5
8
11
14
17
20

八 源码地址

代码地址:

国内看 Gitee 之 什么是 Multiprocessing.py

国外看 GitHub 之 什么是 Multiprocessing.py

引用 莫烦 Python


http://www.ppmy.cn/devtools/118717.html

相关文章

Java多线程在单体、微服务、服务网格与云原生架构中的理解与线程安全保障:总结与对比

1. 引言 多线程在现代软件开发中的应用广泛,从单体应用到微服务、服务网格以及云原生架构,不同架构下对多线程的理解和线程安全保障存在差异。本文将分别分析这四种架构下的多线程机制,提取其共性与区别,帮助开发者在不同场景下合…

计算机的软件知识

1.4 软件知识 1.4.1 软件及其分类 软件是计算机系统中不可或缺的一部分,它是指计算机程序及其相关文档的集合,是交付给客户的一整套解决方案。软件不仅仅是程序代码,还包括了用自然语言描述的软件功能需求、设计文档、项目计划、用户手册等…

docker简单熟悉

‌Docker 容器和‌虚拟机区别‌ Docker容器与虚拟机的主要区别在于虚拟化层次和资源占用: ‌虚拟化层次‌:Docker容器在操作系统级别进行虚拟化,共享宿主机的内核;而虚拟机在硬件级别进行虚拟化,每个虚拟机都拥有独立…

大数据毕业设计选题推荐-起点小说数据分析与可视化平台-Hive-Hadoop-Spark

✨作者主页:IT研究室✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

如何使用ssm实现航空信息管理系统+vue

TOC ssm728航空信息管理系统vue 第一章 课题背景及研究内容 1.1 课题背景 信息数据从传统到当代,是一直在变革当中,突如其来的互联网让传统的信息管理看到了革命性的曙光,因为传统信息管理从时效性,还是安全性,还是…

微调大模型(Finetuning Large Language Models)—Evaluation(六)

1. 微调后对模型进行评估 模型的评估目前没有统一的标准,有从正向角度,核对是否命中,当然也有从反向角度,考虑未命中的错误分析。 常见的评估方式如图所示: 本节学习资料地址:传送门 2. 代码测试 2.1 …

Python/大数据/机器识别毕业设计选题题目推荐

基于Python和Diango在线购物商城系统报告文档指导搭建视频 基于深度学习的人脸识别与管理系统,Python实现 基于Python/机器学习链家网新房数据可视化及预测系统 Python豆瓣电影情感分析推荐系统爬虫可视化,过滤算法 基于python的django框架生鲜商城管…

【系统架构设计师】专题:系统质量属性和架构评估

更多内容请见: 备考系统架构设计师-核心总结目录 文章目录 一、质量属性概念1、开发期质量属性2、运行期质量属性二、面向架构评估的质量属性三、质量属性场景描述四、系统架构评估1、系统架构评估中的重要概念2、系统架构评估方法(1)`软件架构分析方法(Software Architecture…