15.Kafka系列之事务原理及实践

news/2024/10/29 4:24:17/

我们先来回顾下6.Kafka系列之设计思想(四)-消息传递语义中的一些内容

1. 消息传递保证

  • At most once:最多一次。消息可能会丢失,但永远不会重新传递
  • At least once:至少一次。消息永远不会丢失,但可能会重新传递
  • Exactly once:这正是人们真正想要的,每条消息只传递一次
1.1 发布消息的持久性保证

1.发布消息时,我们有消息被“提交”到日志的概念。一旦发布的消息被提交,只要复制该消息写入的分区的一个代理保持“活动”状态,它就不会丢失。如果生产者尝试发布消息并遇到网络错误,则无法确定此错误是发生在消息提交之前还是之后。在 0.11.0.0 之前,如果生产者未能收到指示消息已提交的响应,它别无选择,只能重新发送消息。这提供了至少一次传递语义,因为如果原始请求实际上已经成功,则消息可能会在重新发送期间再次写入日志
2.从 0.11.0.0 开始,Kafka 生产者还支持幂等传递选项,保证重新发送不会导致日志中出现重复条目​​。为此,代理为每个生产者分配一个 ID,并使用生产者随每条消息发送的序列号对消息进行重复数据删除
3.同样从 0.11.0.0 开始,生产者支持使用类似事务的语义将消息发送到多个主题分区的能力:即 要么所有消息都已成功写入,要么都没有

1.2 消费消息时的保证

1.它可以读取消息,然后保存它在日志中的位置,最后处理消息。在这种情况下,消费者进程有可能在保存其位置之后但在保存其消息处理的输出之前崩溃。在这种情况下,接管处理的进程将从保存的位置开始,即使该位置之前的一些消息还没有被处理。这对应于“至多一次”语义,因为在消费者失败消息的情况下可能不会被处理
2.它可以读取消息、处理消息并最终保存其位置。在这种情况下,消费者进程有可能在处理消息之后但在保存其位置之前崩溃。在这种情况下,当新进程接管它接收到的前几条消息时,它已经被处理过了。这对应于消费者失败情况下的“至少一次”语义。在许多情况下,消息有一个主键,因此更新是幂等的(两次接收相同的消息只是用它自己的另一个副本覆盖记录)
3.那么 exactly once 语义(即你真正想要的东西)呢?从 Kafka 主题消费并生产到另一个主题时(如Kafka Streams 应用程序),我们可以利用上面提到的 0.11.0.0 中的新事务生产者功能。消费者的位置作为消息存储在主题中,因此我们可以在与接收处理数据的输出主题相同的事务中将偏移量写入 Kafka。如果交易被中止,消费者的位置将恢复到它的旧值,并且输出主题的产生的数据将对其他消费者不可见,这取决于他们的“隔离级别”。在默认的“read_uncommitted”隔离级别中,所有消息对消费者都是可见的,即使它们是中止事务的一部分,但在“read_committed”中,消费者将只返回来自已提交事务的消息(以及任何不属于一部分的消息)交易)

2. Kafka幂等性

我们在客户端参数显示设置enable.idempotence=true就会开启生产者幂等消息传递

  • 每个新的生产者实例在初始化时会被分配一个PID(producer id)
  • 对于每个PID,消息发送到的每一个分区都有对应的序列号,序列号从0开始单调递增,生产者每发送一条消息就会将<PID,分区>对应的序列号值加1
  • broker端会在内存中为每一对<PID,分区>维护一个序列号,对于收到的每一条消息,只有当它的序列号的值(SN_new)正好比broker端中维护的对应序列号的值(SN_old)大1,broker才会接收该消息。如果 SN_new < SN_old + 1,说明消息被重复写入,broker会将该消息丢弃。否则,说明中间有数据尚未写入,暗示可能有消息丢失,对应生产者会抛出 OutOfOrderSequenceException 异常

注意:序列号实现幂等只是针对每一对<PID,分区>,即Kafka的幂等性只能保证单个生产者会话(session)中单分区的幂等

3. Kafka事务

通过事务可以弥补幂等性不能跨多个分区的缺陷,且可以保证对多个分区写入操作的原子性

3.1 创建主题与查看分区
kafka-topics.sh --create --topic my-topic --partitions 10 --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

3.2 编写KafkaTransactionDemo代码
/*** Kafka事务DEMO** @author shenjian* @since 2023/5/27*/
public class KafkaTransactionDemo {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:30092");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");// 开启幂等传递props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());producer.initTransactions();try {producer.beginTransaction();for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();}producer.close();}
}
3.3 运行并消费结果
kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092

# --time -1 表示获取的最新位移值
# --time -2 表示获取的最早的位移值,可能由于最早的数据由于过期被删除,所以最早的位移不一定是0
# 通过两数相减,就可以知道当前分区的数据条数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic  --time -2
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic  --time -1

不放心的小伙伴可以去统计要总数量是否一致奥

通过事务,从生产者角度,Kafka可以保证:

  • 跨生产者会话的消息幂等发送: transactionId与PID一一对应,如果新的生产者启动,具有相同transactionId的旧生产者会立即失效(每个生产者通过 transactionId获取PID的同时,还会获取一个单调递增的 producer epoch)
  • 跨生产者会话的事务恢复: 当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事物要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作(通过 producer epoch判断)

