python 内置模块multiprocessing,进程

news/2025/3/16 0:41:07/
一、简介

进程是操作系统进行资源分配的基本单位,也就是说每启动一个进程,操作系统都会给其分配一定的运行资源(内存资源)保证进程的运行;每个进程都是独立运行的,不互相干扰
注意::进程锁和线程锁大同小异,此文章不对进程锁做过多解释,可参考:python内置模块threading

二、基本使用
import multiprocessing
import time
import osstart_time = time.time()def process1(text):print('process1:',os.getpid())time.sleep(1)print(text)def process2(text):print('process2:',os.getpid())time.sleep(2)print(text)if __name__ == '__main__':t1 = multiprocessing.Process(target=process1, args=('process1_sleep',))t2 = multiprocessing.Process(target=process2, args=('process2_sleep',))t1.start()t2.start()t1.join()#等待进程结束t2.join()#等待进程结束end_time = time.time()print('主进程结束', end_time - start_time)
三、方法
  1. multiprocessing.Process(target,name,args,kwargs) 创建进程,target:执行的目标任务名、name:进程名字、args:以元组方式给执行任务传参、kwargs:以字典方式给执行任务传参
  • process.start() 运行进程
  • process.join(timeout) 阻塞运行,如果遇到延时任务,使用join,主进程会等到延时任务结束,才能执行主进程
  • process.terminate():不管任务是否完成,立即终止子进程;类似线程中的守护线程
  • process.is_alive() 判断进程是否还活着
  • process.name:进程名,如果没设置名称系统会自动分配一个名称,可以用了设置进程名字
  • process.daemon:守护进程,主进程结束,就算有子进程正在运行,程序也会结束,不能和join一起使用,join会阻塞主进程
  1. multiprocessing.Lock() 同步锁:最多只有一个持有该锁,被加锁的进程在运行时不会将执行权交出去,只有当该进程被解锁时才会将执行权通过系统调度交由其他进程
  • lock.locaked():判断该锁对象是否处于上锁状态
  • lock.acquire(timeout=1)) :锁住进程,timeout失效时间
  • lock.release() :释放进程
  1. multiprocessing.Rlock() 递归锁,RLock允许在同一进程中被多次acquire,Lock不允许
  • rlock.locaked():判断该锁对象是否处于上锁状态
  • rlock.acquire(timeout=1)) :锁住进程,timeout失效时间
  • rlock.release() :释放进程
  1. multiprocessing.Condition() 条件锁,条件锁是在递归锁的基础上增加了暂停进程运行的功能,可以使用wait()与notify()来控制进程执行的个数
  • clock.acquire(timeout=1)) :锁住进程,timeout失效时间
  • clock.release() :释放进程
  • lockObject.wait(timeout=None) :将当前进程设置为“等待”状态,只有该进程接到“通知”或者超时时间到期之后才会继续运行,在“等待”状态下的进程将允许系统根据策略自行切换到其他进程中运行
  • lockObject.wait_for(predicate, timeout=None) :将当前进程设置为“等待”状态,只有该进程的predicate返回一个True或者超时时间到期之后才会继续运行,在“等待”状态下的进程将允许系统根据策略自行切换到其他进程中运行。注意:predicate参数应当传入一个可调用对象,且返回结果为bool类型
  • lockObject.notify(n=1) :通知一个当前状态为“等待”的进程继续运行,也可以通过参数n通知多个
  • lockObject.notify_all():通知所有当前状态为“等待”的进程继续运行
  1. multiprocessing.Event() 事件锁,事件锁是基于条件锁来做的,它与条件锁的区别在于一次只能放行全部,不能放行任意个数量的子进程继续运行
  • elock.clear() 将事件锁设为红灯状态,即所有进程暂停运行
  • elock.is_set() 用来判断当前事件锁状态,红灯为False,绿灯为True
  • elock.set() 将事件锁设为绿灯状态,即所有进程恢复运行
  • elock.wait(timeout=None) 将当前进程设置为“等待”状态,只有该进程接到“绿灯通知”或者超时时间到期之后才会继续运行,在“等待”状态下的进程将允许系统根据策略自行切换到其他进程中运行
  1. multiprocessing.Semaphore() 信号量锁,成批的放行特定个处于“上锁”状态的进程
  • slock.acquire(blocking=True, timeout=1) 上锁,内置计数器-1,直到为0的时候阻塞
  • slock.release() 解锁,内置计数器+1,并让某个进程的acquire()从阻塞变为不阻塞
  1. multiprocessing.BoundedSemaphor(value) 信号量锁 会检查内部计数器的值,并保证它不会大于初始值,如果超了,就引发一个 ValueError
  2. multiprocessing.Queue(size):进程队列,size队列个数
  • queue.qsize() 返回队列的大小
  • queue.empty() 如果队列为空,返回True,反之False
  • queue.full() 如果队列满了,返回True,反之False
  • queue.get(block,timeout) 获取队列,block:从队列里面取值,如果取不到值的话,程序不会结束, timeout:当block的值为真的时候,timeout是用来等待多少秒
  • queue.get_nowait() 相当queue.get(block=False)
  • queue.put(value,block,timeout) 写入队列,value:写入队列的值,,block:队列如果满了的话,再往队列里放值的话会等待, timeout:当block的值为真的时候,timeout是用来等待多少秒
  • queue.put_nowait(value) 相当queue.put(value, block=False)
  • queue.close() 关闭队列
    '''
    因进程之间是独立运行的,所以全局变量不会共享,如需共享需要用到队列
    以下例子会发现全局num不会再进程之间共享,而在task1中的put进的队列数据可以共享
    '''import multiprocessingnum = 1def task1(q):global numnum = 2q.put(1)q.put(2)def task2(q):print(f'task2:num={num}')print('队列:', q.get())if __name__ == '__main__':process_q = multiprocessing.Queue()process1 = multiprocessing.Process(target=task1,args=(process_q,))process2 = multiprocessing.Process(target=task2,args=(process_q,))process1.start()process2.start()process1.join()process2.join()print('队列:', process_q.get())print(f'全局变量:num={num}')
    
  1. multiprocessing.SimpleQueue() 简化的队列,无跟踪任务的功能,只具有empty、get、put3个方法
  • queue.empty() 如果队列为空,返回True,反之False
  • queue.get(block,timeout) 获取队列,block:从队列里面取值,如果取不到值的话,程序不会结束, timeout:当block的值为真的时候,timeout是用来等待多少秒
  • queue.put(value,block,timeout) 写入队列,value:写入队列的值,,block:队列如果满了的话,再往队列里放值的话会等待, timeout:当block的值为真的时候,timeout是用来等待多少秒
  1. multiprocessing.JoinableQueue(size):可阻塞的进程队列,size队列个数
  • queue.qsize() 返回队列的大小
  • queue.empty() 如果队列为空,返回True,反之False
  • queue.full() 如果队列满了,返回True,反之False
  • queue.get(block,timeout) 获取队列,block:从队列里面取值,如果取不到值的话,程序不会结束, timeout:当block的值为真的时候,timeout是用来等待多少秒
  • queue.get_nowait() 相当queue.get(block=False)
  • queue.put(value,block,timeout) 写入队列,value:写入队列的值,,block:队列如果满了的话,再往队列里放值的话会等待, timeout:当block的值为真的时候,timeout是用来等待多少秒
  • queue.put_nowait(value) 相当queue.put(value, block=False)
  • queue.join() 阻塞队列,队列元素未全部处理完前,进程阻塞
  • queue.task_done() 解除join阻塞,完成一次任务,计数机制-1
  • queue.join_thread() 连接队列的后台线程,此方法用于在调用q.close()方法后,等待所有队列项被消耗
  • queue.cancel_join_thread() 防止join_thread()方法阻塞,不会在进程退出时自动连接后台线程
  • queue.task_done() 向队列发送task_done(),让该队列不要在join
  • queue.close() 关闭队列,指示当前进程将不会再往队列中放入对象
    import multiprocessing
    import time
    import randomdef task1(q):for i in range(10):q.put(i)time.sleep(random.randint(1, 3))q.join()print('数据处理完毕')def task2(q):while True:print('task2队列:', q.get())q.task_done()if __name__ == '__main__':process_q = multiprocessing.JoinableQueue()process1 = multiprocessing.Process(target=task1, args=(process_q,))process2 = multiprocessing.Process(target=task2, args=(process_q,))process2.daemon = Trueprocess1.start()process2.start()process1.join()print('主进程结束')
    
  1. multiprocessing.Pipe():进程通道,用法基本和Queue一致,返回一对 Connection 对象 (conn1, conn2) ,conn1 只能用于接收消息,conn2 仅能用于发送消息
    from multiprocessing import Process, Pipe
    import timedef task(i, pip):while True:a = pip.recv()print(f"进程{i}收到一个{a}")time.sleep(0.5)pip.send(a + 1)if __name__ == '__main__':pip_start, pip_end = Pipe()Process(target=task, args=(1, pip_start)).start()Process(target=task, args=(2, pip_end)).start()pip_start.send(1)
    
  2. process.current_process() 返回当前进程
  3. process.active_children() 返回当前存活的进程列表
  4. process.parent_process():返回父进程
  5. process.cpu_count():返回cpu数量
