第七章 RabbitMQ之交换机

news/2024/10/11 5:55:24/

目录

一、介绍

二、Fanout Exchange

2.1. 案例演示 

2.1.1. 创建SpringBoot工程

2.1.2. 父工程pom依赖

2.1.3. 生产者pom依赖 

 2.1.4. 生产者配置文件

2.1.5. 生产者核心代码

 2.1.6. 消费者RabbitMQConfig

 2.1.7. 消费者pom依赖

2.1.8. 消费者配置文件

2.1.9.  消费者核心代码

2.1.10. 运行效果

三、Direct Exchange

3.1. 案例演示

3.1.1. 生产者核心代码

3.1.2. 消费者RabbitMQConfig

 3.1.3. 消费者核心代码

3.1.4. 运行效果 

四、Topic Exchange

4.1. 案例演示

4.1.1. 生产者核心代码

 4.1.2. 消费者RabbitMQConfig

4.1.3. 消费者核心代码 

4.1.4. 运行结果


一、介绍

在RabbitMQ中,交换机(Exchange)是用来接收生产者发送的消息并将这些消息路由到一个或多个队列的组件。不同类型的交换机决定了它们如何路由这些消息。

以下是RabbitMQ中几种常用交换机类型:

直接交换机(direct):如果路由键完全匹配,消息就被投递到相应的队列。

广播交换机(fanout):消息被广播到所有绑定的队列。

主题交换机(topic):如果路由键匹配模式(通配符#代表匹配多个单词和*代表匹配一个单词),消息被投递到相应的队列。

头交换机(headers):通过匹配消息头进行路由。

二、Fanout Exchange

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式

2.1. 案例演示 

利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

2. 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定

3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4. 在publisher中编写测试方法,向hmall.fanout发送消息

2.1.1. 创建SpringBoot工程

完整的工程目录结构及代码文件如下:

2.1.2. 父工程pom依赖

引入核心依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mq-demo</name><description>mq-demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><!-- spring amqp依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2.1.3. 生产者pom依赖 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>publisher</artifactId><version>0.0.1-SNAPSHOT</version><name>publisher</name><description>publisher</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

 2.1.4. 生产者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

2.1.5. 生产者核心代码

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {// 交换机名称String exchangeName = "wzx.fanout";// 消息内容String message = "人生苦短,持续不断地努力拼搏,迎难而上!";rabbitTemplate.convertAndSend(exchangeName,null, message);}
}

 2.1.6. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue fanoutQueue1() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("fanout.queue1").build();}@BeanQueue fanoutQueue2() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("fanout.queue2").build();}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("wzx.fanout");}@BeanBinding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); // 绑定队列和交换机}@BeanBinding binding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); // 绑定队列和交换机}
}

 2.1.7. 消费者pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>consumer</name><description>consumer</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.34</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2.1.8. 消费者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

2.1.9.  消费者核心代码

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "fanout.queue1")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "fanout.queue2")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

2.1.10. 运行效果

我们可以看到,生产者的消息,同时发给了绑定fanout类型交换机的两个队列,两个消费者均收到了同一条消息。

三、Direct Exchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey

Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

3.1. 案例演示

我们在2.1示例代码的基础上稍做调整:

3.1.1. 生产者核心代码

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {String exchangeName = "wzx.direct";String message = "人生苦短,持续不断地努力拼搏,迎难而上!";rabbitTemplate.convertAndSend(exchangeName,"jianchi", message);}
}

3.1.2. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue directQueue1() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("direct.queue1").build();}@BeanQueue directQueue2() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("direct.queue2").build();}@BeanDirectExchange directExchange() {return new DirectExchange("wzx.direct");}@BeanBinding binding1(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("nuli"); // 绑定队列和交换机}@BeanBinding binding2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("jianchi"); // 绑定队列和交换机}
}

 3.1.3. 消费者核心代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "direct.queue1")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "direct.queue2")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

3.1.4. 运行效果 

我们看到消息最终通过路由key,被匹配的消费者2所接收。

四、Topic Exchange

TopicExchange与DirectExchange类似,后者能做的事情,前者也能做。区别在于Topic的routingKey可以是多个单词的列表,并且以 . 分割。 Queue与Exchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词

*:代指一个单词

china.news 代表有中国的新闻消息;

china.weather 代表中国的天气消息;

japan.news 则代表日本新闻

japan.weather 代表日本的天气消息

4.1. 案例演示

案例需求

1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

