RabbitMQ 入门系列(12)— 交换器分类(direct、fanout、topic)

news/2025/2/19 17:02:16/

RabbitMQExchange(交换器)分为四类:

  • direct(默认)
  • headers
  • fanout
  • topic

其中 headers 交换器允许你匹配 AMQP 消息的 header 而非路由键,除此之外 headers 交换器和 direct 交换器完全一致,但性能却很差,几乎用不到,所以我们本文也不做讲解。

注意: fanouttopic 交换器是没有历史数据的,也就是说对于中途创建的队列,获取不到之前的消息。

1、direct 交换器

direct 为默认的交换器类型,也非常的简单,如果路由键匹配的话,消息就投递到相应的队列,如图:
direct

使用代码:

channel.basicPublish("", QueueName, null, message)

推送 direct 交换器消息到对于的队列,空字符为默认的 direct交换器,用队列名称当做路由键。

1.1 代码示例

发送端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 声明队列
//【参数说明:
// 参数一:队列名称,
// 参数二:是否持久化;
// 参数三:是否独占模式;
// 参数四:消费者断开连接时是否删除队列;
// 参数五:消息其他参数】
channel.queueDeclare(config.QueueName, false, false, false, null);
String message = String.format("当前时间:%s", new Date().getTime());
// 推送内容
//【参数说明:
// 参数一:交换机名称;
// 参数二:队列名称,
// 参数三:消息的其他属性-路由的headers信息;
// 参数四:消息主体】
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));

接收端,持续接收消息:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 声明队列
//【参数说明:
// 参数一:队列名称,
// 参数二:是否持久化;
// 参数三:是否独占模式;
// 参数四:消费者断开连接时是否删除队列;
// 参数五:消息其他参数】
channel.queueDeclare(config.QueueName, false, false, false, null);
Consumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "utf-8"); // 消息正文System.out.println("收到消息 => " + message);channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;//参数二:是否批量应答,true批量确认小于当前id的消息】}
};
channel.basicConsume(config.QueueName, false, "", defaultConsumer);

接收端,获取单条消息

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息确认

持续消息获取使用:basic.consume;单个消息获取使用:basic.get

1.2 注意事项

不能使用 for 循环单个消息消费来替代持续消息消费,因为这样性能很低;

1.3 公平调度

当接收端订阅者有多个的时候,direct 会轮询公平的分发给每个订阅者(订阅者消息确认正常),如图:
公平

1.4 消息的发后既忘特性

发后既忘模式是指接受者不知道消息的来源,如果想要指定消息的发送者,需要包含在发送内容里面,这点就像我们在信件里面注明自己的姓名一样,只有这样才能知道发送者是谁。

1.5 消息确认

看了上面的代码我们可以知道,消息接收到之后必须使用 channel.basicAck() 方法手动确认(非自动确认删除模式下),那么问题来了。

消息收到未确认会怎么样?

如果应用程序接收了消息,因为 bug 忘记确认接收的话,消息在队列的状态会从 Ready 变为 Unacked ,如图:

unack

如果消息收到却未确认,Rabbit 将不会再给这个应用程序发送更多的消息了,这是因为 Rabbit 认为你没有准备好接收下一条消息。

此条消息会一直保持 Unacked 的状态,直到你确认了消息,或者断开与 Rabbit 的连接,Rabbit 会自动把消息改完 Ready 状态,分发给其他订阅者。当然你可以利用这一点,让你的程序延迟确认该消息,直到你的程序处理完相应的业务逻辑,这样可以有效的防治 Rabbit 给你过多的消息,导致程序崩溃。

消费者消费的每条消息都必须确认。

1.6 消息拒绝

消息在确认之前,可以有两个选择:

  • 断开与 Rabbit 的连接,这样 Rabbit会重新把消息分派给另一个消费者;
  • 拒绝 Rabbit 发送的消息使用
channel.basicReject(long deliveryTag, boolean requeue)

参数 1:消息的 id;参数 2:处理消息的方式,如果是 trueRabbib 会重新分配这个消息给其他订阅者,如果设置成 false的话,Rabbit 会把消息发送到一个特殊的 “ 死信 ” 队列,用来存放被拒绝而不重新放入队列的消息。

消息拒绝Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒绝

2. fanout 交换器 —— 发布 / 订阅模式

fanout 有别于 direct 交换器,fanout 是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。

比如用户上传了自己的头像,这个时候图片需要清除缓存,同时用户应该得到积分奖励,你可以把这两个队列绑定到图片上传的交换器上,这样当有第三个、第四个上传完图片需要处理的需求的时候,原来的代码可以不变,只需要添加一个订阅消息即可,这样发送方和消费者的代码完全解耦,并可以轻而易举的添加新功能了。

direct 交换器不同,我们在发送消息的时候新增

channel.exchangeDeclare(ExchangeName, "fanout")

这行代码声明 fanout 交换器。

发送端:

final String ExchangeName = "fanoutec"; // 交换器名称
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器
String message = "时间:" + new Date().getTime();
channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));

