RabbitMQ中的Topic模式

ops/2024/12/26 17:00:12/

在现代分布式系统中,消息队列(Message Queue)是实现异步通信、解耦系统组件的重要工具。RabbitMQ 是一个广泛使用的开源消息代理,支持多种消息传递模式,其中 Topic 模式 是一种灵活且强大的模式,允许生产者和消费者通过通配符匹配的方式进行消息传递。本文将深入探讨 RabbitMQ 中 Topic 模式的工作原理,并通过 Java 代码示例展示其实现方式。


1. Topic 模式的工作原理

1.1 Topic 模式概述

在 RabbitMQ 中,Topic 模式是基于 交换机类型为 topic 的一种消息传递模式。与 Direct 模式(精确匹配)和 Fanout 模式(广播)不同,Topic 模式允许生产者发送消息到特定的交换机,并根据消息的 路由键(Routing Key)绑定键(Binding Key) 的匹配规则,将消息分发到相应的队列。

在这里插入图片描述

1.2 关键概念

1.2.1 交换机(Exchange)

在 Topic 模式中,消息不会直接发送到队列,而是发送到一个 topic 类型的交换机。交换机根据消息的路由键和队列的绑定键进行匹配,决定将消息分发到哪些队列。

1.2.2 路由键(Routing Key)

路由键是生产者在发送消息时指定的字符串,用于描述消息的主题或类别。路由键通常由多个单词组成,单词之间用点号(.)分隔,例如:user.logs.info

1.2.3 绑定键(Binding Key)

绑定键是消费者在绑定队列到交换机时指定的字符串,用于描述队列感兴趣的主题或类别。绑定键的格式与路由键相同,但支持通配符匹配。

1.2.4 通配符

Topic 模式支持两种通配符:

  • *(星号):匹配一个单词。
  • #(井号):匹配零个或多个单词。

例如:

  • *.logs.*:匹配所有包含 logs 的消息,如 user.logs.infosystem.logs.error
  • #.error:匹配所有以 error 结尾的消息,如 system.logs.erroruser.error

1.3 消息分发流程

  1. 生产者发送消息到 topic 类型的交换机,并指定路由键。
  2. 交换机根据路由键和队列的绑定键进行匹配。
  3. 如果匹配成功,消息会被分发到相应的队列。
  4. 消费者从队列中消费消息。

2. Topic 模式的 Java 代码实现

下面通过一个简单的 Java 代码示例,展示如何在 RabbitMQ 中实现 Topic 模式。

2.1 环境准备

在开始之前,请确保已经安装并运行了 RabbitMQ 服务,并且安装了 RabbitMQ 的 Java 客户端库。可以通过 Maven 引入依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

2.2 生产者代码

生产者负责发送消息到 topic 交换机,并指定路由键。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TopicProducer {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个 topic 类型的交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 定义路由键和消息内容String routingKey = "user.logs.info"; // 可以修改为其他路由键String message = "This is a log message from user.";// 发送消息到交换机channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "': '" + message + "'");}}
}

2.3 消费者代码

消费者负责从队列中接收消息,并根据绑定键过滤感兴趣的消息。

