一、简介
进程是操作系统进行资源分配的基本单位,也就是说每启动一个进程,操作系统都会给其分配一定的运行资源(内存资源)保证进程的运行;每个进程都是独立运行的,不互相干扰
注意::进程锁和线程锁大同小异,此文章不对进程锁做过多解释,可参考:python内置模块threading
二、基本使用
import multiprocessing
import time
import osstart_time = time.time()def process1(text):print('process1:',os.getpid())time.sleep(1)print(text)def process2(text):print('process2:',os.getpid())time.sleep(2)print(text)if __name__ == '__main__':t1 = multiprocessing.Process(target=process1, args=('process1_sleep',))t2 = multiprocessing.Process(target=process2, args=('process2_sleep',))t1.start()t2.start()t1.join()#等待进程结束t2.join()#等待进程结束end_time = time.time()print('主进程结束', end_time - start_time)
三、方法
- multiprocessing.Process(target,name,args,kwargs) 创建进程,target:执行的目标任务名、name:进程名字、args:以元组方式给执行任务传参、kwargs:以字典方式给执行任务传参
- process.start() 运行进程
- process.join(timeout) 阻塞运行,如果遇到延时任务,使用join,主进程会等到延时任务结束,才能执行主进程
- process.terminate():不管任务是否完成,立即终止子进程;类似线程中的守护线程
- process.is_alive() 判断进程是否还活着
- process.name:进程名,如果没设置名称系统会自动分配一个名称,可以用了设置进程名字
- process.daemon:守护进程,主进程结束,就算有子进程正在运行,程序也会结束,不能和join一起使用,join会阻塞主进程
- multiprocessing.Lock() 同步锁:最多只有一个持有该锁,被加锁的进程在运行时不会将执行权交出去,只有当该进程被解锁时才会将执行权通过系统调度交由其他进程
- lock.locaked():判断该锁对象是否处于上锁状态
- lock.acquire(timeout=1)) :锁住进程,timeout失效时间
- lock.release() :释放进程
- multiprocessing.Rlock() 递归锁,RLock允许在同一进程中被多次acquire,Lock不允许
- rlock.locaked():判断该锁对象是否处于上锁状态
- rlock.acquire(timeout=1)) :锁住进程,timeout失效时间
- rlock.release() :释放进程
- multiprocessing.Condition() 条件锁,条件锁是在递归锁的基础上增加了暂停进程运行的功能,可以使用wait()与notify()来控制进程执行的个数
- clock.acquire(timeout=1)) :锁住进程,timeout失效时间
- clock.release() :释放进程
- lockObject.wait(timeout=None) :将当前进程设置为“等待”状态,只有该进程接到“通知”或者超时时间到期之后才会继续运行,在“等待”状态下的进程将允许系统根据策略自行切换到其他进程中运行
- lockObject.wait_for(predicate, timeout=None) :将当前进程设置为“等待”状态,只有该进程的predicate返回一个True或者超时时间到期之后才会继续运行,在“等待”状态下的进程将允许系统根据策略自行切换到其他进程中运行。注意:predicate参数应当传入一个可调用对象,且返回结果为bool类型
- lockObject.notify(n=1) :通知一个当前状态为“等待”的进程继续运行,也可以通过参数n通知多个
- lockObject.notify_all():通知所有当前状态为“等待”的进程继续运行
- multiprocessing.Event() 事件锁,事件锁是基于条件锁来做的,它与条件锁的区别在于一次只能放行全部,不能放行任意个数量的子进程继续运行
- elock.clear() 将事件锁设为红灯状态,即所有进程暂停运行
- elock.is_set() 用来判断当前事件锁状态,红灯为False,绿灯为True
- elock.set() 将事件锁设为绿灯状态,即所有进程恢复运行
- elock.wait(timeout=None) 将当前进程设置为“等待”状态,只有该进程接到“绿灯通知”或者超时时间到期之后才会继续运行,在“等待”状态下的进程将允许系统根据策略自行切换到其他进程中运行
- multiprocessing.Semaphore() 信号量锁,成批的放行特定个处于“上锁”状态的进程
- slock.acquire(blocking=True, timeout=1) 上锁,内置计数器-1,直到为0的时候阻塞
- slock.release() 解锁,内置计数器+1,并让某个进程的acquire()从阻塞变为不阻塞
- multiprocessing.BoundedSemaphor(value) 信号量锁 会检查内部计数器的值,并保证它不会大于初始值,如果超了,就引发一个 ValueError
- multiprocessing.Queue(size):进程队列,size队列个数
- queue.qsize() 返回队列的大小
- queue.empty() 如果队列为空,返回True,反之False
- queue.full() 如果队列满了,返回True,反之False
- queue.get(block,timeout) 获取队列,block:从队列里面取值,如果取不到值的话,程序不会结束, timeout:当block的值为真的时候,timeout是用来等待多少秒
- queue.get_nowait() 相当queue.get(block=False)
- queue.put(value,block,timeout) 写入队列,value:写入队列的值,,block:队列如果满了的话,再往队列里放值的话会等待, timeout:当block的值为真的时候,timeout是用来等待多少秒
- queue.put_nowait(value) 相当queue.put(value, block=False)
- queue.close() 关闭队列
''' 因进程之间是独立运行的,所以全局变量不会共享,如需共享需要用到队列 以下例子会发现全局num不会再进程之间共享,而在task1中的put进的队列数据可以共享 '''import multiprocessingnum = 1def task1(q):global numnum = 2q.put(1)q.put(2)def task2(q):print(f'task2:num={num}')print('队列:', q.get())if __name__ == '__main__':process_q = multiprocessing.Queue()process1 = multiprocessing.Process(target=task1,args=(process_q,))process2 = multiprocessing.Process(target=task2,args=(process_q,))process1.start()process2.start()process1.join()process2.join()print('队列:', process_q.get())print(f'全局变量:num={num}')
- multiprocessing.SimpleQueue() 简化的队列,无跟踪任务的功能,只具有empty、get、put3个方法
- queue.empty() 如果队列为空,返回True,反之False
- queue.get(block,timeout) 获取队列,block:从队列里面取值,如果取不到值的话,程序不会结束, timeout:当block的值为真的时候,timeout是用来等待多少秒
- queue.put(value,block,timeout) 写入队列,value:写入队列的值,,block:队列如果满了的话,再往队列里放值的话会等待, timeout:当block的值为真的时候,timeout是用来等待多少秒
- multiprocessing.JoinableQueue(size):可阻塞的进程队列,size队列个数
- queue.qsize() 返回队列的大小
- queue.empty() 如果队列为空,返回True,反之False
- queue.full() 如果队列满了,返回True,反之False
- queue.get(block,timeout) 获取队列,block:从队列里面取值,如果取不到值的话,程序不会结束, timeout:当block的值为真的时候,timeout是用来等待多少秒
- queue.get_nowait() 相当queue.get(block=False)
- queue.put(value,block,timeout) 写入队列,value:写入队列的值,,block:队列如果满了的话,再往队列里放值的话会等待, timeout:当block的值为真的时候,timeout是用来等待多少秒
- queue.put_nowait(value) 相当queue.put(value, block=False)
- queue.join() 阻塞队列,队列元素未全部处理完前,进程阻塞
- queue.task_done() 解除join阻塞,完成一次任务,计数机制-1
- queue.join_thread() 连接队列的后台线程,此方法用于在调用q.close()方法后,等待所有队列项被消耗
- queue.cancel_join_thread() 防止join_thread()方法阻塞,不会在进程退出时自动连接后台线程
- queue.task_done() 向队列发送task_done(),让该队列不要在join
- queue.close() 关闭队列,指示当前进程将不会再往队列中放入对象
import multiprocessing import time import randomdef task1(q):for i in range(10):q.put(i)time.sleep(random.randint(1, 3))q.join()print('数据处理完毕')def task2(q):while True:print('task2队列:', q.get())q.task_done()if __name__ == '__main__':process_q = multiprocessing.JoinableQueue()process1 = multiprocessing.Process(target=task1, args=(process_q,))process2 = multiprocessing.Process(target=task2, args=(process_q,))process2.daemon = Trueprocess1.start()process2.start()process1.join()print('主进程结束')
- multiprocessing.Pipe():进程通道,用法基本和Queue一致,返回一对 Connection 对象 (conn1, conn2) ,conn1 只能用于接收消息,conn2 仅能用于发送消息
from multiprocessing import Process, Pipe import timedef task(i, pip):while True:a = pip.recv()print(f"进程{i}收到一个{a}")time.sleep(0.5)pip.send(a + 1)if __name__ == '__main__':pip_start, pip_end = Pipe()Process(target=task, args=(1, pip_start)).start()Process(target=task, args=(2, pip_end)).start()pip_start.send(1)
- process.current_process() 返回当前进程
- process.active_children() 返回当前存活的进程列表
- process.parent_process():返回父进程
- process.cpu_count():返回cpu数量
三、进程池
进程池multiprocessing.Pool()的多进程之间的通信要用multiprocessing.Manager().Queue()
- multiprocessing.Pool()
from multiprocessing import Process,Pool import timedef task(task):time.sleep(1)print(task * task)if __name__ == '__main__':task_list = [1, 2, 3, 4, 5, 6]print('顺序:')start_time = time.time()for i in task_list:process = Process(target=task, args=(i,))process.start()process.join()end_time = time.time()print('顺序执行时间',end_time - start_time)print('多进程进程池:')start_time = time.time()pool = Pool(6)pool.map(task,task_list)pool.close()pool.join()end_time = time.time()print('多进程执行时间', end_time - start_time)
- pool.map(fun,iterable) Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回
- pool.map_async(fun,iterable) 与内置的map函数用法行为基本一致,但是它是非阻塞的
- pool.close() 关闭进程池(pool),使其不再接受新的任务
- pool.terminal() 结束工作进程,不再处理未处理的任务
- pool.join() 主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用
- pool.is_alive() 判断进程是否还活着
- concurrent.futures 模块,线程池、进程池