第七章 RabbitMQ之交换机

ops/2024/10/11 6:25:35/

目录

一、介绍

二、Fanout Exchange

2.1. 案例演示 

2.1.1. 创建SpringBoot工程

2.1.2. 父工程pom依赖

2.1.3. 生产者pom依赖 

 2.1.4. 生产者配置文件

2.1.5. 生产者核心代码

 2.1.6. 消费者RabbitMQConfig

 2.1.7. 消费者pom依赖

2.1.8. 消费者配置文件

2.1.9.  消费者核心代码

2.1.10. 运行效果

三、Direct Exchange

3.1. 案例演示

3.1.1. 生产者核心代码

3.1.2. 消费者RabbitMQConfig

 3.1.3. 消费者核心代码

3.1.4. 运行效果 

四、Topic Exchange

4.1. 案例演示

4.1.1. 生产者核心代码

 4.1.2. 消费者RabbitMQConfig

4.1.3. 消费者核心代码 

4.1.4. 运行结果


一、介绍

在RabbitMQ中,交换机(Exchange)是用来接收生产者发送的消息并将这些消息路由到一个或多个队列的组件。不同类型的交换机决定了它们如何路由这些消息。

以下是RabbitMQ中几种常用交换机类型:

直接交换机(direct):如果路由键完全匹配,消息就被投递到相应的队列。

广播交换机(fanout):消息被广播到所有绑定的队列。

主题交换机(topic):如果路由键匹配模式(通配符#代表匹配多个单词和*代表匹配一个单词),消息被投递到相应的队列。

头交换机(headers):通过匹配消息头进行路由。

二、Fanout Exchange

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式

2.1. 案例演示 

利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

2. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定

3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4. 在publisher中编写测试方法,向hmall.fanout发送消息

2.1.1. 创建SpringBoot工程

完整的工程目录结构及代码文件如下:

2.1.2. 父工程pom依赖

引入核心依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mq-demo</name><description>mq-demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><!-- spring amqp依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2.1.3. 生产者pom依赖 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>publisher</artifactId><version>0.0.1-SNAPSHOT</version><name>publisher</name><description>publisher</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

 2.1.4. 生产者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

2.1.5. 生产者核心代码

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {// 交换机名称String exchangeName = "wzx.fanout";// 消息内容String message = "人生苦短,持续不断地努力拼搏,迎难而上!";rabbitTemplate.convertAndSend(exchangeName,null, message);}
}

 2.1.6. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue fanoutQueue1() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("fanout.queue1").build();}@BeanQueue fanoutQueue2() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("fanout.queue2").build();}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("wzx.fanout");}@BeanBinding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); // 绑定队列和交换机}@BeanBinding binding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); // 绑定队列和交换机}
}

 2.1.7. 消费者pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>consumer</name><description>consumer</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.34</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2.1.8. 消费者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

2.1.9.  消费者核心代码

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "fanout.queue1")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "fanout.queue2")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

2.1.10. 运行效果

我们可以看到,生产者的消息,同时发给了绑定fanout类型交换机的两个队列,两个消费者均收到了同一条消息。

三、Direct Exchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey

Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

3.1. 案例演示

我们在2.1示例代码的基础上稍做调整:

3.1.1. 生产者核心代码

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {String exchangeName = "wzx.direct";String message = "人生苦短,持续不断地努力拼搏,迎难而上!";rabbitTemplate.convertAndSend(exchangeName,"jianchi", message);}
}

3.1.2. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue directQueue1() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("direct.queue1").build();}@BeanQueue directQueue2() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("direct.queue2").build();}@BeanDirectExchange directExchange() {return new DirectExchange("wzx.direct");}@BeanBinding binding1(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("nuli"); // 绑定队列和交换机}@BeanBinding binding2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("jianchi"); // 绑定队列和交换机}
}

 3.1.3. 消费者核心代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "direct.queue1")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "direct.queue2")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

3.1.4. 运行效果 

我们看到消息最终通过路由key,被匹配的消费者2所接收。

四、Topic Exchange

TopicExchange与DirectExchange类似,后者能做的事情,前者也能做。区别在于Topic的routingKey可以是多个单词的列表,并且以 . 分割。 Queue与Exchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

china.news 代表有中国的新闻消息;

china.weather 代表中国的天气消息;

japan.news 则代表日本新闻

japan.weather 代表日本的天气消息

