MQ如何保证消息不丢失?

news/2025/2/21 8:44:15/

1.mq原则
MQ传输过程中,消息数据不能多,也不能少,不能多是说消息不能重复消费,这个我们下一章解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的,本章详细介绍不能少的问题。

2.丢失数据场景
丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景,
(1)rabbitmq
A:生产者弄丢了数据
生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
B:rabbitmq自己丢了数据
如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
C:消费端弄丢了数据
主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。


(2)kafka
A:生产者弄丢了数据
生产者没有设置相应的策略,发送过程中丢失数据。
B:kafka弄丢了数据
比较常见的一个场景,就是kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,他就少了一部分数据。
C:消费者弄丢了数据
消费者消费到了这个数据,然后消费后自动提交了offset,让kafka知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。

3.如何防止消息丢失
(1)rabbitmq
A:生产者丢失消息
①:可以选择使用rabbitmq提供事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物。

  channel.txSelect();//开启事物
  try{
      //发送消息
  }catch(Exection e){
      channel.txRollback();//回滚事物
      //重新提交
  }
缺点:rabbitmq事物已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。

②:可以开启confirm模式。在生产者那里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

    //开启confirm
    channel.confirm();
    //发送成功回调
    public void ack(String messageId){
      
    }
 
    // 发送失败回调
    public void nack(String messageId){
        //重发该消息
    }
二者不同
事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rabbitmq会回调告知成功与否。
一般在生产者这块避免丢失,都是用confirm机制。
B:rabbitmq自己弄丢了数据
设置消息持久化到磁盘。设置持久化有两个步骤:
①创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
②发送消息的时候将消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。
必须要同时开启这两个才可以。

而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
C:消费者弄丢了数据
使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。

(2)kafka
A:消费端弄丢了数据
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。
B:kafka弄丢了数据
一般要求设置4个参数来保证消息不丢失:
①给topic设置 replication.factor参数:这个值必须大于1,表示要求每个partition必须至少有2个副本。

②在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。

③在生产者端设置acks=all:表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了

④在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示 这个是要求一旦写入事败,就无限重试
C:生产者弄丢了数据
如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
 


http://www.ppmy.cn/news/22655.html

相关文章

openCL笔记【更新中】

文章目录概述OpenCL标准平台模型执行模型Quick Start1. 搜索并选择OpenCL平台2. 获得OpenCL设备;opencl 设备类型3. 创建上下文两个函数用于创建上下文OpenCL命令队列4. 程序对象创建内核对象创建内存对象使用内存对象对内核对象进行参数传递获取结果5. 资源回收Ope…

微信小程序--》从零实现小程序项目案例

🏍️作者简介:大家好,我是亦世凡华、渴望知识储备自己的一名在校大学生 🛵个人主页:亦世凡华、 🛺系列专栏:微信小程序 🚲座右铭:人生亦可燃烧,亦可腐败&…

ROS消息自定义、调用及问题解决

1、自定义话题信息 1、创建工作空间及功能包 mkdir catkin_ws cd src catkin_create_pkg test_msgs roscpp rospy std_msgs2、创建msg文件夹及文件 cd test_msgs mkdir msg cd msg vim test.msg3、添加内容 eg: string name float32 percent uint32 time4、修改pcckage中的…

【Linux】线程安全(万字详解)

🎇Linux: 博客主页:一起去看日落吗分享博主的在Linux中学习到的知识和遇到的问题博主的能力有限,出现错误希望大家不吝赐教分享给大家一句我很喜欢的话: 看似不起波澜的日复一日,一定会在某一天让你看见坚持…

深度学习算法工程师——CV、NLP面试经验汇总

面试流程 面试的一般流程: https://github.com/amusi/AI-Job 资料参考 林轩田视频(机器学习基石、机器学习技法) 李航《统计学习方法》 CS231n 剑指Offer LeetCode 算法理论基础知识应知应会 : https://github.com/sladesha/R…

【C++】C++11语法 ~ lambda 表达式

🌈欢迎来到C专栏~~ lambda 表达式 (꒪ꇴ꒪(꒪ꇴ꒪ )🐣,我是Scort目前状态:大三非科班啃C中🌍博客主页:张小姐的猫~江湖背景快上车🚘,握好方向盘跟我有一起打天下嘞!送给自己的一句鸡…

实时即未来,大数据项目车联网之电子围栏分析任务设置【十九】

文章目录 1. 电子围栏分析任务设置1.1 电子围栏分析任务步骤分析1.2 电子围栏分析任务实现1.3 广播状态与实现1. 电子围栏分析任务设置 1.1 电子围栏分析任务步骤分析 电子围栏任务主要有8大步骤: 电子围栏分析任务设置、原始数据json解析、过滤异常数据 读取已存在电子围栏中…

一个简易的前端业务工具库【tj-jstools】

简介 工作两年多了,最近自己总结用 TypeScript 写了一个简易的前端业务工具库: tj-jstools 。有以下功能: 判断js数据类型、获取一个变量或者值的具体类型手机号脱敏、数值千分位、金额汉字大写转换数组和tree形数据互转、url参数获取并组合…