multiprocessing的三种模式
- fork,【拷贝几乎所有资源】【支持文件对象/线程锁等传参】【unix】【任意位置开始】【快】
- spawn,【run参数传参必备资源】【不支持文件对象/线程锁等传参】【unix、win】【main代码块开始】【慢】
- forkserver,【run参数传必备资源】【不支持文件对象/线程锁传参】【部分unix】【main代码块开始】
import multiprocessing
multiprocessing.set_start_method('spawn')
官方文档https://docs.python.org/zh-cn/3/library/multiprocessing.html
进程锁
-
Lock类与RLock类相同:由于进程之间随机调度:某进程可能执行n条后,CPU接着执行其他进程。为了多个进程同时操作一个内存中的资源时不产生混乱,我们使用锁。
-
Lock类与RLock类的区别:无论是Lock还是RLock,提供的方法都非常简单,acquire和release。但是Lock和RLock的区别是什么呢?RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
def work(filename, max_count):for n in range(max_count):f = open(filename, "r")try:nbr = int(f.read())except ValueError as err:print ("File is empty, starting to count from 0, error: " + str(err))nbr = 0f = open(filename, "w")f.write(str(nbr + 1) + '\n')f.close()if __name__ == '__main__':work('demo.txt', 5)# 输出
File is empty, starting to count from 0, error: invalid literal for int() with base 10: ''Process finished with exit code 0
工作函数被调用了 5 次,正如所期望的那样,它计数正确,没有损失任何数据。在第一次读取时,见到了一个空文件。这会为 int()抛出 invalid literal for int()的
错误(因为在一个空字符串上调用了 int())。这个错误只发生了一次,之后,我们总是会有一个合法的值用于读取并把它转变成一个整数。
def run_one_work(filename, lock):lock.acquire()f = open(filename, "r")try:nbr = int(f.read())except ValueError as err:print("File is empty, starting to count from 0, error: " + str(err))nbr = 0f = open(filename, "w")f.write(str(nbr + 1) + '\n')f.close()lock.release()if __name__ == '__main__':multiprocessing.set_start_method("spawn")lock = multiprocessing.Lock()file_name = 'demo.txt'for i in range(5):p = multiprocessing.Process(target=run_one_work, args=(file_name,lock,))p.start()p.join()time.sleep(7)
# spawn模式需要特殊处理
process_list = []
for i in range(5):multiprocessing.Process(target=run_one_work, args=(file_name,lock,))p.start()process_list.append(p)
for p in process_list:p.join()
锁实例程序:
from multiprocessing import Process
from multiprocessing import Lock
import time
import jsondef show_ticket(i):time.sleep(0.1)with open('ticket') as f:dic = json.load(f)print('余票: %s' %dic.get('ticket'))def buy_ticket(i, lock):lock.acquire() #加锁with open('ticket') as f:dic = json.load(f)time.sleep(0.1)if dic.get('ticket') >0:dic['ticket'] -= 1print('\033[32m%s买到票了\033[0m' %i)else:print('\033[31m%s没买到票\033[0m' %i)time.sleep(0.1)with open('ticket', 'w')as f:json.dump(dic, f)lock.release() #释放锁
if __name__ == '__main__':for i in range(10):p = Process(target=show_ticket, args=(i,) )p.start()lock = Lock()for i in range(10):p1 = Process(target=buy_ticket, args=(i,lock))p1.start()