RabbitMQ中的Work Queues模式

news/2024/12/18 0:40:42/

在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种消息传递模式。其中,Work Queues(工作队列)模式是一种常见的模式,用于在多个消费者之间分配任务,从而实现负载均衡和提高系统的处理能力。下面将详细介绍 RabbitMQ 中的 Work Queues 模式。

1. 什么是 Work Queues 模式?

Work Queues 模式(也称为任务队列模式)是一种消息传递模式,用于在多个消费者之间分配任务。在这种模式下,生产者将任务(消息)发送到队列中,多个消费者从队列中获取任务并进行处理。每个任务只会被一个消费者处理,从而实现负载均衡。

在这里插入图片描述

Work Queues 模式的主要优点包括:

  1. 负载均衡:多个消费者可以并行处理任务,从而提高系统的处理能力。
  2. 解耦:生产者和消费者之间通过队列进行通信,彼此之间不需要直接交互。
  3. 异步处理:任务可以异步处理,生产者不需要等待任务完成即可继续执行其他操作。

2. Work Queues 模式的工作原理

2.1 生产者(Producer)

生产者负责将任务(消息)发送到队列中。生产者不需要知道有多少消费者会处理这些任务,只需要将任务发送到队列即可。

2.2 队列(Queue)

队列是消息的缓冲区,用于存储生产者发送的任务。队列可以有多个消费者,但每个任务只会被一个消费者处理。

2.3 消费者(Consumer)

消费者从队列中获取任务并进行处理。多个消费者可以并行处理任务,从而实现负载均衡。消费者可以是独立的进程、线程或服务。

2.4 消息确认(Message Acknowledgment)

为了确保任务能够可靠地处理,RabbitMQ 提供了消息确认机制。消费者在处理完任务后,需要向 RabbitMQ 发送确认消息,告知任务已经处理完成。如果消费者在处理任务时崩溃,RabbitMQ 会将未确认的任务重新分配给其他消费者。

2.5 公平分发(Fair Dispatch)

默认情况下,RabbitMQ 会按顺序将任务分发给消费者。然而,如果某些消费者处理任务的速度较慢,可能会导致任务堆积。为了避免这种情况,可以使用 basicQos 方法设置预取计数(prefetch count),限制每个消费者一次可以获取的任务数量,从而实现更公平的分发。

3. 环境准备

在开始之前,确保你已经安装了以下环境:

  • Java 开发环境(JDK 8 或更高版本)
  • RabbitMQ 服务器(已启动并运行)
  • Maven(用于管理依赖)

3.1 添加依赖

首先,在你的项目中添加 RabbitMQ 客户端库的依赖。如果你使用的是 Maven,可以在 pom.xml 中添加以下依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

4. 代码案例

4.1 生产者代码

生产者负责将任务发送到队列。以下是一个简单的生产者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.将消息发送到队列for (int i = 1; i <= 10; i++) {String message = "Task to be processed, " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}}
}

代码解释:

  • ConnectionFactory: 用于创建与 RabbitMQ 服务器的连接。
  • Connection: 表示与 RabbitMQ 服务器的物理连接。
  • Channel: 表示与 RabbitMQ 服务器的逻辑连接,用于发送和接收消息。
  • queueDeclare: 声明一个队列,true 表示队列是持久的。
  • basicPublish: 将消息发布到队列,使用默认交换机(空字符串)。

4.2 消费者代码

消费者负责从队列中获取任务并处理。以下是一个简单的消费者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1); // 每次只处理一个消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");//手动ack,因为不同的机器处理速度不一样,因此不同的机器会在不同时间应答,这样机器就可以根据实际能力处理了channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

代码解释:

  • basicQos(1): 设置每次只处理一个消息,确保任务的公平分配。
  • DeliverCallback: 定义消息处理逻辑,doWork 方法模拟任务处理过程。
  • basicAck: 确认消息处理完成,从队列中移除消息。

5. 运行示例

  1. 启动 RabbitMQ 服务器:确保 RabbitMQ 服务器已启动并运行。
  2. 运行多个消费者:启动多个消费者实例,确保它们连接到同一个队列。
  3. 运行生产者:启动生产者实例,发送任务到队列。

