使用RabbitMQ实现微服务间的异步消息传递

ops/2024/11/2 16:13:39/

使用RabbitMQ实现微服务间的异步消息传递

      • RabbitMQ简介
      • 安装RabbitMQ
        • 在Ubuntu上安装RabbitMQ
        • 在CentOS上安装RabbitMQ
      • 配置RabbitMQ
      • 创建微服务
        • 生产者服务
          • 安装依赖
          • 生产者代码
        • 消费者服务
          • 消费者代码
      • 运行微服务
      • 消息模式
        • 直接模式
          • 生产者代码
          • 消费者代码
        • 扇出模式
          • 生产者代码
          • 消费者代码
        • 主题模式
          • 生产者代码
          • 消费者代码
      • 高级特性
        • 持久化
          • 生产者代码
          • 消费者代码
        • 确认机制
          • 消费者代码
      • 监控和日志
        • 监控
        • 日志
      • 故障排除
      • 总结

在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。

安装RabbitMQ

RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。
在Ubuntu上安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
在CentOS上安装RabbitMQ
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置RabbitMQ

安装完成后,可以使用以下命令进行基本配置。
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问RabbitMQ管理界面:`http://localhost:15672`,默认用户名和密码都是`guest`。

创建微服务

我们将创建两个简单的微服务:生产者服务和消费者服务。
生产者服务
生产者服务负责发送消息到RabbitMQ。
安装依赖
pip install pika
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
消费者服务
消费者服务负责从RabbitMQ接收消息。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

运行微服务

先启动消费者服务,然后启动生产者服务。
# 启动消费者服务
python consumer.py# 启动生产者服务
python producer.py

消息模式

RabbitMQ支持多种消息模式,包括直接模式、扇出模式、主题模式和头部模式。
直接模式
直接模式是最简单的模式,消息会被发送到指定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
扇出模式
扇出模式将消息广播到所有绑定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='fanout_exchange', queue=queue_name)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
主题模式
主题模式允许更复杂的路由规则。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuebinding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

高级特性

RabbitMQ还支持许多高级特性,如持久化、确认机制、死信队列等。
持久化
可以配置消息和队列的持久化,以确保消息不会因为RabbitMQ服务器重启而丢失。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
确认机制
可以配置消费者在处理完消息后发送确认,以确保消息不会被重复处理。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='ack_queue')channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

监控和日志

RabbitMQ提供了丰富的监控和日志功能,可以用于监控和调试。

监控
可以通过RabbitMQ管理界面监控队列、交换机和连接等。

日志
可以通过配置文件调整日志级别和输出方式。

故障排除

如果RabbitMQ配置出现问题,可以使用以下命令进行故障排除。

sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server

总结

通过本文,你已经学习了如何使用RabbitMQ实现微服务间的异步消息传递。我们介绍了RabbitMQ的基本概念、安装方法、配置RabbitMQ、创建微服务、消息模式(直接模式、扇出模式、主题模式)、高级特性(持久化、确认机制)、监控和日志、故障排除等内容。掌握了这些知识,将有助于你在实际工作中更好地利用RabbitMQ来构建高效、可靠的微服务架构。
RabbitMQ管理界面示例
RabbitMQ消息传递模式示例

使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。


http://www.ppmy.cn/ops/130467.html

相关文章

合并文件命令

windows 1、 在电脑左下角搜索行输入cmd进入命令提示符,也就是终端。 2、 若代码在G盘,则输入g: 3、逐层通过cd 文件夹名进入.log文件所在的文件夹。 4、win10:输入type *.log >>G:hebing.txt,则可实现将上一步cd进的文件夹中的.log文…

【解决方案】Mac上禁止chrome自动更新的三种方法

【目的需求】 新版chrome直接用打印机打印页面时,打印任务总是响一下就消失了,使用safari浏览器无此问题,使用早期版本chrome也没有这一问题。因此想固定chrome版本,不要自动更新。尝试了网上的多种方法均失败。 【解决方案】 …

https和http的区别,及HTTPS的工作流程

HTTP(HyperText Transfer Protocol)和HTTPS(HyperText Transfer Protocol Secure)都是超文本传输协议,但它们之间的关键区别在于安全性。 安全性: HTTP:数据以明文传输,没有加密&…

【Nginx】编译安装(Centos)

下载编译包 https://nginx.org/download/nginx-1.24.0.tar.gz 解压: tar -zxvf nginx-1.24.0.tar.gz 进入目录: nginx-1.24.0 配置 ./configure --with-http_ssl_module --with-http_v2_module --with-http_gzip_static_module 如果不加扩展模块就直接执行: …

七、Go语言快速入门之函数func

文章目录 函数:one: GO语言函数介绍:two: 函数的参数和返回值:star2: 按值传递和按引用传递:star2: 给返回值命名:star2: 空白符:star2: 改变外部变量 :three: 传递变长参数:four: defer和追踪:star2: defer使用:star2: defer实现代码追踪 :five: 递归函数:six: 匿名函数(闭包)…

服务攻防之开发组件安全

我们来了解两个比较火的开发组件的安全问题,一个是log4j,一个是fastjson。我们从它的原理到复现来对他进行学习!这篇文章我们先来学习一下log4j! Log4j2远程命令执行(CVE-2021-44228) 1、什么是 log4j 和…

vscode和pycharm在当前工作目录的不同|python获取当前文件目录和当前工作目录

问题背景 相信大家都遇到过一个问题:一个项目在vscode(或pycharm)明明可以正常运行,但当在pycharm(或vscode)中时,却经常会出现路径错误。起初,对于这个问题,我也是一知…

[论文阅读]Detecting Pretraining Data from Large Language Models

Detecting Pretraining Data from Large Language Models http://arxiv.org/abs/2310.16789 这篇文章正式提出了Min-k%方法来实现成员推理攻击 贡献 介绍了WIKIMIA动态基准测试。旨在定期自动评估任何新发布的预训练 LLMs。通过利用 Wikipedia 数据时间戳和模型发布日期&am…