RabbitMQ 订阅模型-路由模式

news/2025/1/11 4:23:57/

订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。

在 Direct 模型下:

  • 队列与交换机绑定,不能任意绑定,而要指定一个 RoutingKey (路由 key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不在把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 RoutingKey 与消息的 Routing Key完全一致,才会收到消息。

文章目录

    • 一、RabbitMQ 订阅模型-路由(Direct)模式
        • 1、RabbitMQ 路由(direct)模式
        • 2、路由(direct)模式组成
    • 二、RabbitMQ 订阅模型-路由(Direct)模式实现
        • 1、添加 Maven 依赖
        • 2、封装工具类 ConnectionUtil
        • 3、生产者实现
        • 4、消费者-1 实现
        • 5、消费者-2 实现
    • 三、订阅模型 三种模式区别
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、RabbitMQ 路由(direct)模式
        • 3、RabbitMQ 主题(topic)模式


一、RabbitMQ 订阅模型-路由(Direct)模式

1、RabbitMQ 路由(direct)模式

订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。

image-20221201012250811

在 Direct 模型下:

  • 队列与交换机绑定,不能任意绑定,而要指定一个 RoutingKey (路由 key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不在把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 RoutingKey 与消息的 Routing Key完全一致,才会收到消息。

2、路由(direct)模式组成

RabbitMQ 订阅模型-路由模式(Fanout)模式主要有以下六个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。

  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)

  • Routing Key 路由关键字,exchange 根据这个关键字进行消息投递,Exchange 接收到的消息会带有 RoutingKey 这个字段,Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 BindingKey 做匹配,如果满足要求,就往 BindingKey 所绑定的 Queue 发送消息,这样我们就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue 的过程

  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成

  • 消费者2(consumer):领取任务并完成…

  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。


二、RabbitMQ 订阅模型-路由(Direct)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency>...
</dependencies>

2、封装工具类 ConnectionUtil

package com.lizhengi.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author liziheng* @version 1.0.0* @description 连接工具类封装* @date 2022-12-26 11:39 上午**/
public class ConnectionUtil {/*** 创建 MQ 连接工厂对象*/private static final ConnectionFactory CONNECTION_FACTORY;// 类加载时,重量级资源加载static {CONNECTION_FACTORY = new ConnectionFactory();//设置连接MQ主机CONNECTION_FACTORY.setHost("127.0.0.1");//设置连接MQ端口号CONNECTION_FACTORY.setPort(5672);//设置连接MQ虚拟主机CONNECTION_FACTORY.setVirtualHost("/test");//设置连接MQ虚拟主机的用户名密码CONNECTION_FACTORY.setUsername("admin");CONNECTION_FACTORY.setPassword("123456");}/*** 建立与RabbitMQ连接 工具方法** @return Connection*/public static Connection getConnection() {try {//返回连接对象return CONNECTION_FACTORY.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}/*** 关闭通道和连接 工具方法** @param channel    Channel* @param connection Connection*/public static void closeConnectionAndChanel(Channel channel, Connection connection) {try {if (channel != null) {channel.close();}if (connection != null) {connection.close();}} catch (Exception e) {e.printStackTrace();}}
}

3、生产者实现

package com.lizhengi.direct;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-路由模式(Direct)模式 生产者* @date 2022-12-26 3:43 下午**/
public class Producer {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();/*  为通道声明交换机*  参数1:交换机名称;参数2:交换机类型*/channel.exchangeDeclare("logs_direct", "direct");//发送消息String routingKey = "error";
//        String routingKey = "info";channel.basicPublish("logs_direct", routingKey, null, ("The message's routingKey is " + routingKey + " !").getBytes());//释放资源ConnectionUtil.closeConnectionAndChanel(channel, connection);}
}

4、消费者-1 实现

package com.lizhengi.direct;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-路由(Direct)模式 消费者* @date 2022-12-26 3:46 下午**/public class Customer1 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs_direct", "direct");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列和routingKey//  参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由keychannel.queueBind(queueName, "logs_direct", "error");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者1:" + new String(body));}});}
}

