RabbitMQ工作模式(详解 工作模式:简单队列、工作队列、公平分发以及消息应答和消息持久化)

embedded/2025/2/12 3:27:41/

文章目录

  • 十.RabbitMQ
    • 10.1 简单队列实现
    • 10.2 Work 模式(工作队列)
    • 10.3 公平分发
    • 10.4 RabbitMQ 消息应答与消息持久化
      • 消息应答
        • 概念
        • 配置
      • 消息持久化
        • 概念
        • 配置

十.RabbitMQ

10.1 简单队列实现

简单队列通常指的是一个基本的消息队列,它可以用于在生产者(生产消息的一方)和消费者(消费消息的一方)之间传递消息。

在这里插入图片描述

新创建Springboot项目

引入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.1</version></dependency>

连接工具类

java">public class ConnectionUtils
{public static Connection getConnection(){try{Connection connection = null;//定义一个连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务端地址(域名地址/ip)factory.setHost("127.0.0.1");//设置服务器端口号factory.setPort(5672);//设置虚拟主机(相当于数据库中的库)factory.setVirtualHost("/");//设置用户名factory.setUsername("guest");//设置密码factory.setPassword("guest");connection = factory.newConnection();return connection;}catch (Exception e){return null;}}
}

创建生产者

java">public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 定义发送信息String msg = "hello rabbitmq-kwh";// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

创建消费者

java">public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,Envelope envelope,                        AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);}};// 监听队列channel.basicConsume("test4072",true,consumer);}catch (Exception e) {e.printStackTrace();}}
}

10.2 Work 模式(工作队列)

工作队列的概念

  • 工作队列模式:生产者将任务发送到队列中,多个消费者从队列中取出任务并并行处理。这意味着,多个消费者可以共同工作来处理同一个队列中的任务。
  • 负载均衡:每个消费者只处理一个任务(消息),通过增加消费者数量,任务的处理可以并行化,提高整体处理能力。

工作队列的特点:

  1. 任务分配:RabbitMQ 将队列中的任务(消息)分配给可用的消费者,通常是按照“轮询”或“平衡”方式分配,即消费者可以公平地处理任务。
  2. 任务处理并行化:多个消费者可以并行地从同一个队列中消费消息,从而实现任务的并行处理,减轻单一消费者的负担。
  3. 消息丢失的风险低:通过合理配置队列和消息持久化机制,即使 RabbitMQ 重启,也能确保任务消息不丢失。

在这里插入图片描述

生产者

(只是在简单队列中的生产者中循环发送了信息。)

java">/*** Work 模式(工作队列)*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

消费者01

java">public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                     Envelope envelope,                        AMQP.BasicProperties properties,           byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);}};// 监听队列channel.basicConsume("test4072",true,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02

java">public class Consumer02 {public static void main(String[] args) {try {System.out.println("======消费者02======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                    Envelope envelope,                       AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println("msg==接收=="+str);}};// 监听队列channel.basicConsume("test4072",true,consumer);}catch (Exception e) {e.printStackTrace();}}
}

在这里插入图片描述

. 消费者 1 与消费者 2 处理的数据条数一样。

. 消费者 1 偶数 ;消费者 2 奇数

这种方式叫轮询分发(Round-robin)。

10.3 公平分发

指消息被均匀地分配给多个消费者,以便各个消费者的负载大致相等。通过这种方式,RabbitMQ 旨在避免某些消费者过载而其他消费者空闲的情况。

在这里插入图片描述

在10.2 中,现在有 2 个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而 RabbitMQ 则是不了解这些的。这是因为当消息进入队列,RabbitMQ 就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

改造生产者

java">/*
同一时刻服务器只会发一条消息给消费者
1 限制发送给消费者不得超过一条消息
*/
channel.basicQos(1);
java">/*** 公平分发*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}}

消费者01

(在10.2 中消费者的基础上,只添加 channel.basicQos(1);,限制每次只消费一个消息)

java">public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//限制每次只消费一个消息channel.basicQos(1);// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                      Envelope envelope,                     AMQP.BasicProperties properties,byte[] body) throws IOException {String str = new String(body,"utf-8");System.out.println(envelope.getDeliveryTag()+"msg==接收=="+str);// 休眠一秒钟try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}finally {// 手动确认消息// 第一个参数:消息的序号,// 第二个参数:是否批量,false 单条消息应答 当为 true 时批量应答channel.basicAck(envelope.getDeliveryTag(),false);}}};// 监听队列// 自动应答设为 falsechannel.basicConsume("test4072",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02

