Python操作RabbitMq入门

news/2024/11/9 2:43:32/

秋风阁-北溪入江流

Docker部署RabbitMq

version: '3.8'
services:rabbitmq:image: rabbitmq:3.11.19container_name: rabbitmqhostname: rabbitmqports:# amqp协议通讯端口(对外服务必开)- 5672:5672# RabbitMq自带管理界面访问端口- 15672:15672environment:- RABBITMQ_DEFAULT_VHOST=${vhost}- RABBITMQ_DEFAULT_USER=${user}- RABBITMQ_DEFAULT_PASS=${password}

RabbitMq自带有专门的管理界面,可以在其管理界面对RabbitMq进行管理查看等操作。
RabbitMq的管理界面的对外端口为15672,当我们启动RabbitMq后,需要启动管理界面插件后才能访问界面。

# 进入容器内部
docker exec -it rabbitmq bash
# 启动插件,启动管理界面
rabbitmq-plugins enable rabbitmq_management

Python操作RabbitMq

安装依赖库pika

pip install pika

获取RabbitMq连接,基本配置

# 连接RabbitMq
connect = pika.BlockingConnection(pika.URLParameters("amqp://${user}:${password}@${ip}:${port}/${vhost}"))
# 创建通讯信道
channel = connect.channel()
# 创建(声明)消息队列,该方法在没有队列时会主动创建队列
channel.queue_declare(queue=${queue_name})# 关闭RabbitMq连接
channel.close()
connect.close()

发布者模式(发送数据)

channel.basic_publish(exchange="", routing_key=${queue_name}, body=f"你好 World!".encode(encoding="UTF-8"))
  • exchange: 在RabbitMq中,发布者不会直接将信息推送到队列中,而是而是先将消息投递到exchange中,在由exchange转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。消息队列通过在在消费者和生产者中引入了exchange的概念,当压力增长的情况下,可以通过配置exchange在不停机的情况下调整系统资源,缓解服务压力
    • exchange为空表示设置为简单exchange模式
  • routing_key: 传送到的队列名称
  • body: 消息内容,在RabbitMq中,通过流的方式传输信息,所以需要对传输内容进行编码

发布者模式完整示例

import pikaconnect = pika.BlockingConnection(pika.URLParameters("amqp://${user}:${password}@${ip}:${port}/${vhost}"))
channel = connect.channel()
channel.queue_declare(queue=${queue_name})channel.basic_publish(exchange="", routing_key=${queue_name}, body=f"你好 World!".encode(encoding="UTF-8"))channel.close()
connect.close()

消费者模式(接收数据)

def callback(channel, method, properties, body: bytes):"""自定义回调函数:param channel: BlockingChannel:param method: spec.Basic.Deliver:param properties: spec.BasicProperties:param body: bytes"""print(f"消费者接收数据: {body.decode('UTF-8')}")# 绑定消息队列,接收数据
channel.basic_consume(queue=${queue_name}, auto_ack=True, on_message_callback=callback)# 阻塞函数,启动后会一直运行等待发布者发送数据
channel.start_consuming()
  • auto_ack: 自动确定配置,通过ACK机制(消息确认机制)确认消息是否被正确接收,确保消息的正确传输,不会丢失

消费者模式完整示例

import pikaconnect = pika.BlockingConnection(pika.URLParameters("amqp://${user}:${password}@${ip}:${port}/${vhost}"))
channel = connect.channel()
channel.queue_declare(queue=${queue_name})def callback(channel, method, properties, body: bytes):"""自定义回调函数:param channel: BlockingChannel:param method: spec.Basic.Deliver:param properties: spec.BasicProperties:param body: bytes"""print(f"消费者接收数据: {body.decode('UTF-8')}")# 绑定消息队列,接收数据
channel.basic_consume(queue=${queue_name}, auto_ack=True, on_message_callback=callback)# 阻塞函数,启动后会一直运行等待发布者发送数据
channel.start_consuming()
  • 设置为交换模式的情况下,一个发布者对应一个消费者。当在RabbitMq中注册多个消费者时,消息队列中的数据将会以轮训的方式发送到消费者中。

http://www.ppmy.cn/news/873328.html

相关文章

联想ERP项目实施案例分析(1):背景

联想ERP项目实施案例分析(1):背景 一、需求触发 1、业务触发决心上线ERP: 1.1、曾经联想在信息系统不完善的情况下,出过上千万生产小料成本遗忘计算的事件。而更早一些的1996年,联想内部物料会议上,杨元庆为发现自己仓库中还有1994年进的486/SX/25CPU大发雷霆。为此,公…

暑期实习的总结感悟+深圳找房

最开始是nlp算法为主,后续是java后端和算法混投,准备分为两个part来讲,找实习的时间,面试的经历。 实习大约从2月底就开始陆陆续续有消息了,包括阿里,网易互娱和雷火的提前批,早投永远要好于晚…

联想高性能服务器,Lenovo|EMC推出高性能4x4TB服务器级NAS

作为联想PC战略的延伸和全球企业级战略的重要组成部分,联想与EMC建立起全球范围内的战略合作,并于2013年4月2日在深圳全新推出面向企业市场的联合品牌存储新产品家族。可以说联想与EMC合作是在企业级业务领域的又一重要战略布局。 在4月2日的发布会上&am…

科技巨头联想入局造车_爱普搜汽车

在宣称“不造车”的华为开始涉足新能源智能汽车赛道后,一样说“不造车”的联想也按捺不住了。 近日,联想官方公开了一系列汽车业务岗位,开始涉足新能源智能汽车领域,引发外界广泛关注。在阿里、华为、百度、小米等众多科技厂商纷纷…

杨元庆一语道出联想衰落的根源

杨元庆一语道出联想衰落的根源(转) 深圳IT领袖峰会上,在回答吴鹰的问题时,杨元庆说了一句话:“从本质上我们是一个产品导向的公司”。当我听到这句话的时候,心里就在琢磨,难怪联想会衰落&#x…

SAP走进联想总部

SAP走进联想总部 http://mp.weixin.qq.com/s?__bizMzI1NDEzNzI4OA&mid2651639282&idx1&sn5e9a9139b2760a4ea66d733324b15fb4&chksmf231ab55c54622435e90c6e689f6e42d17ff725dc591adb068c66e4be8d9da98815507335905&mpshare1&scene5&srcid1208A1Y…

联想云计算战略

本文讲的是联想云计算战略,【IT168 资讯】云计算从根本来讲就是怎么把计算资源、存储资源在全社会层面最优化,这是最根本的。  具体来讲,其实基于两个层面,我们感觉到云计算对终端影响最大的例子。  第一个例子,就…

联想+android电视,骁龙跨界 首款Android 4.0联想电视上市

2012年5月8日,联想集团在北京举行发布会,面向中国市场正式推出首批K系列的四款智能电视。继个人电脑、智能手机和平板电脑之后,智能电视的正式上市,标志着联想PC战略四屏产品布局的完成。 图:联想智能电视正式发布 联想…