第六章 RabbitMQ之Work模式

embedded/2024/10/15 19:41:39/

目录

一、介绍

二、Work模式 

三、案例演示 

3.1. 案例需求

3.2. 案例代码实现

3.2.1. 创建SpringBoot工程

3.2.2. 父工程pom依赖

3.2.3. 生产者pom依赖 

 3.2.3. 生产者配置文件

3.2.4. 生产者核心代码

3.2.5. 消费者RabbitMQConfig

 3.2.6. 消费者pom依赖

3.2.7. 消费者配置文件

3.2.8.  消费者核心代码

3.2.9. 运行效果

3.3. 消费者消息推送机制

3.3.1. 调整代码

 3.3.2. 运行效果

四、总结


一、介绍

在上一章的讲解中,我们通过一个simple模式的简单示例代码给大家做了快速入门演示,本章节我们就来讲讲RabbitMQ的Work模式 。讲解之前,先简单介绍下RabbitMQ支持六种主要的工作模式‌,分别是简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式和远程调用模式(RPC)。其中,远程调用模式不常用,因此主要介绍前五种模式‌。

简单模式

简单模式是最基本的消息传递模式,涉及一个生产者和一个消费者。生产者将消息发送到队列,消费者从队列中接收并处理消息。这种模式适用于简单的点对点通信场景,例如手机短信或邮件单发‌。

工作队列模式

工作队列模式允许多个消费者从同一个队列中接收消息,通过在消费者之间分配任务来提高消息处理的效率。RabbitMQ默认采用轮询的方式分发消息,确保每个消费者接收到的消息数量大致相同‌。

发布订阅模式

发布订阅模式是一种特殊的消息传递模式,生产者发送的消息会被发送到所有订阅了该主题的消费者。这种模式适用于需要广播消息的场景,例如邮件群发或群聊天‌。

路由模式

路由模式使用路由交换机,根据消息属性将消息发送到特定的队列。这种模式适用于需要根据消息内容精确匹配队列的场景,例如短信或聊天工具‌。

通配符模式

通配符模式使用通配符来匹配路由键,允许更灵活的消息路由。这种模式适用于需要根据多级路径匹配队列的场景,例如中国.四川.成都.武侯区‌。

二、Work模式 

​​​​​​​​​​​​​​RabbitMQ的Work模式是一种简单的消息队列模式,也称为“竞争消费者模式”或“任务分发模式”。‌ 在这种模式下,多个消费者同时监听同一个队列,当队列中有消息时,只有一个消费者能够获得这个消息并进行处理,其他消费者需要等待下一个消息的到来。这种模式广泛应用于分布式系统中的任务调度或并行处理场景中‌。

在Work模式下,生产者将消息发送到队列中,多个消费者监听这个队列。当队列中有消息时,RabbitMQ会将消息分发给其中一个消费者。每个消费者独立处理分配到的任务,处理完成后确认消息,然后等待下一个任务。这种模式确保了消息在执行过程中可以分布到多个消费者中,并且每个消费者可以执行自己的任务‌。

Work模式的特点:

  • 多个消费者监听同一个队列‌:当队列中有消息时,只有一个消费者能获得并处理这个消息。
  • 消息按顺序分配‌:消费者按顺序接收消息,处理完一条后再接收下一条。
  • 公平分发‌:可以根据消费者的处理能力进行公平分发,处理快的消费者会处理更多的消息‌。

在实际应用中,Work模式常用于需要并行处理大量任务的场景,例如批量数据处理、后台任务处理等。通过合理配置消费者数量和处理能力,可以有效地利用资源并提高系统整体的处理效率‌。

三、案例演示 

3.1. 案例需求

模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下:

1. 在RabbitMQ的控制台创建一个队列名为work.queue

2. 在publisher服务中定义测试方法,发送50条消息到work.queue

3. 在consumer服务中定义两个消息监听者,都监听work.queue队列

5. 消费者1每秒处理40条消息,消费者2每秒处理5条消息

3.2. 案例代码实现

3.2.1. 创建SpringBoot工程

完整的工程目录结构及代码文件如下:

3.2.2. 父工程pom依赖

引入核心依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>mq-demo</name><description>mq-demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><!-- spring amqp依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.2.3. 生产者pom依赖 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>publisher</artifactId><version>0.0.1-SNAPSHOT</version><name>publisher</name><description>publisher</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

 3.2.3. 生产者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

3.2.4. 生产者核心代码

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:

package com.example.publisher;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage() throws InterruptedException {String queueName = "work.queue";String message = "人生苦短,持续不断地努力拼搏,迎难而上!";for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName, message);Thread.sleep(20);}}
}

