RabbitMQ消息的可靠传输和防止消息丢失

devtools/2024/11/13 9:39:54/

在Spring Cloud项目中,为了确保RabbitMQ消息的可靠传输和防止消息丢失,需要考虑以下几个方面:

  1. 消息持久化:确保消息在RabbitMQ中持久化。
  2. 队列持久化:确保队列是持久化的。
  3. 发布确认:使用发布确认机制确保消息发送到RabbitMQ。
  4. 消费者确认:确保消费者正确地确认消息。
  5. 重试机制:在消息消费失败时,设置重试机制。

下面详细介绍如何实现这些措施:

1. 添加依赖

确保在你的pom.xml中添加了Spring Boot和RabbitMQ的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 配置RabbitMQ

application.ymlapplication.properties文件中配置RabbitMQ:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: true

3. 定义配置类

创建一个配置类来配置队列、交换机和绑定:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "myQueue";public static final String EXCHANGE_NAME = "myExchange";public static final String ROUTING_KEY = "myRoutingKey";@Beanpublic Queue myQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic DirectExchange myExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Binding myBinding(Queue myQueue, DirectExchange myExchange) {return BindingBuilder.bind(myQueue).to(myExchange).with(ROUTING_KEY);}
}

4. 配置消息生产者

确保消息生产者配置了发布确认和消息持久化:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 设置发布确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("Message delivered successfully: " + correlationData);} else {System.err.println("Failed to deliver message: " + correlationData + ", cause: " + cause);}}});// 设置消息返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println("Returned Message: " + new String(message.getBody()) +", replyCode: " + replyCode + ", replyText: " + replyText +", exchange: " + exchange + ", routingKey: " + routingKey);});}public void sendMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);}
}

5. 配置消息消费者

确保消息消费者配置了消息确认机制:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class MessageConsumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void handleMessage(String message, Channel channel, Message message) throws Exception {try {// 处理消息System.out.println("Received Message: " + message);// 消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 消费失败,重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

6. 启用重试机制

在Spring Cloud Stream中启用重试机制:

spring:cloud:stream:bindings:input:destination: myQueueconsumer:retry:max-attempts: 5backOffPolicy:initialInterval: 1000multiplier: 2.0maxInterval: 10000

7. 测试

测试消息生产和消费,确保消息在各种情况下都不会丢失,包括网络故障、RabbitMQ服务器重启等。

总结

通过以上步骤,你可以在Spring Cloud项目中使用RabbitMQ并确保消息不会丢失。关键在于:

  1. 消息和队列的持久化:确保消息和队列都是持久化的。
  2. 发布确认:启用发布确认回调机制,确保消息被正确地发送到RabbitMQ。
  3. 消费者确认:确保消费者正确地确认消息。
  4. 重试机制:在消费失败时启用重试机制,以确保消息最终能够被成功处理。

通过这些配置,可以显著提高消息传输的可靠性,防止消息丢失。


http://www.ppmy.cn/devtools/52216.html

相关文章

Java中List流式转换为Map的终极指南

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 在Java编程中&#xff0c;经常需要将一个List对象转换为另一个Map对象。这可能是因为需要根据List中的元素的某些属性来创建一个新的键值对集合。在本文中&#xff0c;我将向您展示如何使用Java 中的流式API轻松地实…

vue2动态路由实现

实现一个简单的动态路由&#xff1a; 1、先定义菜单页面组件的结构&#xff0c;使用的是elementUI的NavMenu 导航菜单 <template><div><el-menu default-active"1" router><el-submenu :index"item.path" v-for"item in menu_…

敏捷测试:方法和实践

敏捷测试&#xff1a;方法和实践 前言1. 敏捷团队组织构成与任务2. 测试驱动开发&#xff08;TDD&#xff09;3. 递增型迭代测试模型4. 静态测试的重要性5. 测试计划与管理6. 敏捷测试活动时间表结论 前言 敏捷测试的实践是将敏捷开发原则和方法运用到测试过程中&#xff0c;以…

面试计算机网络八股文十问十答第十期

面试计算机网络八股文十问十答第十期 作者&#xff1a;程序员小白条&#xff0c;个人博客 相信看了本文后&#xff0c;对你的面试是有一定帮助的&#xff01;关注专栏后就能收到持续更新&#xff01; ⭐点赞⭐收藏⭐不迷路&#xff01;⭐ 1&#xff09;SACK 的引入是为了解决…

2024年6月四六级考试复盘

一、考试情况 1.1四级考试情况 听力&#xff1a;一开始没有进入状态。总共对了9道。7.1*37.1*314.2*3 8.2 新闻听力&#xff1a;3/7 长对话&#xff1a;3/8 讲座/讲话&#xff1a;3/10 阅读&#xff1a;3.55*7 7.1*8 14.2 * 7 181.05 选词填空&#xff1a;保守估计7/1…

9种编程语言的对比分析

在当今的软件开发领域&#xff0c;编程语言扮演着至关重要的角色。不同的编程语言各有其特点和适用场景&#xff0c;选择合适的编程语言能够提高开发效率和软件质量。本文将对十种常见的编程语言进行对比分析&#xff0c;帮助读者了解它们的优缺点和适用场景。 Java 特点&…

Selenium 保存会话信息避免重复登录实战!

前言 • 在一些实际开发场景中&#xff0c;我们在使用 Selenium 做自动化测试时需要保留用户的会话信息&#xff0c;从而避免重复登录&#xff0c;今天这篇文章就带大家实战如何使用 Selenium 保存会话信息。 版本 • Python 3.x 整体思路 • 当我们打开页面时&#xff0c;…

第十章:MongoDB

MongoDB 文章目录 MongoDB一、简介1.1 MongoDB 是什么1.2 数据库是什么1.3 数据库的作用1.4 数据库管理数据的特点1.5 为什么选择 MongoDB 二、核心概念三、下载安装与启动&#xff08;1&#xff09;配置步骤如下&#xff1a;&#xff08;2&#xff09;启动服务&#xff1a; 四…