三、进程池

进程池multiprocessing.Pool()的多进程之间的通信要用multiprocessing.Manager().Queue()

  1. multiprocessing.Pool()
    from multiprocessing import Process,Pool
    import timedef task(task):time.sleep(1)print(task * task)if __name__ == '__main__':task_list = [1, 2, 3, 4, 5, 6]print('顺序:')start_time = time.time()for i in task_list:process = Process(target=task, args=(i,))process.start()process.join()end_time = time.time()print('顺序执行时间',end_time - start_time)print('多进程进程池:')start_time = time.time()pool = Pool(6)pool.map(task,task_list)pool.close()pool.join()end_time = time.time()print('多进程执行时间', end_time - start_time)
    
  • pool.map(fun,iterable) Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回
  • pool.map_async(fun,iterable) 与内置的map函数用法行为基本一致,但是它是非阻塞的
  • pool.close() 关闭进程池(pool),使其不再接受新的任务
  • pool.terminal() 结束工作进程,不再处理未处理的任务
  • pool.join() 主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用
  • pool.is_alive() 判断进程是否还活着
  1. concurrent.futures 模块,线程池、进程池

http://www.ppmy.cn/news/92831.html

相关文章

基于 SpringBoot + Redis 实现分布式锁

大家好,我是余数,这两天温习了下分布式锁,然后就顺便整理了这篇文章出来。文末附有源码链接,需要的朋友可以自取。 至于什么是分布式锁,这里不做赘述,不了解的可以自行去查阅资料。 文章目录 实现要点项目…

