RabbitMQ系列学习笔记(八)--发布订阅模式

ops/2024/10/23 21:26:43/

文章目录

  • 一、发布订阅模式原理
  • 二、发布订阅模式实战
    • 1、消费者代码
    • 2、生产者代码
    • 3、查看运行结果

本文参考:
尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq
RabbitMQ 详解
Centos7环境安装Erlang、RabbitMQ详细过程(配图)

一、发布订阅模式原理

image.png
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe),其工作原理如下:

  • 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的 每个队列 中。
  • 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。
  • 发布订阅模式使用 fanout 交换机。

Fanout这种类型非常简单。它是将接收到的所有消息广播到它知道的所有队列中。在系统中可以查看到默认的一些exchange类型,其中就包括fanout类型交换机。
image.png

二、发布订阅模式实战

1、消费者代码

在发布订阅模式下,需要使用fanout类型的交换机,可以选择通过channel.exchangeDeclare()创建,指定类型为fanout,并且需要将交换机与队列进行绑定,形成绑定关系,这样生产者在发送消息到交换机以后,fanout交换机才会把该消息广播发送到各个具有绑定关系的队列。
消费者01代码如下:

/*** Description: 发布订阅模式消费者01*/
public class ReceiveLogs01 {//设置要创建的交换机的名称private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//创建fanout交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机是否持久化*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false);/** * 生成一个临时的队列 队列的名称是随机的 * 当消费者断开和该队列的连接时,队列自动删除,防止无用队列占用空间*/ String queueName = channel.queueDeclare().getQueue();//将交换机与队列进行绑定(binding)/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""空串即可*/channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");//接收消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ReceiveLogs01控制台打印接收到的消息: " + message);}});}
}

消费者02代码如下:

/*** Description: 发布订阅模式消费者02*/
public class ReceiveLogs02 {//设置要创建的交换机的名称private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false);String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");//接收消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ReceiveLogs02控制台打印接收到的消息: " + message);}});}
}

2、生产者代码

由于在消费者中已经完成交换机声明,队列创建及二者之间的绑定关系,因此生产者部分的代码较为简单,只需要在发送消息时指定好前面创建的交换机名称即可。

/*** Description: 发布订阅模式生产者*/
public class EmitLog {//交换机名称private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = sc.nextLine();//参数1:指定交换机名称//参数2:指定routingkey,发布订阅模式写""空串即可channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}//关闭资源channel.close();}
}

3、查看运行结果

将ReceiveLogs01和ReceiveLogs02启动,等待接收消息,再启动生产者,通过控制台发送消息。
image.png
消息发送完毕以后,查看两个消费者都接收到了同样的消息,类似广播,而非之前的互斥接收。
image.png
image.png


http://www.ppmy.cn/ops/127924.html

相关文章

基于Redis的字符串来进行营业状态的存储

简介:苍穹外卖p63-p65;Redis配置类见本人博客:一文搞懂Redis所有知识点 管理端Controller类 package com.sky.controller.admin;import com.sky.result.Result; import io.swagger.annotations.Api; import io.swagger.annotations.ApiO…

10月22日,每日信息差

第一、北京京能氢安科技有限公司近日成立,法定代表人为刘毅,注册资本 4800 万元。该公司由北京京能科技有限公司全资持股,后者是北京能源集团有限责任公司的全资子公司。公司经营范围包括站用加氢及储氢设施销售、储能技术服务、新兴能源技术…

安装Python及pip使用方法详解

一、安装Python Python是一种广泛使用的高级编程语言,其安装过程相对简单。以下是具体步骤: 访问Python官网: 打开浏览器,访问Python的官方网站[python.org](https://www.python.org/),确保下载的是最新版本的Python安…

面试题:在 React 中如何绑定事件

在 React 中绑定事件处理器(event handlers)是一个常见的任务,通常涉及以下几个步骤: 定义一个事件处理器函数:在组件的类或者函数组件内部定义一个处理事件的函数。 在 JSX 中绑定事件处理器:在渲染 JSX 时,使用 on 前缀加上事件名称(如 onClick, onChange, onSubmit …

LeetCode Hot100 | Day6 | 从前序和中序数组构建二叉树

LeetCode Hot100 | Day6 | 从前序和中序数组构建二叉树 从前序和中序数组构建二叉树 105. 从前序与中序遍历序列构造二叉树 - 力扣&#xff08;LeetCode&#xff09; class Solution { public:TreeNode *tra(vector<int> preorder, vector<int> inorder){if(pre…

kotlin 入门总结

目录 1、构造函数 2、数据类 data class&#xff0c; 3、object 单例类&#xff0c;相当于java线程安全的懒加载 4、companion object 伴生对象&#xff0c;类似于包装静态值的一个区域块 5、解构 6、空安全 7、条件语句 8、集合 9 属性和支持属性 属性 支持属性 10 …

9. JSON RPC 服务

① JSON RPC 是一种基于 JSON 格式的轻量级的 RPC 协议标准,易于使用和阅读。 ② 在 Hyperf 里由 hyperf/json-rpc 组件来实现,可自定义基于 HTTP 协议来传输,或直接基于 TCP 协议来传输。 一、服务中心 目前 Hyperf 仅支持两种服务中心的组件支持: consul、nacosconsul 安…

python爬虫,爬取网页壁纸图片

python爬虫实战&#xff0c;爬取网页壁纸图片 使用python爬取壁纸图片&#xff0c;保存到本地。 爬取彼岸图网&#xff0c;网站地址https://pic.netbian.com/ 本人小白&#xff0c;记录一下学习过程。 开始前的准备 安装python环境&#xff0c;略。 python编辑器pycharm2…