RabbitMQ系列学习笔记(八)--发布订阅模式

server/2024/10/22 11:18:37/

文章目录

  • 一、发布订阅模式原理
  • 二、发布订阅模式实战
    • 1、消费者代码
    • 2、生产者代码
    • 3、查看运行结果

本文参考:
尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq
RabbitMQ 详解
Centos7环境安装Erlang、RabbitMQ详细过程(配图)

一、发布订阅模式原理

image.png
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe),其工作原理如下:

  • 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的 每个队列 中。
  • 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。
  • 发布订阅模式使用 fanout 交换机。

Fanout这种类型非常简单。它是将接收到的所有消息广播到它知道的所有队列中。在系统中可以查看到默认的一些exchange类型,其中就包括fanout类型交换机。
image.png

二、发布订阅模式实战

1、消费者代码

在发布订阅模式下,需要使用fanout类型的交换机,可以选择通过channel.exchangeDeclare()创建,指定类型为fanout,并且需要将交换机与队列进行绑定,形成绑定关系,这样生产者在发送消息到交换机以后,fanout交换机才会把该消息广播发送到各个具有绑定关系的队列。
消费者01代码如下:

/*** Description: 发布订阅模式消费者01*/
public class ReceiveLogs01 {//设置要创建的交换机的名称private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//创建fanout交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机是否持久化*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false);/** * 生成一个临时的队列 队列的名称是随机的 * 当消费者断开和该队列的连接时,队列自动删除,防止无用队列占用空间*/ String queueName = channel.queueDeclare().getQueue();//将交换机与队列进行绑定(binding)/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""空串即可*/channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");//接收消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ReceiveLogs01控制台打印接收到的消息: " + message);}});}
}

消费者02代码如下:

/*** Description: 发布订阅模式消费者02*/
public class ReceiveLogs02 {//设置要创建的交换机的名称private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false);String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");//接收消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ReceiveLogs02控制台打印接收到的消息: " + message);}});}
}

2、生产者代码

由于在消费者中已经完成交换机声明,队列创建及二者之间的绑定关系,因此生产者部分的代码较为简单,只需要在发送消息时指定好前面创建的交换机名称即可。

/*** Description: 发布订阅模式生产者*/
public class EmitLog {//交换机名称private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = sc.nextLine();//参数1:指定交换机名称//参数2:指定routingkey,发布订阅模式写""空串即可channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}//关闭资源channel.close();}
}

3、查看运行结果

将ReceiveLogs01和ReceiveLogs02启动,等待接收消息,再启动生产者,通过控制台发送消息。
image.png
消息发送完毕以后,查看两个消费者都接收到了同样的消息,类似广播,而非之前的互斥接收。
image.png
image.png


http://www.ppmy.cn/server/133882.html

相关文章

Spring Boot技术:图书进销存管理的创新实践

6系统测试 6.1概念和意义 测试的定义:程序测试是为了发现错误而执行程序的过程。测试(Testing)的任务与目的可以描述为: 目的:发现程序的错误; 任务:通过在计算机上执行程序,暴露程序中潜在的错误。 另一个…

《武汉科技大学学报》

《武汉科技大学学报》 本学报国际刊号为ISSN 1674-3644,国内统一刊号为CN 42-1608/N。 本学报主要刊载冶金工程、冶金材料科学、冶金机械工程及自动化、信息科学与控制、化学工程、计算机科学、建筑工程、环境工程以及基础理论研究等学科的学术论文。择优报道国内…

pandas-数据分析-练习题-第1次练习

文章目录 简介开始练习第一题第二题第三题第四题第五题第六题第七题第八题第九题第十题第十一题 简介 每次更新大概10个左右的关于pandas的操作知识点!做练习要从第一步开始,防止报错!本环境是Anaconda创建的虚拟环境中打开的jupyter noteboo…

【解决】webstrom uniapp rpx格式化空格 报错飘红

解决办法 1、安装 wechat mini program support 插件 2. 设置 wechat mini program 里小程序支持选为启用 3. 重新格式化显示正常&#xff0c;也不飘红了 注意要style开启scss支持lang"scss"&#xff0c;否则也会飘红报错 <style lang"scss"><…

leetcode动态规划(八)-不同的二叉搜索树

题目 96.不同的二叉搜索树 给你一个整数 n &#xff0c;求恰由 n 个节点组成且节点值从 1 到 n 互不相同的 二叉搜索树 有多少种&#xff1f;返回满足题意的二叉搜索树的种数。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;5示例 2&#xff1a; 输入&#xff…

Apache Seata Raft模式配置中心

本文来自 Apache Seata官方文档&#xff0c;欢迎访问官网&#xff0c;查看更多深度文章。 本文来自 Apache Seata官方文档&#xff0c;欢迎访问官网&#xff0c;查看更多深度文章。 Apache Seata Raft模式配置中心 title: Seata Raft模式配置中心 author: 蒋奕晨-清华大学&…

FairGuard游戏加固全面适配纯血鸿蒙NEXT

2024年10月8日&#xff0c;华为正式宣布其原生鸿蒙操作系统 HarmonyOS NEXT 进入公测阶段&#xff0c;标志着其自有生态构建的重要里程碑。 作为游戏安全领域领先的第三方服务商&#xff0c;FairGuard游戏加固在早期就加入了鸿蒙生态的开发&#xff0c;基于多项独家技术与十余年…

3.matplotlib基础及用法(全)

一.基础绘图 折线图plot散点图scatter柱状图bar饼图pie 二.图表设置 设置标题设置线条设置坐标轴添加图例添加注释设置画布大小与分辨率 三.高级功能 绘制子图保存图形 一.基础绘图 1.折线图plot import matplotlib.pyplot as plt x [1, 2, 3, 4, 5] y [2, 3, 5, 7, 11] pl…