(同消费者01)

在这里插入图片描述

消费者 1 休眠 1 秒,消费者 2 休眠 2 秒。

分别设置接收消息数,手动反馈,关闭自动应答

10.4 RabbitMQ 消息应答与消息持久化

消息应答

概念

**消息应答(ack)**是 RabbitMQ 中一个重要的机制,用于保证消息在被消费者处理后得以正确确认,确保消息不会丢失。如果消费者成功处理了消息,应该发送一个确认应答;如果消费者遇到问题或失败,则可以选择拒绝该消息,甚至重新放回队列供其他消费者处理。

应答类型:

  • **自动应答(auto-ack):**自动应答是默认设置,消费者从队列中获取消息后,RabbitMQ 会立即认为该消息已经被成功处理,即使消费者并未真正处理完成。在这种模式下,消息会在被消费后立即从队列中删除,而无需消费者确认。这种模式的缺点是,如果消费者在处理消息时崩溃,消息会丢失。
  • **手动应答(manual ack):**消费者处理完消息后,需要显式地发送确认应答,通知 RabbitMQ 该消息已经处理完成。这样,如果消费者没有发送确认应答,RabbitMQ 会重新将消息发送给其他消费者。
配置
java">// 监听队列
// 参数2:自动应答设为 false; true:开启自动应答
channel.basicConsume("test4072",false,consumer);

参数2为true时:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费。一旦rabbitmq 将消息分发给消费者,就会从内存中删除。(会丢失数据消息)

参数2为false时:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。如果有一个消费者挂掉,就会交付给其他消费者。手动告诉 rabbitmq 消息处理完成后,rabbitmq 删除内存中的消息。

反馈:

java">//手动回馈
channel.basicAck(envelope.getDeliveryTag(),false);

使用 Nack 让消息回到队列中

java">// 处理条数; 是否批量处理 ;是否放回队列 false 丢弃
channel.basicNack(envelope.getDeliveryTag(),false,true);

生产者

java">/*** 消息应答*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 创建队列// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据channel.basicPublish("", "test4072", null, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}}

消费者01

java">public class Consumer01 {public static void main(String[] args) {try {System.out.println("======消费者01======");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//限制每次只消费一个消息,防止通道中消息阻塞channel.basicQos(1);// 创建队列(有就直接连接。没有则创建)// 队列名称,是否持久化,是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", false, false, false, null);// 消费者消费消息DefaultConsumer consumer = new DefaultConsumer(channel){@Override //一旦有消息进入 将触发public void handleDelivery(String consumerTag,                       Envelope envelope,                      AMQP.BasicProperties properties,byte[] body) throws IOException {String str = "";try {str = new String(body,"utf-8");if(envelope.getDeliveryTag()==3){int i=1/0;}System.out.println(envelope.getDeliveryTag()+"消费者01msg==接收=="+str);//手动应答 处理完了// 手动确认消息,即手动反馈// 第一个参数:消息的序号,// 第二个参数:是否批量,false 单条消息应答 ;当为 true 时批量应答channel.basicAck(envelope.getDeliveryTag(),false);}catch(Exception e){// e.printStackTrace();System.out.println("消费者01处理第"+envelope.getDeliveryTag()+"条,时报错,消息内容为"+str);//手动应答 报错了// 第一个参数:消息的序号,// 第二个参数:是否批量,false 单条消息应答 当为 true 时批量应答// 第三个参数:是否放回队列 ;false丢弃 ,true 放回队列channel.basicNack(envelope.getDeliveryTag(),false,true);}// 休眠一秒钟try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}};// 监听队列// 参数2:自动应答设为 false; true:开启自动应答channel.basicConsume("test4072",false,consumer);}catch (Exception e) {e.printStackTrace();}}
}

消费者02(同消费者01)

在这里插入图片描述

消息持久化

概念

RabbitMQ 的持久化机制是确保消息和队列在系统崩溃、重启或其他故障情况下不会丢失的关键功能。确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。

配置

持久化队列

java">// 创建队列,
// 队列名称,是否持久化(队列),是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", true, false, false, null);