5、消费者-2 实现

package com.lizhengi.direct;import com.rabbitmq.client.*;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description 订阅模型-路由(Direct)模式 消费者* @date 2022-12-26 3:47 下午**/
public class Customer2 {public static void main(String[] args) throws IOException {// 获取连接对象Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();//  绑定交换机channel.exchangeDeclare("logs_direct", "direct");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列和routingKey//  参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由keychannel.queueBind(queueName, "logs_direct", "info");channel.queueBind(queueName, "logs_direct", "error");channel.queueBind(queueName, "logs_direct", "warning");//  消费消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("消费者1:" + new String(body));}});}
}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。


http://www.ppmy.cn/news/7279.html

相关文章

RocketMQ原理篇

文章目录broker与NameServerMessageQueue与Topic的关系生产者、消费者写入读取 消息CommitLog生产者消费者组broker与NameServer 基于 Dledger 实现 RocketMQ 高可用自动切换 broker 会每隔 30 秒向 NameServer 发送一个的心跳 &#xff0c;NameServer 收到一个心跳 会更新对…

元素偏移量 offset、元素可视区 client和元素滚动 scroll

1、元素偏移量 offset 系列 1.1、offset 概述 offset 翻译过来就是偏移量&#xff0c; 我们使用 offset系列相关属性可以动态的得到该元素的位置&#xff08;偏移&#xff09;、大小等。 获得元素距离带有定位父元素的位置获得元素自身的大小&#xff08;宽度高度&#xff09…

SegeX MemDC:实用型双缓冲内存DC (内存DC 封装MemDC)(附免费源代码)

----哆啦刘小洋 原创&#xff0c;转载需说明出处 2022-12-28 SegeX MemDC1 简介2 基础双缓存技术2.1 MFC绘图机制2.1.1 Window绘图消息2.1.2 背景刷新与屏幕闪烁2.2 双缓存技术消除屏幕闪烁2.3 封装3 更加实用的扩充1 简介 在VC中用MFC绘制图像时&#xff0c;为避免屏幕闪烁&a…

【Qt】QtCreator远程部署、调试程序

1、添加远程设备 1)QtCreator 工具–> 选项 --> 设备 --> 添加 2)设备设置向导选择–> Generic Linux Device --> 开启向导 3)填写“标识配置的名称”(随便写)、设备IP、用户名 --> 下一步 4)选择配对秘密文件,第一次配对,可以不填写,点击“下一…

Shell程序编写猜数字的小游戏

文章目录 目录 文章目录 前言 一、设计思路 二、代码编写 三、效果图 总结 前言 在学习Linux课程中学习了一点简单的shell语法&#xff0c;实现了一个猜数字功能的程序。感兴趣的可以看完后自己手动编写玩玩~这个小游戏的编写也是把基础的shell语法基本上都用到了&#…

学习HTTP协议,这一篇就够啦 ~~

HTTP协议一、什么是HTTP1.1 应用层协议1.2 HTTP1.3 HTTP协议的工作过程二、HTTP协议格式2.1 Fiddler抓包工具2.2 协议格式三、HTTP请求 (Request)3.1 认识 "方法" (method)3.1.1 GET 方法3.1.2 POST 方法3.1.3 GET和POST比较3.1.4 其他方法3.2 认识URL3.2.1 URL基本格…

【电商】电商后台---价税管理

文章对电商后台系统中的价税管理进行了系统的介绍&#xff0c;希望通过此文能够加深你对电商系统的认识。 前面介绍了商品管理部分&#xff0c;从商品的属性、分类到商品资质、商品图片都做了说明&#xff0c;在梳理的过程中越发的感觉到每部分细节才是关键。但实话实说通过前几…

C#,图像二值化(05)——全局阈值的联高自适应算法及其源代码

阈值的选择当然希望智能、简单一些。应该能应付一般的图片。 What is Binarization? Binarization is the process of transforming data features of any entity into vectors of binary numbers to make classifier algorithms more efficient. In a simple example, trans…