2. 在RabbitMQ控制台中,声明交换机wzx. topic ,将两个队列与其绑定

3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

4. 在publisher中编写测试方法,利用不同的RoutingKey向wzx. topic发送消息

案例代码

我们在上述代码的基础上做调整:

4.1.1. 生产者核心代码

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {String exchangeName = "wzx.topic";String message = "人生苦短,持续不断地努力拼搏,迎难而上!";rabbitTemplate.convertAndSend(exchangeName,"wzx.news", message);}
}

 4.1.2. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue topicQueue1() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("topic.queue1").build();}@BeanQueue topicQueue2() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("topic.queue2").build();}@BeanTopicExchange topicExchange() {return new TopicExchange("wzx.topic");}@BeanBinding binding1(Queue topicQueue1, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with("china.#"); // 绑定队列和交换机}@BeanBinding binding2(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.news"); // 绑定队列和交换机}
}

4.1.3. 消费者核心代码 

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "topic.queue1")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "topic.queue2")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

4.1.4. 运行结果

我们可以看到,消费者2接收到了消息

当我们将发送消息的路由key改为china.news,我们会发现两个消费者都能收到消息,这就是topic的通配符效果:


http://www.ppmy.cn/news/1537294.html

相关文章

分布式 ID

背景 在复杂分布式系统中&#xff0c;往往需要对大量的数据和消息进行唯一标识。随着数据日渐增长&#xff0c;对数据分库分表后也需要有一个唯一ID来标识一条数据或消息&#xff0c;数据库的自增 ID 显然不能满足需求&#xff1b;此时一个能够生成全局唯一 ID 的系统是非常必…

小猿口算脚本

实现原理&#xff1a;安卓adb截图传到电脑&#xff0c;然后用python裁剪获得两张数字图片&#xff0c;使用ddddocr识别数字&#xff0c;比较大小&#xff0c;再用adb命令模拟安卓手势实现>< import os import ddddocr from time import sleep from PIL import Imagedef …

springboot 前后端处理日志

为了实现一个高效且合理的日志记录方案&#xff0c;我们需要在系统架构层面进行细致规划。在某些情况下&#xff0c;一个前端页面可能会调用多个辅助接口来完成整个业务流程&#xff0c;而并非所有这些接口的交互都需要被记录到日志中。为了避免不必要的日志开销&#xff0c;并…

谷歌AI大模型Gemini API快速入门及LangChain调用视频教程

1. 谷歌Gemini API KEY获取及AI Studio使用 要使用谷歌Gemini API&#xff0c;首先需要获取API密钥。以下是获取API密钥的步骤&#xff1a; 访问Google AI Studio&#xff1a; 打开浏览器&#xff0c;访问Google AI Studio。使用Google账号登录&#xff0c;若没有账号&#xf…

充电宝租赁管理系统网站毕业设计SpringBootSSM框架开发

目录 1. 概述 2. 技术选择与介绍 3. 系统设计 4. 功能实现 5. 需求分析 1. 概述 充电宝租赁管理系统网站是一个既实用又具有挑战性的项目。 随着移动设备的普及和人们日常生活对电力的持续依赖&#xff0c;充电宝租赁服务已成为现代都市生活中的一项重要便利设施。它不仅为…

机器学习与神经网络:开启物理学的新篇章

近日&#xff0c;2024年诺贝尔物理学奖的颁发引发了全球热议&#xff0c;尤其是首次将这项传统上授予物理学研究者的奖项颁给了机器学习与神经网络领域的科学家。这一举动标志着人工智能技术&#xff0c;尤其是深度学习技术&#xff0c;正在深入影响科学的各个领域&#xff0c;…

GC1277和灿瑞的OCH477优势分析 可以用于电脑散热风扇,视频监控和图像处理的图像信号处理器中

GC1277和灿瑞的OCH477是两款用于视频监控和图像处理的图像信号处理器&#xff08;ISP&#xff09;。在对比这两款产品时&#xff0c;可以从以下几个方面考虑它们的优势和特点&#xff1a; 1. 图像处理能力 GC1277&#xff1a;通常具有更强的图像处理算法&#xff0c;支持多种…

npm依赖版本锁定详解

npm中有一个package-lock.json的文件&#xff0c;即npm依赖锁文件&#xff0c;用来描述npm依赖生成的确切树&#xff0c;这样不管你的依赖有何种更新&#xff0c;都会按照这个确切树来安装使用。 不同的包管理工具对应不同的锁文件&#xff1a; ● npm > package-lock.json…