RabbitMQ教程:工作队列(Work Queues)(二)

embedded/2024/11/17 11:14:14/

RabbitMQ教程:工作队列(Work Queues)(二)

一、引言

在快节奏的软件开发世界中,我们经常面临需要异步处理任务的场景,比如在Web应用中处理耗时的图片处理或数据分析任务。这些任务如果直接在用户的HTTP请求中同步处理,会导致用户体验不佳,因为用户需要等待任务完成才能继续。这时,工作队列(Work Queues)就显得尤为重要。工作队列允许我们将任务排队,然后在后台异步处理,这样可以释放Web服务器来处理更多的用户请求,提高应用的响应速度和吞吐量。

在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现工作队列。我们将创建一个模拟的任务分发系统,它能够在多个工作进程之间分配任务,确保任务的均衡执行和持久存储,即使在RabbitMQ服务器重启的情况下也不会丢失任务。
在这里插入图片描述

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ发送和接收消息。今天,我们将探索工作队列(Work Queues),这是一种在多个工作进程(workers)之间分配耗时任务的机制。工作队列也被称为任务队列(Task Queues),它的核心思想是避免立即执行资源密集型任务,而是将任务安排到以后执行。通过这种方式,我们可以将任务封装成消息并发送到队列中,然后由后台运行的工作进程来处理这些任务。

三、准备工作

3.1 说明

在之前的教程中,我们发送了包含“Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。由于我们没有实际的任务(比如需要调整大小的图片或需要渲染的PDF文件),我们将使用Task.Delay()函数来模拟工作负载。

3.2 生成项目

首先,我们需要生成两个项目(也可直接vs创建):

dotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
cd ../Worker
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送任务(NewTask),另一个用于接收并处理任务(Worker)。

四、实战

4.1 修改发送程序

我们将更新NewTask程序,以便从命令行发送任意消息。这个程序将任务安排到我们的工作队列中,因此我们将其命名为NewTask

using RabbitMQ.Client;
using System.Text;await PublishMessagesAsync(20);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{// 循环发送指定次数的消息for (int i = 1; i <= loopCount; i++){// 调用SendMessageToQueue方法发送消息,并包含当前迭代次数await SendMessageToQueue($"Iteration {i} - Hello World");// 这里可以添加延迟,如果需要的话// await Task.Delay(1000);}Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{// 创建连接工厂,并设置RabbitMQ服务器地址为localhostvar factory = new ConnectionFactory { HostName = "localhost" };// 使用异步方式创建连接using var connection = await factory.CreateConnectionAsync();// 使用异步方式创建通道using var channel = await connection.CreateChannelAsync();// 异步声明名为"task_queue"的持久队列await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,autoDelete: false, arguments: null);// 将消息内容编码为字节数组var body = Encoding.UTF8.GetBytes(message);// 创建消息属性,并设置为持久化var properties = new BasicProperties{Persistent = true};// 异步发布消息到队列await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "task_queue", mandatory: true,basicProperties: properties, body: body);// 打印消息发送确认信息Console.WriteLine($" [x] Sent {message}");
}

4.2 修改接收程序

