MQ 架构设计原理与消息中间件详解(二)

news/2024/10/10 5:37:24/

### 一、RabbitMQ 如何保证消息不丢失?

消息不丢失可以分为三个方面进行保障:**生产者投递消息**、**消费者消费消息** 和 **MQ 服务器持久化**。

#### 1.1 生产者角色的消息确认机制

RabbitMQ 提供了两种方式来确保生产者投递的消息能够被成功接收:
1. **消息确认机制(Confirms)**:
   - 生产者在投递消息后,RabbitMQ 服务器会返回确认消息。生产者可以选择同步或异步接收确认消息,确保消息已被 MQ 服务器成功接收。
   
   例如,生产者投递消息并等待 RabbitMQ 服务器的确认:
 

   channel.confirmSelect();  // 开启确认机制channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());boolean result = channel.waitForConfirms();  // 等待确认if (result) {System.out.println("消息投递成功");}

2. **事务消息**:
   - 生产者可以通过事务来确保消息的投递可靠性。在事务模式下,RabbitMQ 保证消息要么全部成功投递,要么不投递。

   代码示例:
 

   channel.txSelect();  // 开启事务channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());channel.txCommit();  // 提交事务

#### 1.2 消费者角色的消息确认机制

在 RabbitMQ 中,消费者需要手动确认消息已经成功消费,只有当 RabbitMQ 接收到消费者的确认(ACK)后,消息才会从队列中移除。如果没有接收到确认,RabbitMQ 会重新投递该消息,确保不会丢失。

手动确认消息示例:
 

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);  // 关闭自动应答
channel.basicAck(envelope.getDeliveryTag(), false);  // 手动确认消息已处理完毕

#### 1.3 RabbitMQ 服务器端的消息持久化

RabbitMQ 默认会将队列中的消息持久化到硬盘中,以确保系统宕机时消息不会丢失。生产者在创建队列时可以设置 `durable=true` 来确保消息持久化。

channel.queueDeclare("durable_queue", true, false, false, null);  // durable 设置为 true

持久化机制可以有效防止服务器端故障导致消息丢失。

---

### 二、如何通过代码实现消息可靠性?

下面我们通过代码示例展示如何实现生产者与消费者的消息确认机制和持久化设置。

#### 2.1 生产者代码示例

生产者通过开启 **Confirm机制** 来确保消息被成功投递到 RabbitMQ 服务器端。生产者在发送消息后等待服务器的确认,若消息投递失败,生产者可以重新发送该消息。```java

public class Producer {private static final String QUEUE_NAME = "reliable_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 开启消息确认机制channel.confirmSelect();// 发送消息String message = "Reliable Message!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());// 等待服务器确认消息是否投递成功boolean result = channel.waitForConfirms();if (result) {System.out.println("消息投递成功");} else {System.out.println("消息投递失败");}channel.close();connection.close();}
}


```

#### 2.2 消费者代码示例

消费者使用手动消息确认机制,确保每条消息在成功处理后才从队列中移除,避免消息丢失。

public class Consumer {private static final String QUEUE_NAME = "reliable_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 设置手动确认模式channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费消息:" + message);// 手动确认消息channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

---

### 三、RabbitMQ 五种消息模式

RabbitMQ 提供了多种消息模式,用户可以根据业务场景选择合适的消息分发方式。

#### 3.1 工作队列模式


- **工作队列模式** 主要用于多个消费者并行处理任务的场景。通过设置 `basicQos`,可以控制每次只发送一条消息给消费者,避免消息不均匀的分发。
 

channel.basicQos(1);

#### 3.2 交换机类型


- **Direct exchange(直连交换机)**:根据完全匹配的路由键将消息分发到指定队列。
- **Fanout exchange(扇型交换机)**:将消息广播到所有绑定到交换机的队列中。
- **Topic exchange(主题交换机)**:根据模式匹配的路由键进行消息分发。
- **Headers exchange(头交换机)**:通过消息头部的键值对来路由消息。

#### 3.3 发布订阅模式(Fanout Exchange)

在发布订阅模式中,生产者将消息发布到交换机中,交换机会将消息广播到所有绑定的队列上,多个消费者可以并行处理同一消息。生产者代码示例:

public class ProducerFanout {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();// 声明扇型交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 发送消息String message = "Fanout Message!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());channel.close();connection.close();}
}

### 四、Spring Boot 整合 RabbitMQ

在 Spring Boot 中,整合 RabbitMQ 变得非常简单,通过少量的配置和注解即可完成消息队列的集成与使用。下面介绍如何使用 Spring Boot 来整合 RabbitMQ,并提供完整的代码示例。

#### 4.1 Maven 依赖

首先,在 `pom.xml` 文件中添加必要的依赖:

<dependencies><!-- Spring Boot AMQP (RabbitMQ) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- JSON 解析工具 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.49</version></dependency><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Lombok (可选,简化代码) --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

#### 4.2 配置 RabbitMQ

创建一个 `RabbitMQConfig` 配置类,定义交换机、队列及它们之间的绑定关系。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 定义扇型交换机名称private static final String EXCHANGE_NAME = "fanout_exchange";// 短信队列private static final String SMS_QUEUE = "fanout_sms_queue";// 邮件队列private static final String EMAIL_QUEUE = "fanout_email_queue";/*** 定义短信队列*/@Beanpublic Queue smsQueue() {return new Queue(SMS_QUEUE);}/*** 定义邮件队列*/@Beanpublic Queue emailQueue() {return new Queue(EMAIL_QUEUE);}/*** 定义扇型交换机*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(EXCHANGE_NAME);}/*** 绑定短信队列到扇型交换机*/@Beanpublic Binding bindingSmsQueue(Queue smsQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(smsQueue).to(fanoutExchange);}/*** 绑定邮件队列到扇型交换机*/@Beanpublic Binding bindingEmailQueue(Queue emailQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(emailQueue).to(fanoutExchange);}
}