3.2.5. 消费者RabbitMQConfig

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@BeanQueue myQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("work.queue").build();}
}

 3.2.6. 消费者pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>consumer</name><description>consumer</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.34</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.2.7. 消费者配置文件

spring:rabbitmq:# 主机host: 127.0.0.1# 端口port: 5672# 默认用户密码是guest guest 如果需要自己创建新用户,参看我的第四章节内容username: Wangzhexiaopassword: Wangzhexiao# 默认的虚拟主机是/ 如果需要自己创建虚拟主机,参看我的第四章节内容virtual-host: /hangzhou

3.2.8.  消费者核心代码

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "work.queue")public void listener1(String message) {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");}@RabbitListener(queues = "work.queue")public void listener2(String message) {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");}
}

3.2.9. 运行效果

 

我们可以看到,生产者的消息,平均的分配给了两个消费者,类似于轮询机制。 

3.3. 消费者消息推送机制

3.3.1. 调整代码

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。 因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1

我们对消费者的代码也做个调整,通过增加线程等待时间来模拟两个消费者的消费能力:

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "work.queue")public void listener1(String message) throws InterruptedException {System.out.println("消费者1:人生是个不断攀登的过程【" + message + "】");Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listener2(String message) throws InterruptedException {System.err.println("消费者2:人生是个不断攀登的过程【" + message + "】");Thread.sleep(200);}
}

 3.3.2. 运行效果

我们可以看到,消费者1因为消费速度比较快,所以能者多劳。当我们生产者的消费者特别多的情况下, 我们就需要多个消费者同时监听消费一个队列。实际项目中我们会通过集群模式,多台服务监听同一队列,来达到这种效果,解决消息堆积的问题。

四、总结

Work模型的使用:

多个消费者绑定到一个队列,可以加快消息处理速度

同一条消息只会被一个消费者处理

通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳


http://www.ppmy.cn/embedded/128033.html

相关文章

【bug】finalshell向远程主机拖动windows快捷方式导致卡死

finalshell向远程主机拖动windows快捷方式导致卡死 问题描述 如题&#xff0c;作死把桌面的快捷方式拖到了finalshell连接的服务器面板中&#xff0c;导致finalshell没有响应&#xff08;小概率事件&#xff0c;有时会触发&#xff09; 解决 打开任务管理器查看finalshell进…

AD9680(adc直采芯片)使用说明

写这篇文章之前我是没有使用过AD9680的芯片&#xff0c;但是使用过GMS011芯片&#xff08;是国内24S&#xff09;下的公司出来的芯片&#xff0c;寄存器和管脚全对标。 在这里我就大概说一下芯片的说用方法 一、硬件设计 该芯片支持双通道射频直采 支持协议JESD204B 14位 采样…

SQL 中创建、更改和删除表的基本知识

在 SQL 中&#xff0c;表是存储数据的基本结构。掌握创建、更改和删除表的操作是数据库管理和数据处理的基础。以下将详细介绍这些操作的基本知识。 一、创建表&#xff08;CREATE TABLE&#xff09; 创建表使用 CREATE TABLE 语句。以下是一个基本的示例&#xff1a; CREAT…

设计模式 - 行为模式

行为模式 观察者模式&#xff0c;策略模式&#xff0c;命令模式&#xff0c;中介者模式&#xff0c;备忘录模式&#xff0c;模板方法模式&#xff0c;迭代器模式&#xff0c;状态模式&#xff0c;责任链模式&#xff0c;解释器模式&#xff0c;访问者模式 保存/封装 行为/请求…

Java的类加载机制

虚拟机把描述类的数据从 Class 文件加载到内存&#xff0c;并对数据进行校验、转换解析和初始化&#xff0c;最终形成可以被虚拟机直接使用的 Java 类型&#xff0c;这就是虚拟机的类加载机制。 在Java语言里面&#xff0c;类型的加载、连接和初始化过程都是在程序运行期间完成…

机载交互详解!

一、机载交互网络 机载交互网络是指飞机内部用于传输飞行员指令、飞行数据以及系统状态信息的通信网络。它通常由多个节点&#xff08;如传感器、控制器、显示器等&#xff09;和连接这些节点的通信链路组成。 节点&#xff1a; 传感器节点&#xff1a;负责采集飞机的各种飞…

word如何转换成pdf?7款word转pdf工具值得一试,图文教程详解!

毫无疑问&#xff0c;word格式是内容编辑中最受欢迎的选择之一&#xff0c;它能够显著提升文档的互动性。然而&#xff0c;由于与不同软件和操作系统之间的兼容性差异&#xff0c;当您在电脑上打开word文档时&#xff0c;可能会发现现内容扭曲或丢失的情况。相比之下&#xff0…

《超详细Redisson实战用法· 看这一篇就可以了》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…