Python进程与线程:分布式进程

server/2024/12/29 8:03:35/

在Python中,当我们面临选择使用线程(Thread)还是进程(Process)时,进程往往因其更高的稳定性和可扩展性而被优先考虑。特别是,进程能够跨越多台机器进行分布,而线程则受限于同一台机器的多个CPU核心。Python的multiprocessing模块不仅支持多进程,其managers子模块更是提供了将多进程分布到多台机器上的能力。

通过managers模块,我们可以轻松编写分布式多进程程序,而无需深入了解网络通信的细节。以下是一个具体的例子,展示了如何将原本在同一台机器上运行的使用Queue进行通信的多进程程序,扩展到两台机器上运行。

服务进程(task_master.py)

服务进程负责启动并注册Queue到网络上,然后向其中写入任务。

 

python复制代码

import random, time, queue
from multiprocessing.managers import BaseManager
# 定义任务队列和结果队列
task_queue = queue.Queue()
result_queue = queue.Queue()
# 创建一个继承自BaseManager的QueueManager类
class QueueManager(BaseManager):
pass
# 将两个Queue注册到网络上
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 启动QueueManager,绑定端口5000,并设置验证码'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
manager.start()
# 获取网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 向任务队列中添加任务
for i in range(10):
n = random.randint(0, 10000)
print(f'Put task {n}...')
task.put(n)
# 从结果队列中读取结果
print('Try getting results...')
for i in range(10):
r = result.get(timeout=10)
print(f'Result: {r}')
# 关闭manager
manager.shutdown()
print('Master exit.')

注意:在分布式环境中,添加任务到Queue时,必须通过manager.get_task_queue()获得的接口进行,而不能直接操作原始的task_queue

任务进程(task_worker.py)

任务进程负责连接到服务进程,从任务队列中取任务,并将结果放入结果队列。

 

python复制代码

import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建QueueManager类
class QueueManager(BaseManager):
pass
# 注册Queue的访问接口
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务进程所在的服务器
server_addr = '127.0.0.1' # 替换为实际的服务进程IP地址
print(f'Connecting to server {server_addr}...')
manager = QueueManager(address=(server_addr, 5000), authkey=b'abc')
manager.connect()
# 获取Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 处理任务
for i in range(10):
try:
n = task.get(timeout=1)
print(f'Running task {n} * {n}...')
r = f'{n} * {n} = {n*n}'
time.sleep(1)
result.put(r)
except queue.Empty:
print('Task queue is empty.')
print('Worker exit.')

运行步骤

  1. 在一台机器上启动服务进程(task_master.py)。
  2. 在另一台机器(或同一台机器的不同终端)上启动任务进程(task_worker.py),并确保其连接到服务进程的正确IP地址和端口。

分布式计算的简单应用

这个Master/Worker模型展示了分布式计算的基本原理。通过简单的改造,可以启动多个worker进程,将任务分布到多台机器上,实现高效的并行处理。例如,将计算n*n的代码替换为发送邮件的逻辑,就可以构建一个邮件队列的异步发送系统。

注意

  • Queue的作用:Queue用于传递任务和接收结果,因此每个任务的描述数据量应尽量小。例如,发送处理日志文件的任务时,应传递日志文件的路径而非文件本身。
  • authkey的重要性:authkey用于确保两台机器之间的正常通信,防止恶意干扰。如果任务进程的authkey与服务进程不一致,将无法建立连接。

总结

Python的分布式进程接口简洁且封装良好,非常适合在需要将繁重任务分布到多台机器的环境中应用。通过合理利用这些接口,我们可以轻松实现高效的分布式计算。


http://www.ppmy.cn/server/153098.html

相关文章

Scala-异常

1.空指针异常 NullPointerException 2.索引越界异常 IndexOutOfBoundsException 3.算术运算异常 ArithmeticException try{ 可能发生异常的代码 }catch { 发生异常之后的处理逻辑 case e:异常类型1 > 处理逻辑1 case e:异常类型2 > 处理逻辑2 …… }finally{ 无论是否有…

Uniapp 手机基座调试App 打包成Apk文件,并上传到应用商店

1.Uniapp手机基座调试App。 1.1 以下是我另一篇文章 讲解 uniapp连接手机基座调试App、 Hbuildx使用SUB运行到手机基座测试_hbuilder基座-CSDN博客 2.打包本地的uniapp项目为apk文件。 打包的方式有很多种,我们可以选择本地打包和远程云端打包两种方式。 我们在打包…

YOLOv8 引入高效的可变形卷积网络 DCNv4 | 重新思考用于视觉应用的动态和稀疏算子

我们介绍了可变形卷积v4(DCNv4),这是一种为广泛的视觉应用设计的高效且有效的算子。DCNv4通过以下两项关键改进解决了其前身DCNv3的局限性: 在空间聚合中移除softmax归一化,以增强其动态特性和表达能力。优化内存访问,减少冗余操作以提高速度。这些改进使得DCNv4相比DCNv…

设计模式——建造者模式

设计模式——建造者模式 目录 设计模式——建造者模式介绍实现结构及工作流程经典实现示例优缺点使用场景 扩展实现示例 总结 介绍 建造者模式(Builder Pattern)是一种创建型设计模式,它通过将一个复杂对象的构建过程分解成多个简单的步骤&a…

2024楚慧杯WP

web 速算比赛 Sal的图集 ssti {{config.__class__.__init__.__globals__.get("os").popen(tac /flag).read()}} popmart index.php源码 <?php173 $pat "/^(((1?\d{1,2})|(2[0-4]\d)|(25[0-5]))\.){3}((1?\d{1,2})|(2[0-4]\d)|(25[0-5]))/";17…

STM32--超声波模块(HC—SR04)(标准库+HAL库)

一、HC-SR04工作原理 1&#xff09;采用IO触发测距&#xff0c;给至少10us的高电平信号。 2&#xff09;模块自动发送8个40KHz的方波&#xff0c;自动检测是否有信号返回。 3&#xff09;有信号返回&#xff0c;通过IO输出一高电平&#xff0c;高电平持续时间就是超声波从发…

9_HTML5 SVG (5) --[HTML5 API 学习之旅]

SVG 模糊效果 HTML5中的SVG&#xff08;可缩放矢量图形&#xff09;允许我们创建高质量的二维图形&#xff0c;包括应用各种滤镜效果。模糊效果是通过<feGaussianBlur>滤镜原语来实现的。下面我将给出4个使用SVG进行模糊效果处理的示例&#xff0c;并为每个代码段添加详…

【期末复习】JavaEE(下)

1. MVC开发模式 1.1. 运行流程 1.2. SpringMVC 核心组件 1.3. 注解解释 2. ORM与MyBatis 2.1. ORM—对象关系映射 2.2. MyBatis 2.2.1. 创建步骤 会话是单例的&#xff0c;不能跨方法。&#xff08;单例的原因主要是从数据安全角度出发&#xff09; import org.apache.ibatis…