RabbitMQ 路由(Routing)通讯方式详解

server/2024/12/26 4:06:29/

在现代分布式系统中,消息队列(Message Queue)是实现异步通信、解耦系统组件的重要工具。RabbitMQ 作为一个广泛使用的消息代理(Message Broker),提供了多种消息传递模式,其中路由(Routing)模式是一种非常强大且灵活的通讯方式。本文将深入探讨 RabbitMQ 中的路由模式,帮助读者理解其工作原理、应用场景以及如何通过 Java 代码实现。


1. 什么是路由模式?

在 RabbitMQ 中,路由模式是基于 Direct Exchange 的一种消息传递模式。与简单的 Fanout Exchange 不同,Direct Exchange 允许消息发送者根据特定的路由键(Routing Key)将消息发送到特定的队列。这种模式提供了更细粒度的消息分发控制,使得消息可以根据业务需求被精确地路由到目标队列。

在这里插入图片描述

1.1 关键概念

  1. Direct Exchange: 直接交换机,根据消息的路由键将消息路由到绑定到该交换机的队列。
  2. Routing Key: 路由键是消息的一个属性,用于指定消息的目标队列。交换机会根据路由键将消息路由到匹配的队列。
  3. Binding: 绑定是交换机和队列之间的关联关系。在绑定过程中,可以指定一个路由键,交换机会根据这个路由键将消息路由到相应的队列。

2. 路由模式的工作原理

在路由模式中,消息的发送者和接收者通过交换机进行通信。以下是路由模式的工作流程:

  1. 生产者(Producer) 发送消息到 Direct Exchange,并指定一个路由键。
  2. Direct Exchange 根据消息的路由键,将消息路由到与之绑定的队列。
  3. 消费者(Consumer) 从队列中接收消息并进行处理。

2.1 示例场景

假设我们有一个日志系统,需要将不同级别的日志(如 infoerrorwarning)发送到不同的队列,以便不同的消费者处理。我们可以使用路由模式来实现这一需求。

  1. 定义交换机: 创建一个 Direct Exchange,命名为 logs_exchange
  2. 定义队列: 创建三个队列,分别命名为 info_queueerror_queuewarning_queue
  3. 绑定队列: 将 info_queue 绑定到 logs_exchange,并指定路由键为 info;将 error_queue 绑定到 logs_exchange,并指定路由键为 error;将 warning_queue 绑定到 logs_exchange,并指定路由键为 warning
  4. 发送消息: 生产者发送消息到 logs_exchange,并指定路由键为 infoerrorwarning
  5. 接收消息: 消费者从相应的队列中接收消息并进行处理。

3. 路由模式的 Java 实现

以下是一个使用 Java 和 RabbitMQ Java Client 库实现路由模式的示例代码。

3.1 添加依赖

首先,在 pom.xml 中添加 RabbitMQ 的依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

3.2 生产者代码

生产者负责发送消息到 Direct Exchange,并指定路由键。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class DirectProducer {private static final String EXCHANGE_NAME = "routing_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Direct Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 定义路由键和消息String severity01 = "info"; // 可以是 "info", "error", "warning"String message01 = "This is an info message";// 发送消息到交换机,并指定路由键channel.basicPublish(EXCHANGE_NAME, severity01, null, message01.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + severity01 + "':'" + message01 + "'");// 定义路由键和消息String severity02 = "warning"; // 可以是 "info", "error", "warning"String message02 = "This is an info message";// 发送消息到交换机,并指定路由键channel.basicPublish(EXCHANGE_NAME, severity02, null, message02.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + severity02 + "':'" + message02 + "'");}}
}

3.3 消费者代码

消费者负责从队列中接收消息并处理。

