要点:
multiprocessing 进程间信息交互
一 方法汇总
在 Python 进程中,有几种方法可以实现数据交互:
-
共享内存:这是一种用于进程间通信的高效方式。多个进程可以访问同一个共享内存区域,并在其中读取和写入数据。
-
管道(Pipe):这是一种用于进程间通信的基本方式。管道可以在两个进程之间传递数据。一个进程将数据写入管道,另一个进程从管道中读取数据。
-
队列(Queue):队列也是一种进程间通信的方式。一个进程将数据放入队列,另一个进程从队列中获取数据。
-
套接字(Socket):套接字是一种用于网络通信的方式,但它们也可以在同一台计算机上进行进程间通信。每个套接字都有一个唯一的地址,进程可以使用这个地址来发送和接收数据。
-
文件:进程可以使用文件作为数据交换的方式。一个进程将数据写入文件,另一个进程从文件中读取数据。
二 实际举例
2.1 共享内存
使用 multiprocessing.Value
可以创建进程间共享的变量,下面是一个例子,创建了一个类型为整数('i')的共享内存变量 value
,然后启动 10 个进程去调用 func
函数,该函数会将 value
的值加 1。最后输出 value
的值,应该是 10:
import multiprocessingdef func(value):value.value += 1if __name__ == '__main__':value = multiprocessing.Value('i', 0)processes = [multiprocessing.Process(target=func, args=(value,)) for _ in range(10)]for process in processes:process.start()for process in processes:process.join()print(value.value) # 输出 10
2.2 管道
使用 multiprocessing.Pipe
可以创建一个管道,两个进程可以通过这个管道互相传递数据,下面是一个例子,创建了一个管道,其中 parent_conn
是父进程持有的端口,child_conn
是子进程持有的端口。然后启动两个进程,分别调用 sender
和 receiver
函数。sender
函数发送一条消息到管道中,receiver
函数从管道中接收消息并打印出来:
import multiprocessingdef sender(conn):conn.send('Hello, receiver')def receiver(conn):message = conn.recv()print(message)if __name__ == '__main__':parent_conn, child_conn = multiprocessing.Pipe()p1 = multiprocessing.Process(target=sender, args=(parent_conn,))p2 = multiprocessing.Process(target=receiver, args=(child_conn,))p1.start()p2.start()p1.join()p2.join()
2.3 队列
使用 multiprocessing.Queue
可以创建一个进程间共享的队列,多个进程可以通过这个队列互相传递数据,下面是一个例子,创建了一个进程间共享的队列 q
,然后启动了四个进程去调用 worker
函数,该函数会从队列中获取数据并打印出来。主进程向队列中发送 10 个数值,每个进程都会从队列中获取数据并进行处理。当主进程发送完所有内容后,向队列中发送 N 个 None 值(N 等于进程数量),以通知各进程退出:
import multiprocessingdef worker(q):while True:item = q.get()if item is None:breakprint(item)if __name__ == '__main__':q = multiprocessing.Queue()processes = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(4)]for process in processes:process.start()for i in range(10):q.put(i)for _ in range(len(processes)):q.put(None)for process in processes:process.join()
2.4 套接字
使用 Python 的 socket 模块可以创建套接字,进而实现网络通信和进程间通信。下面是一个简单的例子,创建了一个服务器进程和一个客户端进程。服务器进程监听本机的 8888 端口,接收客户端发来的数据并打印出来;客户端进程连接服务器的 8888 端口,并向服务器发送一条消息。运行上述代码后,可以看到服务器进程收到客户端发送的消息并打印出来:
import socketdef server():server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.bind(('127.0.0.1', 8888))server_socket.listen(1)conn, addr = server_socket.accept()while True:data = conn.recv(1024)if not data:breakprint(data.decode())conn.close()server_socket.close()def client():client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client_socket.connect(('127.0.0.1', 8888))client_socket.sendall(b'Hello, server')client_socket.close()if __name__ == '__main__':import multiprocessingserver_process = multiprocessing.Process(target=server)client_process = multiprocessing.Process(target=client)server_process.start()client_process.start()server_process.join()client_process.join()
2.5 文件
在 Python 中使用文件进行进程间通信也是比较常见的方式。下面是一个例子,创建了一个文件 test.txt
,该文件包含了三行文本。然后启动两个进程去调用 worker
函数,该函数会读取文件内容并打印出来。当两个进程都完成任务后,主进程结束。运行上述代码后,可以看到两个进程分别打印了 test.txt
文件的内容:
import multiprocessingdef worker(file):with open(file, 'r') as f:for line in f:print(line.rstrip())if __name__ == '__main__':filename = 'test.txt'with open(filename, 'w') as f:f.write('Line 1\n')f.write('Line 2\n')f.write('Line 3\n')processes = [multiprocessing.Process(target=worker, args=(filename,)) for _ in range(2)]for process in processes:process.start()for process in processes:process.join()
三 python子进程传数据到主进程的方式
Python中有多种方式可以让子进程传递数据给主进程。这里我列举其中三种比较常用的方式:
- 使用队列(Queue):队列是多进程编程中常用的通信工具,可以在多个进程之间传递消息。在主进程中初始化一个队列对象,然后将其作为参数传递给子进程,在子进程中使用put()方法向队列中添加数据,主进程可以使用get()方法获取数据。
下面是一个使用队列实现子进程传递数据给主进程的例子:
import multiprocessing as mpdef func(queue):# 子进程向队列中添加数据queue.put("hello from child process")if __name__ == '__main__':# 初始化一个队列queue = mp.Queue()# 创建一个子进程并将队列作为参数传递给它p = mp.Process(target=func, args=(queue,))p.start()# 主进程从队列中获取数据data = queue.get()print(data)
- 使用管道(Pipe):管道也可以在多个进程之间传递消息,不同于队列的是它只支持两个进程之间的通信。在主进程中创建一个管道,然后将其作为参数传递给子进程,在子进程中使用send()方法向管道中发送数据,主进程可以使用recv()方法接收数据。
下面是一个使用管道实现子进程传递数据给主进程的例子:
import multiprocessing as mpdef func(pipe):# 子进程向管道中发送数据pipe.send("hello from child process")if __name__ == '__main__':# 创建一个管道parent_conn, child_conn = mp.Pipe()# 创建一个子进程并将管道作为参数传递给它p = mp.Process(target=func, args=(child_conn,))p.start()# 主进程从管道中接收数据data = parent_conn.recv()print(data)
- 使用共享内存(Value和Array):共享内存可以让多个进程之间共享同一块内存区域,这样就可以避免进程之间频繁地复制数据。在主进程中使用Value或Array创建一个共享内存对象,然后将其作为参数传递给子进程,在子进程中可以直接修改共享内存对象中的值,主进程也可以直接读取共享内存对象中的值。
下面是一个使用共享内存实现子进程传递数据给主进程的例子:
import multiprocessing as mpdef func(val):# 子进程修改共享内存对象中的值val.value = 123if __name__ == '__main__':# 创建一个共享内存对象val = mp.Value('i', 0)# 创建一个子进程并将共享内存对象作为参数传递给它p = mp.Process(target=func, args=(val,))p.start()# 主进程读取共享内存对象中的值print(val.value)