4.1. 案例演示

案例需求

1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

2. 在RabbitMQ控制台中,声明交换机wzx. topic ,将两个队列与其绑定

3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

4. 在publisher中编写测试方法,利用不同的RoutingKey向wzx. topic发送消息

案例代码

我们在上述代码的基础上做调整:

4.1.1. 生产者核心代码

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {String exchangeName = "wzx.topic";String message = "人生苦短,持续不断地努力拼搏,迎难而上!";rabbitTemplate.convertAndSend(exchangeName,"wzx.news", message);}
}

 4.1.2. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue topicQueue1() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("topic.queue1").build();}@BeanQueue topicQueue2() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("topic.queue2").build();}@BeanTopicExchange topicExchange() {return new TopicExchange("wzx.topic");}@BeanBinding binding1(Queue topicQueue1, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with("china.#"); // 绑定队列和交换机}@BeanBinding binding2(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.news"); // 绑定队列和交换机}
}

4.1.3. 消费者核心代码 

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "topic.queue1")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "topic.queue2")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

4.1.4. 运行结果

我们可以看到,消费者2接收到了消息

当我们将发送消息的路由key改为china.news,我们会发现两个消费者都能收到消息,这就是topic的通配符效果:


http://www.ppmy.cn/ops/123838.html

相关文章

Java面试题——第十篇

1. 什么是Java的PLAB PLAB是Java垃圾回收器中的一种优化机制&#xff0c;主要用于G1垃圾收集器&#xff0c;目的是提高对象晋升到老年代的效率。 在垃圾回收过程中&#xff0c;新生代中的某些对象由于生命周期较长&#xff0c;会被晋升到老年代。为了减少线程竞争和提升晋升效…

C# 实现调用函数,打印日志(通过反射代理、非IOC)

&#x1f388;个人主页&#xff1a;靓仔很忙i &#x1f4bb;B 站主页&#xff1a;&#x1f449;B站&#x1f448; &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;C# &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff…

Java设计模式——桥接模式

目录 模式动机 模式定义 模式结构 类图 代码分析 示例&#xff1a;图形绘制系统 抽象部分 扩展抽象部分 实现部分 客户端 模式分析 优点 缺点 适用环境 模式应用 图形绘制系统 数据访问层 用户界面框架 模式扩展 结合其他设计模式 多层次桥接 总结 模式动…

FFMpeg源码分析,关键结构体分析(一)

http://lazybing.github.io/blog/categories/ffmpegyuan-ma-fen-xi/ 一、下载FFmpeg的编译源码 进入网站&#xff1a;http://ffmpeg.org/download.html二、编译源码 执行下述命令&#xff1a; ./configure --prefix/usr/local/ffmpeg --enable-debug3 --enable-ffplay sudo …

如何快速给word文件加拼音?请跟着步骤完成吧

如何快速给word文件加拼音&#xff1f;在日常工作中&#xff0c;我们时常会遇到需要为Word文件中的文字添加拼音的情况&#xff0c;这尤其在教育、出版或国际交流等领域显得尤为重要。为文字配上拼音&#xff0c;不仅能帮助学习者准确发音&#xff0c;还能提升文档的可读性和普…

Github 2024-10-04 Java开源项目日报Top8

根据Github Trendings的统计,今日(2024-10-04统计)共有8个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Java项目8Python项目1Java实现的算法集合:使用Gitpod.io进行编辑和贡献 创建周期:2883 天开发语言:Java协议类型:MIT LicenseStar数量:5726…

论文阅读笔记-XLNet: Generalized Autoregressive Pretraining for Language Understanding

前言 Google发布的XLNet在问答、文本分类、自然语言理解等任务上都大幅超越BERT,XLNet提出一个框架来连接语言建模方法和预训练方法。我们所熟悉的BERT是denoising autoencoding模型,最大的亮点就是能够获取上下文相关的双向特征表示,所以相对于标准语言模型(自回归)的预…

拓扑排序与入度为0的结点算法解析及实现

拓扑排序与入度为0的结点算法解析及实现 算法思想时间复杂度分析伪代码C语言实现环路检测结论拓扑排序是一种用于有向无环图(DAG, Directed Acyclic Graph)的重要操作,它可以对图中的结点进行排序,使得对于每一条有向边 (u, v),顶点 u 在排序中都出现在顶点 v 之前。本文介…