RabbitMQ-发布/订阅模式

server/2024/10/16 0:16:25/

1、发布/订阅模式介绍

在普通的生产者、消费者模式,rabbitmq会将消息依次传递给每一个消费者,一个worker一个,平均分配,这就是Round-robin调度方式,为了实现更加复杂的调度,我们就需要使用发布/订阅的方式。

2、交换机(exchange)

RabbitMQ中,消息模型的核心理念就是,生产者从来不能直接将消息发送到队列,甚至生产者都不知道消息要被发送到队列中。

相反,生产者只能将消息发送到交换机中,交换机一侧从生产者接收消息,一侧将消息发送到队列中,交换机需要知道如何处理接收到的消息,是发送给一个队列还是多个队列?这是由交换机的类型决定的。

交换机共分为四类:  directtopicheaders and fanout. 本章节以扇形交换机为例说明rabbitmq的使用。

3、fanout交换机的使用方式

扇形交换机,就像你猜测的那样,他可以将他接收到的全部消息广播到所有队列里。

3.1 声明交换机

首先声明一个扇形交换机,type参数设置为『fanout』

err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments
)

3.2 发送消息到交换机

交换机设定完成后,就可以往该交换机发送消息:

	body := "Hello World!"err = ch.Publish("logs", "", false, false, amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

如果要在rabbitmq的页面上查看发送的消息,需要提前创建一个队列,并绑定到该交换机[logs]上,就可以查看发送的消息:

扇形交换机的特性,就是他会将收到的消息广播给所有绑定到该交换机的队列,我们可以创建多个队列,并绑定到该交换机上,我们发送一次消息,就会看到,所有绑定到该交换机的队列中都会有一条消息,先创建三个队列,并分别绑定到logs交换机:

之后运行脚本,发送两次消息:

 可以看到,三个队列当中都有两条消息。

3.2 扇形交换机发送消息代码

package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)if err != nil {fmt.Println("Failed to declare an exchange")return}body := "Hello World!"err = ch.Publish("logs", "", false, false, amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})if err != nil {fmt.Println("Failed to publish a message")return}
}

 3.2 声明队列,用于接收消息

	q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)

声明队列时,没有指定队列名称,这时,系统会返回一个随机名称存储在q变量中。 

3.3 binding

队列声明完成后,需要将该队列绑定到交换机上,这样交换机才能把消息广播给该队列:

绑定代码: 

    err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)

消费者侧全部代码如下:

package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)if err != nil {fmt.Println("Failed to declare an exchange")return}q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)var forever chan struct{}go func() {for d := range msgs {fmt.Printf(" [x] %s\n", d.Body)}}()fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

程序启动后,控制台上会增加一个随机命名的队列。

 运行【3.2】的生产者程序,发送消息到扇形交换机,这个时候消费者就会同步消费到消息,并进行打印:

4、总结

关于扇形交换机,核心的一点需要我们记住,发送到扇形交换机的消息,他会将消息广播给所有绑定到该交换机的队列上,无脑广播,所有队列会同时接受到交换机上全部的消息。


http://www.ppmy.cn/server/46026.html

相关文章

Flink搭建

目录 一、standalone模式 二、Flink on Yarn模式 一、standalone模式 解压安装Flink [rootbigdata1 software]# tar -zxvf flink-1.14.0-bin-scala_2.12.tgz -C /opt/module/ [rootbigdata1 module]# mv flink-1.14.0/ flink-standalone 2.进入conf修改flink-conf.yaml job…

pyqt Qtreeview分层控件

pyqt Qtreeview分层控件 介绍效果代码 介绍 QTreeView 是 PyQt中的一个控件&#xff0c;它用于展示分层数据&#xff0c;如目录结构、文件系统等。QTreeView 通常与模型&#xff08;如 QStandardItemModel、QFileSystemModel 或自定义模型&#xff09;一起使用&#xff0c;以管…

实战15:bert 命名实体识别、地址解析、人名电话地址抽取系统-完整代码数据

直接看项目视频演示: bert 命名实体识别、关系抽取、人物抽取、地址解析、人名电话地址提取系统-完整代码数据_哔哩哔哩_bilibili 项目演示: 代码: import re from transformers import BertTokenizer, BertForTokenClassification, pipeline import os import torch im…

代码随想录算法训练营Day 56| 动态规划part17 | 647. 回文子串、5. 最长回文子串、516.最长回文子序列

代码随想录算法训练营Day 56| 动态规划part17 | 647. 回文子串、5. 最长回文子串、516.最长回文子序列 文章目录 代码随想录算法训练营Day 56| 动态规划part17 | 647. 回文子串、5. 最长回文子串、516.最长回文子序列647. 回文子串一、法一 5. 最长回文子串一、动态规划 516.最…

单片机的自动化编程语言:深度探索与未来展望

单片机的自动化编程语言&#xff1a;深度探索与未来展望 单片机作为现代电子设备的核心控制单元&#xff0c;其自动化编程语言的发展与应用&#xff0c;对提升设备性能、简化编程流程具有重大意义。本文将从四个方面、五个方面、六个方面和七个方面&#xff0c;对单片机的自动…

AI新纪元:OpenAI GPT-4o模型发布,开启智能交互革命!

目录 前言一、 总体概述二、能力探索1、文字生成图片2、3D 物体合成3、音频提炼总结4、视频讲座总结 三、 模型评估1、文本评估2、音频ASR评估3、音频翻译性能4、M3Exam零样本结果5、视觉理解评估 四、 OpenAI API使用1、文本聊天2、图像解析3、上传 Base 64 编码图像4、多幅图…

系统安全及其应用

系统安全及其应用 部署服务器的初始化步骤&#xff1a; 1、配置IP地址&#xff0c;网关&#xff0c;DNS解析 2、安装源&#xff0c;外网&#xff08;在线即可yum&#xff09; 内网&#xff08;只能用源码包编译安装&#xff09; 3、磁盘分区 lvm raid 4、系统权限配置和基础安…

fpga系列 HDL: 05 阻塞赋值(=)与非阻塞赋值(<=)

在Verilog硬件描述语言&#xff08;HDL&#xff09;中&#xff0c;信号的赋值方式主要分为两种&#xff1a;连续赋值和过程赋值。每种赋值方式有其独特的用途和语法&#xff0c;并适用于不同类型的电路描述。 1. 连续赋值&#xff08;Continuous Assignment,assign 和&#xf…