import com.rabbitmq.client.*;public class TopicConsumer {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明一个 topic 类型的交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建一个临时队列,并绑定到交换机String queueName = channel.queueDeclare().getQueue();String bindingKey = "user.#"; // 可以修改为其他绑定键channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 创建消费者并开始消费消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "': '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

3. 运行示例

3.1 启动 RabbitMQ 服务

确保 RabbitMQ 服务已经启动并运行。如果使用 Docker,可以通过以下命令启动 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

3.2 运行生产者和消费者

  1. 运行 TopicProducer 类,发送消息到交换机。
  2. 运行 TopicConsumer 类,接收并处理消息。

3.3 测试不同的路由键和绑定键

  • 修改生产者的 routingKey,例如:system.logs.error
  • 修改消费者的 bindingKey,例如:#.error*.logs.*

观察消息的分发情况,验证 Topic 模式的通配符匹配功能。

在这里插入图片描述


4. 总结

RabbitMQ 的 Topic 模式通过通配符匹配的方式,提供了灵活的消息分发机制,适用于复杂的场景。通过本文的介绍和代码示例,读者可以深入理解 Topic 模式的工作原理,并掌握如何在 Java 中实现 Topic 模式。

在实际应用中,Topic 模式可以用于日志收集、事件驱动架构等场景,帮助开发者构建高效、可扩展的分布式系统。


http://www.ppmy.cn/ops/145166.html

相关文章

基于Jenkins+Docker的自动化部署实践——整合Git与Python脚本实现远程部署

环境说明&#xff1a; Ubuntu&#xff1a;v24.04.1 LTSJekins&#xff1a;v2.491Docker&#xff1a;v27.4.0Gogs&#xff1a;v0.14.0 - 可选。可以选择Github&#xff0c;Gitlab或者Gitea等Git仓库&#xff0c;不限仓库类型1Panel: v1.10.21-lts - 可选。这里主要用于查看和管…

默认接口实现”在 C# 7.3 中不可用。请使用 8.0 或更高的语言版本报错问题

问题 开发环境&#xff1a; C# .netframework4.5.2, Visualstudio2019&#xff0c;定义了如下接口&#xff0c;在接口中定义了一个委托和事件报错&#xff1a;错误 CS8370 功能“默认接口实现”在 C# 7.3 中不可用。请使用 8.0 或更高的语言版本。 public interface I…

centos server系统新装后的网络配置

当前状态&#xff1a; ping www.baidu.com报错 1、检查IP ip addr show记录要编辑的网卡 link/ether 后的XX:XX:XX:XX:XX:XX号 2、以em1为例&#xff1a; vi /etc/sysconfig/network-scripts/ifcfg-em1&#xff0c;新增如下行&#xff1a; HWADDRXX:XX:XX:XX:XX:XX(具体值…

【医学分割】跨尺度全局状态建模和频率边界指导的分割架构

SkinMamba: A Precision Skin Lesion Segmentation Architecture with Cross-Scale Global State Modeling and Frequency Boundary Guidance 本文提出了一种基于 Mamba 和 CNN 的混合架构&#xff0c;称为 SkinMamba。它在保持线性复杂性的同时&#xff0c;提供了强大的长距离…

软件工程课程知识点

一、软件与软件工程概述 1. 软件的组成与演化 软件的构成 Software(软件) 通常由 computer programs(计算机程序)、data structures(数据结构)、software description information(软件描述信息) 组成&#xff0c;或者说由 set of programs(程序集合)、documentation(文档) …

C++前言

1.什么是C C语言是结构化和模块化的语言&#xff0c;适合处理较小规模的程序&#xff0c;对于复杂的问题&#xff0c;规模较大的程序&#xff0c;需要高度的抽象和建模时&#xff0c;C语言不适合&#xff0c;为了解决软件危机&#xff0c;20世纪80年代&#xff0c;计算机界提出…

在vscode中的ESP-IDF插件中使用Arduino框架作为组件

首先要先安装好ESP-IDF插件&#xff0c;然后进行如下操作 1、安装特定版本ESP-IDF 在ESP-IDF插件中&#xff0c;Advanced->Configure ESP-IDF Extension 选Advanced&#xff0c;Select ESP-IDF version:&#xff0c;选好版本&#xff0c;点Configure Tools&#xff0c;即可…

Echarts之yAxis属性超超超级详情版学习

yAxis 属性说明类型id组件idstringshow是否显示y轴booleanalignTicks在多个 y 轴为数值轴的时候&#xff0c;可以开启该配置项自动对齐刻度。只对value和log类型的轴有效booleanpositiony 轴的位置stringoffsetY 轴相对于默认位置的偏移&#xff0c;在相同的 position 上有多个…