接受消息不同于 direct,我们需要声明 fanout 路由器,并使用默认的队列绑定到 fanout 交换器上。

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器
String queueName = channel.queueDeclare().getQueue(); // 声明队列
channel.queueBind(queueName, ExchangeName, "");
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");}
};
channel.basicConsume(queueName, true, consumer);

fanoutdirect 的区别最多的在接收端,fanout 需要绑定队列到对应的交换器用于订阅消息。其中 channel.queueDeclare().getQueue( ) 为随机队列,Rabbit 会随机生成队列名称,一旦消费者断开连接,该队列会自动删除。

注意:对于 fanout 交换器来说 routingKey(路由键)是无效的,这个参数是被忽略的。

3. topic 交换器 —— 匹配订阅模式

topic 交换器运行和 fanout 类似,但是可以更灵活的匹配自己想要订阅的信息,这个时候 routingKey 路由键就排上用场了,使用路由键进行消息(规则)匹配。

假设我们现在有一个日志系统,会把所有日志级别的日志发送到交换器,warninglogerrorfatal,但我们只想处理 error 以上的日志,要怎么处理?这就需要使用 topic 路由器了。

topic 路由器的关键在于定义路由键,定义 routingKey 名称不能超过 255 字节,使用 . 作为分隔符,例如:com.mq.rabbit.error

消费消息的时候 routingKey 可以使用下面字符匹配消息:

  • * 匹配一个分段(用 . 分割)的内容;
  • # 匹配 0 和多个字符;

例如发布了一个 com.mq.rabbit.error 的消息:

能匹配上的路由键:

  • com.mq.rabbit.*
  • com.mq.rabbit.#
  • #.error
  • com.mq.#

不能匹配上的路由键:

  • com.mq.*
  • *.error

所以如果想要订阅所有消息,可以使用 #匹配。

注意:fanouttopic 交换器是没有历史数据的,也就是说对于中途创建的队列,获取不到之前的消息。

发布端:

String routingKey = "com.mq.rabbit.error";
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 声明topic交换器
String message = "时间:" + new Date().getTime();
channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 声明topic交换器
String queueName = channel.queueDeclare().getQueue(); // 声明队列
String routingKey = "#.error";
channel.queueBind(queueName, ExchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(routingKey + "|接收消息 => " + message);}
};
channel.basicConsume(queueName, true, consumer);

参考
https://gitbook.cn/books/5d65124b2b27dd24ed390665/index.html


http://www.ppmy.cn/news/603057.html

相关文章

libavcodec-ffmpeg.so.56 cannot open shared object file

1. 问题现象 ImportError: libavcodec-ffmpeg.so.56: cannot open shared object file: No such file or directory2. 解决方法 sudo apt-get install libavcodec-dev sudo apt-get install libavformat-dev sudo apt-get install libswscale-dev

查找数组中唯一重复的元素和查找数组中丢失的数(哈希法和异或法)

1. 问题 假设数组元素为分别为 1-N,一共 N1 个,其中只有一个元素重复出现,其它元素只出现一个,找出这个重复的元素 2. 思路 2.1 使用 Hash 方法 将出现的元素依次放到一个 hash 表中,每个元素值作为 hash 的 key &am…

查找数组元素最大值和最小值(分治法)

1. 问题 给定一个数组,要求找出数组中的最大值和最小值,假设数组中的值两两各不相同 2. 思路 2.1 首元素比较法 定义变量 max、min , 分别将第一个元素分别赋值给这两个变量,然后依次遍历数组中的全部元素,如果比 ma…

超级实用的思维导图软件

如果你正在寻找一款超级实用的思维导图软件,那么我强烈推荐你使用ProcessOn。这款软件不仅功能强大,而且易于使用,可以帮助你更好地组织和管理工作流程、学习笔记、项目管理等。 首先,让我们来看看ProcessOn的优点。它提供了丰富的…

数组按位置旋转(循环移位)

1. 问题 给定一个数组,按照给定的索引位置旋转,如数组 a [1,2,3,4,5,6,7,8,9],按照给定索引位置 4 翻转后,输出为 [6,7,8,9,1,2,3,4,5] 2. 思路 2.1 可以直接利用切片索引性质求解 a [1,2,3,4,5,6,7,8,9],先计算出…

对有大量重复数字的数组进行排序(哈希表应用)

1. 问题 给定一个数组,已知这个数组中有大量的重复数字,如何对这个数组进行高效地排序 2. 思路 常规排序法没有用到大量重复数字这个特性,应该想到用 hash 表来解决这个问题 3. 代码实现 package mainimport ("fmt""sort&q…

查找数组中出现奇数次的 2 个数(哈希法和异或法)

1. 问题 数组中有 N2 个数,其中, N 个数出现了偶数次,2 个数出现了奇数次(这两个数不相等),使用 O(1) 的空间复杂度,找出这个数。不需要知道具体位置,只需要找到这两个数即可。 2. 思…

从三个有序数组中找出它们的公共元素

1. 问题 给定以非递减顺序排序的三个数组,找出这三个数组中的所有公共元素,例如下面三个数组 s1[2,5,12,20,45,85], s2[16,19,20,85,200],s3[3,4,15,20,39,72,85,190] ,那么这三个数组的公共元素为 [20, 85] 2. 思路代…