示例输出:

  • 生产者输出

     [x] Sent 'Task to be processed, 1'[x] Sent 'Task to be processed, 2'[x] Sent 'Task to be processed, 3'[x] Sent 'Task to be processed, 4'[x] Sent 'Task to be processed, 5'[x] Sent 'Task to be processed, 6'[x] Sent 'Task to be processed, 7'[x] Sent 'Task to be processed, 8'[x] Sent 'Task to be processed, 9'[x] Sent 'Task to be processed, 10'
    
  • 消费者1输出

     [*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 1'[x] Done[x] Received 'Task to be processed, 4'[x] Done[x] Received 'Task to be processed, 6'[x] Done[x] Received 'Task to be processed, 8'[x] Done[x] Received 'Task to be processed, 10'[x] Done
    
  • 消费者2输出

     [*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 2'[x] Done[x] Received 'Task to be processed, 3'[x] Done[x] Received 'Task to be processed, 5'[x] Done[x] Received 'Task to be processed, 7'[x] Done[x] Received 'Task to be processed, 9'[x] Done
    

在这里插入图片描述

6. 总结

本文详细介绍了如何在 RabbitMQ 中实现 Work Queues 模式,包括生产者、默认交换机、队列和多个消费者的设计与实现。通过使用 RabbitMQ 的 Java 客户端库,我们可以轻松地实现任务的分配和处理。Work Queues 模式非常适合需要将任务分配给多个消费者处理的场景,如任务调度、日志处理等。


http://www.ppmy.cn/news/1555993.html

相关文章

nginx问题总结

问题记录&#xff1a; 默认网络下部署nginx&#xff0c;挂载nginx.conf文件&#xff0c;提示&#xff1a;nginx: [emerg] host not found in upstream "shop" in /etc/nginx/nginx.conf:29。nginx配置内容&#xff1a; location /api {rewrite /api/(.*) /$1 break;…

Web身份认证 --- Session和JWT Token

Web 身份认证 --- Session和JWT Token 方法一: 通过使用Session进行身份认证方法二: 通过JWT token进行身份认证什么是JWTJWT完整流程JWT攻防JWT 如何退出登录JWT的续签 方法一: 通过使用Session进行身份认证 用户第一次请求服务器的时候&#xff0c;服务器根据用户提交的相关信…

自动化高架仓库中托盘状态精准监控的解决方案

在自动化高架仓库的高效运作背后&#xff0c;隐藏着一些亟待解决的技术难题。其中&#xff0c;货架的稳定性及托盘的精确定位问题&#xff0c;对整个仓库的作业效率和安全性有着至关重要的影响。 自动化高架仓库中的货架大多由钢结构或钢框架构成&#xff0c;初看之下&#xf…

Android之RecyclerView显示数据列表和网格

一、RecyclerView的优势 RecyclerView 的最大优势在于&#xff0c;它对大型列表来说非常高效&#xff1a; 默认情况下&#xff0c;RecyclerView 仅会处理或绘制当前显示在屏幕上的项。例如&#xff0c;如果您的列表包含一千个元素&#xff0c;但只有 10 个元素可见&#xff0…

JavaScript学习难点

一、语法的灵活性 动态类型&#xff1a; JavaScript 是一种动态类型语言&#xff0c;这意味着变量的类型可以在运行时改变。这与静态类型语言&#xff08;如 Java、C&#xff09;形成鲜明对比&#xff0c;在静态类型语言中&#xff0c;变量的类型在编译时就已经确定。 例如&am…

ChatGPT推出视频通话及屏幕理解功能,近屿智能邀您共探AI前沿技术

北京时间12月13日凌晨&#xff0c;OpenAI在第六天直播活动中宣布为 ChatGPT 的高级语音模式带来视频输入和屏幕理解功能&#xff0c;同时&#xff0c;为了迎接即将到来的圣诞节&#xff0c;OpenAI还限时推出了充满节日氛围的圣诞老人模式。 直播一开场&#xff0c;几位团队成员…

005 Qt常用控件Qwidget_下

文章目录 前言windowOpacity属性cursor属性font属性toolTip属性focuspolicy属性styleSheet属性 小结 前言 本文将会向你分享Qwidget的常见属性 windowOpacity属性 API说明windowOpacity()获取到控件的不透明数值. 返回 float, 取值为 0.0 -> 1.0 其中 0.0 表⽰全透明, 1.…

《深入理解 Java 中的 ImmutableList》

一、引言 在 Java 编程中&#xff0c;数据结构的选择对于程序的性能、可读性和可维护性至关重要。Java 中的ImmutableList是一种不可变的列表类型&#xff0c;它在很多场景下都有着独特的优势。本文将深入探讨 Java 中的ImmutableList&#xff0c;包括其概念、特点、用法以及与…