消息持久化

java">// 发送数据
// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息
//设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
//MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
channel.basicPublish("", "test4072", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

如果改动队列参数配置,需要删除原有的队列,重新建,因为在 rabbitmq 是不允许重新定义一个已存在的队列。

在这里插入图片描述

生产者

java">/*** 消息持久化*/
public class Provider01 {public static void main(String[] args) {try {System.out.println("--------生产者-------");// 获取连接Connection conn = ConnectionUtils.getConnection();//创建通道Channel channel = conn.createChannel();//同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 创建队列// 队列名称,是否持久化(队列),是否排他,是否自动删除,其他参数channel.queueDeclare("test4072", true, false, false, null);for (int i = 0; i < 50; i++) {// 定义发送信息String msg = "hello rabbitmq-kwh"+i;// 发送数据// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息channel.basicPublish("", "test4072", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());Thread.sleep(1000);}System.out.println("发送成功....");// 关闭资源channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}

http://www.ppmy.cn/embedded/149396.html

相关文章

自学记录HarmonyOS Next的HMS AI API 13:语音合成与语音识别

在完成图像处理项目后&#xff0c;我打算研究一下API 13的AI其中的——语音技术。HarmonyOS Next的最新API 13中&#xff0c;HMS AI Text-to-Speech和HMS AI Speech Recognizer提供了语音合成与语音识别的强大能力。 语音技术是现代智能设备的重要组成部分&#xff0c;从语音助…

【AI图像生成网站Golang】项目架构

AI图像生成网站 目录 一、项目介绍 二、雪花算法 三、JWT认证与令牌桶算法 四、项目架构 五、图床上传与图像生成API搭建 六、项目测试与优化 四、项目架构 本项目的后端基于Golang和Gin框架开发&#xff0c;主要包括的模块有&#xff1a; backend/ ├── controller…

【Rust自学】6.4. 简单的控制流-if let

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 6.4.1. 什么是if let if let语法允许将if和let组合成一种不太冗长的方式来处理与一种模式匹配的值&#xff0c;同时忽略其余模式。 可以…

【大数据毕设】基于机器学习的信用卡反欺诈系统设计与实现

代码大同小异&#xff0c;存档自留。下述代码均是构建一个信用卡反欺诈预测模型 代码参考 数据挖掘-11-利用python进行信用卡欺诈检测&#xff08;包含数据代码&#xff09; 机器学习项目实战之信用卡欺诈检测 python之逻辑回归项目实战——信用卡欺诈检测 机器学习实战分享…

Pytorch | 利用I-FGSSM针对CIFAR10上的ResNet分类器进行对抗攻击

Pytorch | 利用I-FGSSM针对CIFAR10上的ResNet分类器进行对抗攻击 CIFAR数据集I-FGSSM介绍I-FGSSM代码实现I-FGSSM算法实现攻击效果 代码汇总ifgssm.pytrain.pyadvtest.py 之前已经针对CIFAR10训练了多种分类器&#xff1a; Pytorch | 从零构建AlexNet对CIFAR10进行分类 Pytorch…

移动 APP 设计规范参考

一、界面设计规范 布局原则&#xff1a; 内容优先&#xff1a;以内容为核心进行布局&#xff0c;突出用户需要的信息&#xff0c;简化页面导航&#xff0c;提升屏幕空间利用率.一致性&#xff1a;保持界面元素风格一致&#xff0c;包括颜色、字体、图标等&#xff0c;使用户在…

linux笔记

VMnet0表示的是用于Bridged模式下的虚拟交换机。 VMnet1表示的是用于Host-Only模式下的虚拟交换机。 VMnet8表示的是用于NAT模式下的虚拟交换机。 VMware Network Adapter VMnet1&#xff1a;这是Host用于与Host-Only虚拟网络进行通信的虚拟网卡。 VMware Network Adapter …

SpringSecurity 3.0.2.2版本

前言&#xff1a; 实践&#xff0c;认知&#xff0c;再实践&#xff0c;再认知 回顾&#xff1a; 随着2.0&#xff0c;2.1的实践&#xff0c;我们找到了真正的需求 将默认用户改为自定义用户&#xff0c;而不是读取表单 正片&#xff1a; 随着官方文档的不断阅读&#xff0c…