SVN安装教程详解:快速掌握SVN的安装和使用方法

本文为大家介绍了SVN的详细安装步骤,并提供了具体的命令行操作和TortoiseSVN的图形用户界面操作说明,帮助读者轻松掌握SVN的安装和使用方法。包括创建版本库,添加项目文件到版本库,创建工作副本,对文件进行修改和提交&…

[GreyCTF‘23] crypto部分

baby crypto 凯撒签到 whuo{squi4h_s1fx3h_v0h_co_i4b4T} grey{caes4r_c1ph3r_f0r_my_s4l4D} The Vault 这里只有一个check_keys函数,加密这块破不了,只要过了check_keys就行。 from hashlib import sha256 from Crypto.Util.number import long_to_…

Linux网络服务:部署YUM仓库与NFS服务

目录 一、理论 1.部署YUM仓库服务 2.NFS共享存储服务 二、实验 1.通过httpd服务建立yum仓库 2.通过vsftpd服务建立yum仓库 3.搭建NFS实现2台或3台服务器共享一个目录 一、理论 1.部署YUM仓库服务 (1) YUM简介 YUM的前身是YUP,借助于YUM软件仓库&#xff0c…

如何使用Chatgpt做论文降重呢?

使用ChatGPT做论文降重详细操作步骤 一、说明 1、普通的降重方法有:多重翻译降重、改写润色降重、续写降重,在降重的过程中可以配合使用,效果更加。 2、ChatGPT的高级降重方法在最后一个,就是dan模式降重,dan可以自定义…

Nacos配置管理、Fegin远程调用、Gateway服务网关

1.Nacos配置管理 Nacos除了可以做注册中心,同样可以做配置管理来使用。 1.1.统一配置管理 当微服务部署的实例越来越多,达到数十、数百时,逐个修改微服务配置就会让人抓狂,而且很容易出错。我们需要一种统一配置管理方案&#xf…

014、检查点

检查点 检查点触发机制为什么需要检查点检查点工作流程检查点记录位置Database Recovery检查点调整checkpoint_completion_target检查点触发机制 在PostgreSQL中,检查点(后台)进程执行检查点;当发生下列情况之一时,其进程将启动: 检查点间隔时间由checkpoint_timeout设置…

JAVA创建线程的两种方法

Java创建线程的方法有两种 两种方法分别是继承Thread类和实现Runnable接口。 继承Thread类 class MyThread extends Thread {Overridepublic void run() {System.out.println("Hello World!");} }public class Main {public static void main(String[] args) {MyT…