从消费者角度,事务能保证的语义相对偏弱,对于一些特殊的情况,Kafka并不能保证已提交的事务中的所有消息都能被消费:

  • 对采用日志压缩策略的主题,事务中的某些消息可能被清理(相同key的消息,后写入的消息会覆盖前面写入的消息)
  • 事务中消息可能分布在同一个分区的多个日志分段(LogSegment)中,当老的日志分段被删除时,对应的消息可能会丢失

4.Kafka事务实现原理

为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator 负责实施的
broker节点有一个专门管理事务的内部主题 __transaction_state,TransactionCoodinator 会将事务状态持久化到该主题中

  1. 查找 TransactionCoordinator:生产者会先向某个broker发送 FindCoordinator 请求,找到 TransactionCoordinator 所在的 broker节点
  2. 获取PID:生产者会向 TransactionCoordinator 申请获取 PID,TransactionCoordinator 收到请求后,会把 transactionalId 和对应的 PID 以消息的形式保存到主题__transaction_state 中,保证 <transaction_Id,PID>的对应关系被持久化,即使宕机该对应关系也不会丢失
  3. 开启事务:调用 beginTransaction()后,生产者本地会标记开启了一个新事务
  4. 发送消息:生产者向用户主题发送消息,过程跟普通消息相同,但第一次发送请求前会先发送请求给TransactionCoordinator 将 transactionalId 和 TopicPartition 的对应关系存储在 __transaction_state 中
  5. 提交或中止事务:Kafka除了普通消息,还有专门的控制消息(ControlBatch)来标志一个事务的结束,控制消息有两种类型,分别用来表征事务的提交和中止。该阶段本质就是一个两阶段提交过程:
  • 将 PREPARE_COMMIT 或 PREPARE_ABORT 消息写入主题 __transaction_state
  • 将COMMIT 或 ABORT 信息写入用户所使用的普通主题和 __consumer_offsets
  • 将 COMPLETE_COMMIT 或 COMPLETE_COMMIT_ABORT 消息写入主题 __transaction_state

如此一来,表面当前事务已经结束,此时就可以删除主题 __transaction_state 中所有关于该事务的消息

欢迎关注公众号算法小生


http://www.ppmy.cn/news/97724.html

相关文章

微服务设计原则--笔记

微服务设计原则–笔记 单一职责原则 单一职责原则指的是一个单元&#xff08;类、方法或者服务等&#xff09;只应关注系统功能中单独、有界限的一部分。单一职责原则可以帮助我们优雅的开发、敏捷的交付。单一职责也是SOLID原则之一。 接口隔离原则 服务之间的交互应该基于…

定积分的计算(换元法)

前置知识&#xff1a; 第一类换元法&#xff08;凑微分法&#xff09;第二类换元法牛顿-莱布尼茨公式 定积分换元法 设 f f f在 [ a , b ] [a,b] [a,b]上连续&#xff0c; φ \varphi φ在 [ α , β ] [\alpha,\beta] [α,β]上可导且导数连续&#xff0c; x φ ( t ) x\v…

基于html+css的图展示96

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

3ds MAX 基本体建模,长方体、圆柱体和球体

3ds MAX基本页面如下&#xff1a; 生成新的几何体在右侧&#xff1a; 选择生成的对象类型即可&#xff0c;以下为例子&#xff1a; 1、长方体建模 选择建立的对象类型为长方形 在 任意一个窗口绘制&#xff0c;鼠标滑动 这里选择左上角的俯视图 松开鼠标后&#xff0c;可以…

【Docker】什么是Dockerfile

&#x1f333;&#x1f333;【Docer篇整理】&#x1f333;&#x1f333; 篇一&#xff1a;docker核心概念与常用指令 篇二&#xff1a;镜像与docker数据卷 篇三&#xff1a;dockerfile 篇四&#xff1a;docker网络 文章目录 1、认识DockerFile2、DockerFile的构建过程3、Docke…

第二章:ShardingSphere简介

什么是ShardingSphere 何为ShardingSphere呢?其实我们总结如下三点就能很好的理解: 1、一整套开源的分布式数据库中间件解决方案 2、有三个产品组成:Sharding-JDBC、Sharding-Proxy、Sharding-Sidecar(规划中) 3、他的定位是关系型数据库的中间件,在分布式环境下合理的…

关于云计算和raid技术的对比

云计算和RAID技术是两个不同领域的概念&#xff0c;但它们可以在存储和数据保护方面进行对比。 云计算&#xff08;Cloud Computing&#xff09;是指通过网络将计算资源、存储资源和应用程序提供给用户的一种计算模式。它基于虚拟化技术&#xff0c;通过互联网实现按需访问和使…

Web基础 ( 六 ) AJAX

4.6.AJAX 4.6.1.什么是ajax Ajax&#xff08;Asynchronous JavaScript and XML , Asynchronous 异步的&#xff09;指的是一种使用 JavaScript、XML 和 HTTP 请求进行前端数据异步交互的技术。Ajax 不需要刷新整个页面就可以更新其中的一部分&#xff0c;使得网页的反应更快、…