消息投递:如何保证消息不丢失?

news/2025/2/13 22:37:28/

目录

前言

一、消息为什么会丢失?

二、在消息生产的过程中丢失消息

三、在消息队列中丢失消息

四、在消费的过程中存在消息丢失的可能


前言

在电商系统中,我们经常有这样的场景,在用户下单购买完商品后,需要给用户发送红包来促进用户继续消费。而发送红包的流程并不在下单的核心流程之内,这样的流程我们通常会采用消息队列的方式来异步处理,提高接口性能。

这时,你发现了一个问题:如果消息在投递的过程中发生丢失,那么用户就会因为没有得到红包而投诉。相反,如果消息在投递的过程中出现了重复,那么你的系统就会因为发送两个红包而损失。

那么我们如何保证,产生的消息一定会被消费到,并且只被消费一次呢?


一、消息为什么会丢失

如果要保证消息只被消费一次,首先就要保证消息不会丢失。那么消息从被写入到消息队列,到被消费者消费完成,这个链路上会有哪些地方存在丢失消息的可能呢?其实,主要存在三个场景:

  1. 消息从生产者写入到消息队列的过程。
  2. 消息在消息队列中的存储场景。
  3. 消息被消费者消费的过程。

接下来,我就针对每一个场景,详细地剖析一下,这样你可以针对不同的场景选择合适的,减少消息丢失的解决方案。

二、在消息生产的过程中丢失消息

在这个环节中主要有两种情况。

首先,消息的生产者一般是我们的业务服务器,消息队列是独立部署在单独的服务器上的。两者之间的网络虽然是内网,但是也会存在抖动的可能,而一旦发生抖动,消息就有可能因为网络的错误而丢失。

针对这种情况,我建议你采用的方案是消息重传:也就是当你发现发送超时后你就将消息重新发一次,但是你也不能无限制地重传消息。一般来说,如果不是消息队列发生故障,或者是到消息队列的网络断开了,重试 2~3 次就可以了。不过,这种方案可能会造成消息的重复,从而导致在消费的时候会重复消费同样的消息。比方说,消息生产时由于消息队列处理慢或者网络的抖动,导致虽然最终写入消息队列成功,但在生产端却超时了,生产者重传这条消息就会形成重复的消息,那么针对上面的例子,直观显示在你面前的就会是你收到了两个现金红包。

spring:# kafka 配置kafka:bootstrap-servers: 127.0.0.1:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:# 消费请求超时时间request:timeout:ms: 20000batch:#批量大小size: 1024#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。#可以设置的值为:all, -1, 0, 1acks: 1#重试次数retries: 2

kafka中配置重试次数 spring.kafka.producer.retries = 2

那么消息发送到了消息队列之后是否就万无一失了呢?当然不是,在消息队列中消息仍然有丢失的风险。

三、在消息队列中丢失消息

拿 Kafka 举例,消息在 Kafka 中是存储在本地磁盘上的,而为了减少消息存储时对磁盘的随机 I/O,我们一般会将消息先写入到操作系统的 Page Cache 中,然后再找合适的时机刷新到磁盘上。

比如,Kafka 可以配置当达到某一时间间隔,或者累积一定的消息数量的时候再刷盘,也就是所说的异步刷盘。

不过,如果发生机器掉电或者机器异常重启,那么 Page Cache 中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?

你可能会把刷盘的间隔设置很短,或者设置累积一条消息就就刷盘,但这样频繁刷盘会对性能有比较大的影响,而且从经验来看,出现机器宕机或者掉电的几率也不高,所以我不建议你这样做。

如果你的电商系统对消息丢失的容忍度很低,那么你可以考虑以集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。那么它是怎么实现的呢?

Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给 Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。

由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。为了解决这个问题,Kafka 为生产者提供一个选项叫做“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。从上面这张图来看,当设置“acks=all”时,需要同步执行 1,3,4 三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以你在实际应用中需要仔细地权衡考量。

spring:# kafka 配置kafka:bootstrap-servers: 127.0.0.1:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。#可以设置的值为:all, -1, 0, 1acks: 1
  1. 如果你需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是需要使用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息才返回成功。
  2. 如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了。
  3. 我们的业务系统一般对于消息的丢失有一定的容忍度,比如说以上面的红包系统为例,如果红包消息丢失了,我们只要后续给没有发送红包的用户补发红包就好了。

四、在消费的过程中存在消息丢失的可能

我还是以 Kafka 为例来说明。一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如说,消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,那么这条失败的消息就永远不会被处理了,也可以认为是丢失了。

所以,在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还会重复地消费这条消息。

spring:kafka:consumer:group-id: launch-mq-prod# 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offsetenable-auto-commit: false


http://www.ppmy.cn/news/1196839.html

相关文章

Spring Boot 整合SpringSecurity和JWT和Redis实现统一鉴权认证

📑前言 本文主要讲了Spring Security文章,如果有什么需要改进的地方还请大佬指出⛺️ 🎬作者简介:大家好,我是青衿🥇 ☁️博客首页:CSDN主页放风讲故事 🌄每日一句:努力…

antv/g6元素之combo

介绍 在 G6 中,“Combo” 是一种特殊的元素,用于组合和展示多个节点元素的一种方式。它通常用于表示一个组或子图,将多个相关节点组织在一起,并在图形中以单一的形状显示。 属性 type:Combo 的类型,通常是…

订单管理功能实现B

<template><el-card><el-row :gutter="20" class="header"><el-col :span="7"><el-input placeholder="请输入订单号..." v-model="queryForm.query" clearable></el-input></el-co…

全新干货!一招教你迅速提升流量主收入!包你轻松月入过万

也不怕大家笑话&#xff0c;才哥以前收入每天才一块钱&#xff0c;连瓶水都买不了&#xff0c; 可是自从我开始接触老年粉私域后&#xff0c;一个搬运公众号的流量主收益两个月后就可以用“浴火重生”来形容了。 一个搬运公众号一天的流量主收益比我原创两年的个人公众号收益还…

【多线程】并发问题

public class BuyTicket implements Runnable{private int ticketNums10;Overridepublic void run() {for(int i1;i<ticketNums;i){if(ticketNums<0){break;}System.out.println(Thread.currentThread().getName() "抢到了第" i "张票");ticketNu…

Building wheel for scipy (setup.py) ...卡着

【Q】:安装torch等的时候遇到问题&#xff1a; 那就安装这些库呗&#xff0c;执行“python -m pip install matplotlib2.0.0 scipy1.0.1 tifffile2019.7.26” 结果出现下列情况&#xff1a;&#xff08;xx的魔力转圈圈&#xff09; 再次执行&#xff1a;python -m pip instal…

【从0到1设计一个网关】如何设计一个稳定的网关?

文章目录 高可用分析软件架构心跳检测自动恢复熔断降级接口重试隔离压测和预案多机房灾备以及双活数据中心异常处理机制重试主备服务自动切换动态剔除或恢复异常机器超时时间的考虑服务设计这篇文章并没有具体的业务实现,而只是对于如何设计一个高可用,稳定的网关列举出了一些…

【小白专用】微信小程序个人中心、我的界面(示例一)23.11.04

微信小程序使用button按钮实现个人中心、我的界面&#xff08;示例一&#xff09; 微信小程序个人中心、我的界面&#xff0c;使用button按钮实现界面布局&#xff0c;更好的将分享好友、获取头像等功能展现出来&#xff0c;更多示例界面&#xff0c;请前往我的主页哦。 1、js…