我们的旧Receive.cs脚本也需要修改,以便模拟消息体中每个点号一秒的工作量。以下是修改后的代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhost
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();
// 异步声明名为"task_queue"的持久队列
await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,autoDelete: false, arguments: null);// 设置QoS参数,确保每次只有一个消息被分发给同一个消费者
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);Console.WriteLine(" [*] Waiting for messages.");// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);// 设置接收到消息时的事件处理程序
consumer.ReceivedAsync += async (model, ea) =>
{// 获取消息体并转换为字符串byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");// 模拟工作负载,延时1.5秒await Task.Delay(1500);// 手动确认消息,确保消息被正确处理await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"task_queue"队列中的消息
await channel.BasicConsumeAsync("task_queue", // 队列名称autoAck: false, // 是否自动确认消息,默认为false,需要手动确认consumer: consumer); // 指定消费者对象,用于接收消息
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

在这里插入图片描述

4.3 消息确认(Message Acknowledgment)

在处理任务时,可能会出现消费者处理任务时崩溃的情况。如果消费者在处理任务时崩溃,RabbitMQ会立即将消息标记为已删除,这样我们就会丢失正在处理的消息以及所有未处理的消息。

为了确保消息不会丢失,RabbitMQ支持消息确认(acknowledgment)。消费者会向RabbitMQ发送确认,告知它特定的消息已经被接收和处理,RabbitMQ可以安全地删除该消息。

如果消费者在未发送确认的情况下崩溃(例如,通道关闭、连接关闭或TCP连接丢失),RabbitMQ会将该消息重新排队。如果有其他消费者在线,RabbitMQ会迅速将该消息重新分发给其他消费者。这样,我们可以确保即使工作进程偶尔崩溃,也不会丢失任何消息。

默认情况下,消费者的确认超时时间为30分钟。这有助于检测未确认的消费者。如果需要,可以根据需要增加此超时时间。

在我们的代码中,我们已经将autoAck参数设置为false,并在处理完任务后手动发送确认。以下是确认消息的代码:

await channel.BasicConsumeAsync("task_queue", autoAck: false, consumer: consumer);

4.4 公平分发(Fair Dispatch)

你可能已经注意到,分发仍然不是我们想要的方式。例如,在有两个工作进程的情况下,如果所有奇数消息都很重,而偶数消息都很轻,一个工作进程将始终忙碌,而另一个工作进程几乎不工作。这是因为RabbitMQ在消息进入队列时就分发消息,它不会查看消费者未确认的消息数量。它只是简单地将每第n个消息分发给第n个消费者。

为了改变这种行为,我们可以使用BasicQos方法,并设置prefetchCount为1。这告诉RabbitMQ一次不要给工作进程超过一个消息。换句话说,直到工作进程处理并确认前一个消息之前,不要分发新消息给它。相反,它将分发给下一个不忙的工作进程。在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ工作队列的概念和实现。通过构建一个模拟的任务分发系统,我们学习了如何在多个工作进程之间分配任务,以及如何确保任务的均衡执行和持久存储。以下是我们从本教程中获得的关键要点:

  1. 异步任务处理:通过使用工作队列,可以将耗时的任务异步处理,从而提高Web应用的响应速度和用户体验。

  2. 任务封装:将复杂的任务封装成消息,发送到队列中,由后台的工作进程来处理这些任务。

  3. 消息确认:实现了消息确认机制,确保了即使在消费者处理任务时崩溃,消息也不会丢失,并且可以被重新分发给其他消费者处理。

  4. 公平分发:通过设置prefetchCount为1,实现了公平分发,确保了工作负载在所有消费者之间均匀分配,避免了某些消费者过载而其他消费者空闲的情况。

  5. 持久化:将队列和消息设置为持久化,以确保即使RabbitMQ服务器重启,任务也不会丢失。

通过这些机制,我们能够建立一个健壮的工作队列系统,它不仅能够提高应用的性能,还能够在面对各种异常情况时保持任务的可靠性和持久性。这些知识为我们在实际开发中实现复杂的异步任务处理提供了坚实的基础。


http://www.ppmy.cn/embedded/138240.html

相关文章

Jetson ros默认依赖opencv与系统安装opencv、推流、cv_bridge等存在BUG

下载源码&#xff0c;源码编译支持系统opencv git clone -b noetic https://github.com/ros-perception/vision_opencv.gitset(_opencv_version 4) #设置为系统opencv find_package(OpenCV 4 QUIET) if(NOT OpenCV_FOUND)message(STATUS "Did not find OpenCV 4, tryin…

基于的图的异常检测算法OddBall

OddBall异常检测算法出自2010年的论文《OddBall: Spotting Anomalies in Weighted Graphs》&#xff0c;它是一个在加权图(weighted graph)上检测异常点的算法&#xff0c;基本思路为计算每一个点的一度邻域特征&#xff0c;然后在整个图上用这些特征拟合出一个函数&#xff0c…

Django中的URL配置与动态参数传递(多种方法比较)

Django中的URL配置与动态参数传递(多种方法比较) 目录 ✨ 基础URL配置与re_path()的解读&#x1f527; path()与re_path()的对比分析&#x1f680; 动态参数处理方案详解&#x1f4d8; 正则表达式匹配的优势与劣势&#x1f9e9; 利用path()进行路径参数处理的实现与优劣&…

蓝桥杯——杨辉三角

代码 package day3;public class Demo2 {public static void main(String[] args) {// TODO Auto-generated method stub// for (int i 0; i < 10; i) {// for (int j 0; j < 10; j) {// System.out.print("外&#xff1a;"i"内&#xff1a;&qu…

网页web无插件播放器EasyPlayer.js H.265流媒体播放器的decoder.js报Unexpected token ‘<‘错误

EasyPlayer.js H.265流媒体播放器属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#xff0c;支持H.264与H.265编码格式&#xff0c;性能稳定、播放流畅&#xff1b;支持WebSocket-FLV、HTTP-FLV&#xff0c;HLS&#xff08;m3u8&#xff…

JavaScript 变量:理解基元和引用类型

两种基本类型的数据存储在 javascript 中的变量中&#xff1a;基元 和 引用类型。了解这两种类型之间的区别对于内存管理以及调节数据的共享、存储和更改至关重要。本文深入探讨了它们之间的区别&#xff0c;提供了现实世界的示例&#xff0c;并研究了有效处理这两种类型的方法…

【深度学习基础 | 预备知识】数据预处理

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上&#xff0c;结合当代大数据和大算力的发展而发展出来的。深度学习最重…

C++ 的发展

目录 C 的发展总结&#xff1a;​编辑 1. C 的早期发展&#xff08;1979-1985&#xff09; 2. C 标准化过程&#xff08;1985-1998&#xff09; 3. C 标准演化&#xff08;2003-2011&#xff09; 4. C11&#xff08;2011年&#xff09; 5. C14&#xff08;2014年&#xf…