#### 4.3 配置文件 `application.yml`

接下来,需要在 `application.yml` 文件中配置 RabbitMQ 连接信息:

spring:rabbitmq:host: 127.0.0.1        # RabbitMQ 服务器地址port: 5672             # RabbitMQ 端口号username: guest        # RabbitMQ 用户名password: guest        # RabbitMQ 密码virtual-host: /        # RabbitMQ 虚拟主机

#### 4.4 生产者代码

生产者通过 `AmqpTemplate` 将消息发送到指定的交换机:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class FanoutProducer {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 发送消息到 RabbitMQ* @param msg 消息内容* @return 操作结果*/@RequestMapping("/sendMsg")public String sendMsg(String msg) {// 将消息发送到指定的交换机amqpTemplate.convertAndSend("fanout_exchange", "", msg);return "消息发送成功";}
}

#### 4.5 消费者代码

短信消费者和邮件消费者分别监听各自的队列,并处理接收到的消息。##### 短信消费者:
 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RabbitListener(queues = "fanout_sms_queue")  // 监听短信队列
public class FanoutSmsConsumer {@RabbitHandlerpublic void process(String msg) {log.info(">> 短信消费者接收到消息: {} <<", msg);}
}

##### 邮件消费者:
 

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RabbitListener(queues = "fanout_email_queue")  // 监听邮件队列
public class FanoutEmailConsumer {@RabbitHandlerpublic void process(String msg) {log.info(">> 邮件消费者接收到消息: {} <<", msg);}
}

#### 4.6 测试消息发送

启动 Spring Boot 应用后,可以通过访问浏览器或 Postman 调用生产者的 `/sendMsg` 接口,发送消息到 RabbitMQ。

- 示例请求:

http://localhost:8080/sendMsg?msg=Hello RabbitMQ!

生产者会将消息广播到所有绑定到交换机的队列,短信消费者和邮件消费者将分别接收到并处理消息。

---

### 五、总结

通过本文,我们讲解了如何使用 Spring Boot 集成 RabbitMQ,详细展示了如何配置交换机、队列和生产者/消费者,并通过扇型交换机的发布/订阅模式实现了消息广播。RabbitMQ 与 Spring Boot 的集成非常简便且高效,能够帮助我们轻松实现消息队列的功能,确保消息的可靠传递。

希望这篇文章能帮助你更好地理解并掌握 RabbitMQ 的消息可靠性机制,并能成功将这些技术应用到你的项目中。


http://www.ppmy.cn/news/1536884.html

相关文章

MAC端VSCode code-runner插件配置 c/c++编译后 文件生成路径

请注意该配置为MAC系统上的。 window系统可以参考此方法去设置 VS Code 版本: 1.94.0 Code Runner 版本: v0.12.2 在VS Code 配置文件(settings.json)中添加/修改以下配置项。 { ..."code-runner.executorMap": {"c": "cd $dir && mkdir -…

24.2.29蓝桥杯|单位换算--8道题

本篇或者本系列文章使用蓝桥云课平台&#xff0c;借助CSDN梳理思路&#xff0c;给自己做一个电子笔记 单位换算类题目注意事项&#xff1a; 在参加蓝桥杯等编程竞赛时&#xff0c;进行单位换算是一个常见的题目类型&#xff0c;特别是涉及到数据存储和传输的问题。在处理单位换…

List的实现类

1.ArrayList&#xff08;数组&#xff09; &#xff08;1&#xff09;代码 新建学生类&#xff1a; package com.collection;public class Student {private String name;private int age;//添加构造方法 都是使用altenter快捷键public Student() {this.name name;this.age…

云原生、云计算、虚拟化概念概述

&#xff08;带着批评阅读&#xff0c;不对的请评论区补充&#xff09; 1、出现年代前后顺序 虚拟化------>云计算------>云原生 2、虚拟化 虚拟化侧重描述实现&#xff0c;开始主要是模拟、hook指令执行软件程序&#xff0c;后续出现了半虚拟化、硬件也开始有虚拟化指…

MySQL(SQLite3)数据库+Flask框架+HTML搭建个人博客网站

搭建一个博客并实现文章的发送功能&#xff0c;涉及到前后端开发和数据库操作。以下是一个简单的示例&#xff0c;使用Python的Flask框架作为后端&#xff0c;HTML和JavaScript作为前端&#xff0c;SQLite作为数据库。 1. 项目结构 my_blog/ │ ├── app.py ├── templat…

24.数据结构和算法-哈夫曼树及其应用(最优二叉树)

哈夫曼树的基本概念 哈夫曼树的构造算法 哈夫曼树构造算法的实现 理论分析 具体实现 哈夫曼编码 哈夫曼编码的性质 例题 哈夫曼编码的算法实现 哈夫曼编码的应用 文件的编码和解码

【力扣 | SQL题 | 每日四题】力扣2082, 2084, 2072, 2112, 180

四题都比较简单&#xff0c;可以直接秒。 1. 力扣2082&#xff1a;富有客户的数量 1.1 题目: 表&#xff1a; Store ------------------- | Column Name | Type | ------------------- | bill_id | int | | customer_id | int | | amount | int | -------------…

Go语言实现长连接并发框架 - 任务管理器

文章目录 前言接口结构体接口实现项目地址最后 前言 你好&#xff0c;我是醉墨居士&#xff0c;我们上篇博客实现了路由分组的功能&#xff0c;接下来这篇博客我们将要实现任务管理模块 接口 trait/task_mgr.go type TaskMgr interface {RouterGroupStart()StartWorker(tas…