3.3.1 消费者DirectConsumer01

  • 接受消息中包含info, error, warning的数据。
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;public class DirectConsumer01 {private static final String EXCHANGE_NAME = "routing_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明 Direct Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到交换机,并指定路由键String[] severities = {"info", "error", "warning"}; // 可以只绑定部分路由键for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义消息处理回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 开始消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

3.3.2 消费者DirectConsumer02

  • 接受消息中包含error, warning的数据,但不接受消息中有info的数据。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class DirectConsumer02 {private static final String EXCHANGE_NAME = "routing_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明 Direct Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到交换机,并指定路由键String[] severities = {"error", "warning"}; // 可以只绑定部分路由键for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义消息处理回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 开始消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

4. 运行示例

  1. 启动 RabbitMQ 服务: 确保 RabbitMQ 服务已启动并运行在 192.168.200.138
  2. 运行消费者: 启动消费者程序,绑定队列到交换机,并等待消息。
  3. 运行生产者: 启动生产者程序,发送带有不同路由键的消息。

4.1 输出示例

4.1.1 生产者输出
 [x] Sent 'info':'This is an info message'[x] Sent 'warning':'This is an info message'
4.1.2 消费者DirectConsumer01输出
 [*] Waiting for messages. To exit press CTRL+C[x] Received 'info':'This is an info message'[x] Received 'warning':'This is an info message'
4.1.3 消费者DirectConsumer02输出
 [*] Waiting for messages. To exit press CTRL+C[x] Received 'warning':'This is an info message'

在这里插入图片描述


5. 路由模式的应用场景

  1. 日志系统: 将不同级别的日志(如 infoerrorwarning)发送到不同的队列,以便不同的消费者处理。
  2. 通知系统: 根据用户的订阅类型(如 emailsmspush)将通知发送到不同的队列。
  3. 任务分发: 根据任务的类型(如 high_prioritylow_priority)将任务分发到不同的队列。

总结

RabbitMQ 的路由模式(Routing)通过 Direct Exchange 提供了灵活的消息分发机制,使得消息可以根据路由键被精确地路由到目标队列。


http://www.ppmy.cn/server/153220.html

相关文章

MFC/C++学习系列之简单记录7

MFC/C学习系列之简单记录7 前言句柄的介绍句柄的使用AFX开头的函数都是干什么用的&#xff1f;总结 前言 在MFC的使用中发现了句柄&#xff0c;今天来详细学习一下MFC中如何使用句柄吧&#xff01; 句柄的介绍 句柄的使用是资源管理和传递的关键机制&#xff0c;通过句柄将系…

在C#中使用反射获取对象的类型信息

在C#中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许在运行时获取有关程序集、模块和类型的元数据&#xff0c;并且能够动态地调用方法和访问字段。通过反射可以获取对象的类型信息&#xff0c;包括其属性、方法、事件等。 以下是如何…

Centos下的OpenSSH服务器和客户端

目录 1、在 IP地址为192.168.98.11的Linux主机上安装OpenSSH服务器&#xff1b; 2、激活OpenSSH服务&#xff0c;并设置开机启动&#xff1b; 3、在IP地址为192.168.98.33的Linux主机上安装OpenSSH客户端&#xff0c;使用客户端命令&#xff08;ssh、scp、sftp&#xff09;访…

gitlab克隆仓库报错fatal: unable to access ‘仓库地址xxxxxxxx‘

首次克隆仓库&#xff0c;失效了&#xff0c;上网查方法&#xff0c;都说是网络代理的问题&#xff0c;各种清理网络代理后都无效&#xff0c;去问同事&#xff1a; 先前都是直接复制的网页url当做远端url&#xff0c;或者点击按钮‘使用http克隆’ 这次对于我来说有效的远端u…

【深度学习】嘿马深度学习笔记第10篇:卷积神经网络,学习目标【附代码文档】

本教程的知识点为&#xff1a;深度学习介绍 1.1 深度学习与机器学习的区别 TensorFlow介绍 2.4 张量 2.4.1 张量(Tensor) 2.4.1.1 张量的类型 TensorFlow介绍 1.2 神经网络基础 1.2.1 Logistic回归 1.2.1.1 Logistic回归 TensorFlow介绍 总结 每日作业 神经网络与tf.keras 1.3 …

力扣——102. 二叉树的层序遍历

给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[[3],[9,20],[15,7]]示例 2&#xff1a; 输入&a…

玩转OCR | 腾讯云智能结构化OCR推动跨行业高效精准的文档处理与数据提取新时代

在数字化转型的浪潮中&#xff0c;光学字符识别&#xff08;OCR&#xff09;技术已成为企业提高效率、降低成本的关键工具。腾讯云智能结构化OCR凭借其先进的技术和广泛的应用场景&#xff0c;正在推动跨行业高效精准的文档处理与数据提取新时代。本文将全面介绍腾讯云智能结构…

Ingress-Nginx Annotations 指南:配置要点全方面解读(下)

文章目录 1.HTTP2 Push Preload2.Server Alias3.Server snippet4.Client Body Buffer Size5.External Authentication6.Global External Authentication7.Rate Limiting8.Global Rate Limiting9.Permanent Redirect10.Permanent Redirect Code11.Temporal Redirect12.SSL Passt…