项目实战总结-Kafka实战应用核心要点

news/2024/12/22 20:01:06/

Kafka实战应用核心要点

  • 一、前言
  • 二、Kafka避免重复消费
    • 2.1 消费者组机制
    • 2.2 幂等生产者
    • 2.3 事务性生产者/消费者
    • 2.4 手动提交偏移量
    • 2.5 外部存储管理偏移量
    • 2.6 去重逻辑
    • 2.7 幂等消息处理逻辑
    • 2.8 小结
  • 三、Kafka持久化策略
    • 3.1 持久化文件
    • 3.2 segment 分段策略
    • 3.3 数据文件刷盘策略
    • 3.4 日志清理策略
    • 3.5 Kafka消息查找策略
  • 四、Kafka零复制(Zero-copy)
  • 五、Kafka设计实现延迟消息
  • 六、Kafka与ZooKeeper依赖性

一、前言

在这里插入图片描述
记录Kafka在项目中应用的核心要点,面试可食用。

二、Kafka避免重复消费

在 Apache Kafka 应用于项目中时,避免重复消费是个重要且常见的问题,尤其是在处理消息时需要确保每条消息只被处理一次。总结而言,避免重复消费的方式有七种:

2.1 消费者组机制

Kafka消费者组(Consumer Group)机制可以确保每个分区的消息只被一个消费者实例消费。通过合理的分区和消费者组设计,可以避免同一消息被多个消费者重复消费。
优点:

  • 简单易用,Kafka内置支持。
  • 适用于简单的负载均衡和扩展。

缺点:

  • 不能完全避免重复消费,比如在消费者重启或重新平衡的过程中可能会有些消息被重复消费。
  • 需要额外处理消费者重平衡带来的复杂性。

2.2 幂等生产者

Kafka 0.11.0版本引入幂等生产者(Idempotent Producer),可确保相同的消息在网络或其他错误导致重试时不会被重复写入Kafka。
启用幂等生产者只需要在生产者配置中设置enable.idempotence=true。幂等生产者确保消息在网络或其他错误导致重试时不会被重复写入 Kafka,通过为每个消息分配唯一的序列号来实现幂等性。
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

优点:

  • 简化生产者端的去重逻辑。
  • 可以确保消息在Kafka中只写入一次。

缺点:

  • 需要Kafka 0.11.0及以上版本。
  • 在某些情况下可能会增加生产者的延迟。

2.3 事务性生产者/消费者

Kafka支持事务性消息,允许生产者和消费者在一个事务中一起工作。生产者可以将一组消息作为一个事务写入Kafka,消费者也可以在一个事务中读取和处理消息。这样可确保消息处理的原子性和一致性。要使用事务性生产者,需要配置transactional.id
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

优点:

  • 提供强一致性保证。
  • 避免消息处理中的部分提交问题。

缺点:

  • 复杂度较高,需Kafka 0.11.0及以上版本。
  • 性能开销较大,适用于对一致性要求高的场景。

2.4 手动提交偏移量

Kafka消费者默认会自动提交偏移量(auto commit),为更好地控制消息处理和偏移量提交,可关闭自动提交(enable.auto.commit=false),并在确保消息处理成功后手动提交偏移量。这可通过commitSync()commitAsync()方法来实现。
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息}consumer.commitSync();
}

优点:

  • 精细控制偏移量提交时机,确保消息处理成功后才提交。
  • 提高处理的可靠性。

缺点:

  • 增加消费者代码的复杂性。
  • 如果处理逻辑很慢,可能导致偏移量提交延迟。

2.5 外部存储管理偏移量

在某些特定场景下,可将偏移量存储在外部存储(如数据库)中,而不是依赖 Kafka的内部偏移量管理。这样可在消息处理和偏移量提交之间建立更强的关联,确保只有当消息处理成功后才更新偏移量。
优点:

  • 可以在消息处理和偏移量提交之间建立更强的关联。
  • 灵活性高,可根据业务需求自定义偏移量管理。

缺点:

  • 需要额外的存储和管理逻辑。
  • 增加系统的复杂性。

2.6 去重逻辑

在消息处理逻辑中引入去重机制。
例如,可以使用消息的唯一标识符(如消息ID)在处理前检查是否已经处理过该消息,从而避免重复处理。
优点:

  • 灵活性高,可根据业务逻辑自定义去重策略。
  • 适用于需要严格去重的场景。

缺点:

  • 需要额外的存储和管理去重信息。
  • 增加处理逻辑的复杂性。

2.7 幂等消息处理逻辑

设计消息处理逻辑时,尽量使其成为幂等操作,即相同的消息即使被处理多次也不会产生副作用。
例如,在数据库操作时,可以使用UPSERT操作(更新插入)来确保数据的一致性。
优点:

  • 简化重复消费问题的处理。适
  • 用于可以设计为幂等操作的业务场景。

缺点:

  • 并不是所有业务逻辑都能设计为幂等操作。
  • 需要仔细设计和验证处理逻辑的幂等性。

2.8 小结

对于大多数场景,结合使用消费者组、手动提交偏移量和幂等处理逻辑可以有效避免重复消费,而在需要更严格一致性的场景下,可以考虑使用幂等生产者和事务性消息
具体选择方案取决于具体的应用场景和需求。

三、Kafka持久化策略

Kafka实际上就是日志消息存储系统, 根据offset获取对应的消息,消费者获取到消息之后该消息不会立即从mq中移除,而是继续存储在磁盘中

3.1 持久化文件

topic有分区(partition)的概念,Kafka 会将topic分成多个不同的分区,生产者往同一个topic发送的消息最终是发送到不同的分区里面,每个分区中拆分成多个不同的segment文件存储日志。
每个segment文件包含:

  • .index 文件 (消息偏移量索引文件)
  • .log 文件(消息物理存放文件)
  • .timeindex文件(时间索引文件)

每个segment文件容量最大默认为500MB,如果超过500MB就生成新的 segment文件,且文件命名后几位表示上个segment文件最后offset值,如:segment01 、segment500 、segment1000
由此可知:一个topic里的消息是由该topic下所有分区里的消息组成的。在同一个分区内部,消息是有序的,而不同分区之间,消息是不能保证有序的

存储的消息日志文件在 server.properties 配置文件的 log.dirs 参数指定的目录下,以" t o p i c − topic- topicpartition"为名称的目录:
在这里插入图片描述
注:由于每个分区都有leader的概念,而不同分区的leader可能位于不同的broker上,除leader外,分区还有副本(replica)的概念,因此每个broker只会存储分区leader或副本位于该broker中的topic的消息。

3.2 segment 分段策略

在 server.properties 配置文件中,分段文件配置默认是500MB ,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件),文件较多时性能会稍微降低。下面是相关配置参数:

##日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment
log.roll.hours=72
##segment的索引文件最大尺寸限制,即时log.segment.bytes没达到,也会生成一个新的segment
log.index.size.max.bytes=10*1024*1024
##控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes=1024*1024*1024

3.3 数据文件刷盘策略

当把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。若此时操作系统宕机,数据就会丢失。
这里可根据消息的数量log.flush.interval.messages和时间log.flush.interval.ms进行配置,如果时间设置的过大,有没达到指定的数量的情况下,如果系统宕机,数据就会丢失。
Kafka官方并不建议通过Broker端的log.flush.interval.messageslog.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。

##每当producer写入10000条消息时,刷数据到磁盘 配置
log.flush.interval.messages=10000
##每间隔5秒钟时间,刷数据到磁盘
log.flush.interval.ms=5000

3.4 日志清理策略

##   是否开启日志清理
log.cleaner.enable=true
##  日志清理运行的线程数
log.cleaner.threads = 2
##  日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖,默认 delete
log.cleanup.policy = delete
##  数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略
##  log.retention.bytes和 log.retention.minutes或 log.retention.hours任意一个达到要求,都会执行删除
log.retention.minutes=300
log.retention.hours=24
##   topic每个分区的最大文件大小,-1没有大小限制
log.retention.bytes=-1
##  文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes

3.5 Kafka消息查找策略

前文提到每个segment file有命名规则,且在.index文件中,存储的是key-value格式的,key代表在.log中按顺序开始顺序消费的offset值,value代表该消息的物理消息存放位置。
但是在.index中不是对每条消息都做记录,它是每隔一些消息记录一次,避免占用太多内存。即使消息不在index记录中,在已有的记录中查找,范围也大大缩小。
kafka就是利用分段+索引的方式来解决查找效率的问题,kafka没有对每个文件建立索引,而是利用kafka 消息写入磁盘的顺序性,对其中部分的消息建立偏移量索引和时间戳索引,这就是稀疏索引,目的是节约空间的资源,定位到相邻.log文件,再根据顺序遍历查找,此方式的时间复杂度是O(n)。
其中,偏移量索引源码:

offsetIndex.append(largestOffset, physicalPosition)def append(offset: Long, position: Int) {inLock(lock) {// 索引位置mmap.putInt(relativeOffset(offset))// 日志位置mmap.putInt(position)_entries += 1_lastOffset = offset}
}// 用当前offset减去基准offset
def relativeOffset(offset: Long): Int = {val relativeOffset = offset - baseOffset
}

时间戳索引源码:

timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {inLock(lock) {if (timestamp > lastEntry.timestamp) {// 添加时间戳mmap.putLong(timestamp)// 添加相对位移(偏移量索引)mmap.putInt(relativeOffset(offset))_entries += 1_lastEntry = TimestampOffset(timestamp, offset)}}
}

Kafka使用改进版的二分查找,改的不是二分查找的内部,而是把所有索引项分为热区和冷区 这个改进可以让查询热数据部分时,遍历的Page永远是固定的,这样能避免缺页中断。
整体流程:
在这里插入图片描述

四、Kafka零复制(Zero-copy)

Kafka信息复制的原因:确保任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
Kafka之所以能够快速地处理大量数据,其中一个重要原因就是采用零拷贝(Zero-copy)技术。Kafka采用两种零拷贝技术来提高性能:mmap(memory-map)sendfile
主要有两个大的场景:

  • Broker 读写.index文件,用 mmap零复制
  • Broker 向Consumer发消息,用 sendfile 零复制

mmap (memory-map):把文件映射到进程的虚拟内存空间。通过对这段内存的读取和修改,可以实现对文件的读取和修改,而不需要用read和write系统调用。
sendfile:直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。

五、Kafka设计实现延迟消息

Kafka延时操作的实现方式:基于时间戳的延时和基于特殊Topic的延时
(1)基于时间戳的延时:通过设置消息的时间戳来实现延时操作。Producer在发送消息时,可以为消息设置一个未来的时间戳,指定消息在该时间点之后才能被消费者消费。Kafka会根据消息的时间戳进行延时推送,直到时间点到达后才将消息发送给消费者。
(2)基于特殊Topic的延时:通过创建专门的延时Topic来实现延时操作。可以将需要延时的消息发送到延时Topic中,然后设置一个定时任务来定期检查延时Topic中的消息,并将到期的消息转发到目标Topic供消费者消费。

简单步骤:

1.创建正常的topic(即即时消费的消息)。
2.创建延迟的topic,并设置合适的副本因子和参数以支持延迟消费。
3.发送消息到正常的topic,同时指定消息需要被延迟消费。
4.使用Kafka的消费者API从延迟topic拉取消息并处理。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class DelayedMessageProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);// 正常的topicString immediateTopic = "immediate-messages";// 延迟的topicString delayedTopic = "delayed-messages";// 消息内容String value = "This is a delayed message";// 延迟消费的时间,例如10秒long delayTime = 10000;// 发送消息到延迟的topicproducer.send(new ProducerRecord<>(delayedTopic, 0, System.currentTimeMillis() + delayTime, value));producer.close();}
}

六、Kafka与ZooKeeper依赖性

从Kafka 2.8版本开始,Kafka提供KRaft模式,需要配置Quorm控制器,可以在没有ZooKeeper的情况下运行Kafka集群。
之前版本,Zookeeper是Kafka的核心组件之一,负责集群元数据的管理和控制器的选举等任务。Zookeeper存储和管理着Kafka的元数据信息和配置信息,包括broker的IP地址、端口号、主题分区的分配方案等。此外,Zookeeper还帮助Kafka集群实现自动故障转移和负载均衡等功能。


http://www.ppmy.cn/news/1531494.html

相关文章

ip是可以从能够上网的设备提取吗

是的&#xff0c;IP地址可以从能够上网的设备提取。以下是如何从不同设备提取IP地址的具体方法&#xff1a; 在电脑上提取IP地址 Windows: 打开命令提示符&#xff08;按下 Win R&#xff0c;输入 cmd&#xff0c;按回车&#xff09;。 输入命令 ipconfig&#xff0c;按回车。…

【分布式微服务云原生】Dockerfile命令详解

Dockerfile 是一个文本文件&#xff0c;它包含了一系列的指令&#xff0c;用于构建一个 Docker 镜像。下面是 Dockerfile 中常用命令的详细解释和示例。 1. 获取镜像的三种方式 通过命令拉取镜像 docker pull 镜像名称从镜像仓库拉取指定的镜像。 通过容器创建镜像 docker…

Mybatis(进阶部分)

四 Mybatis完成CURD&#xff08;二&#xff09; 4.5 多条件CRUD 之前的案例中&#xff0c;接口里方法的形参个数都是1个&#xff1b;如果方法形参是两个或者两个以上时&#xff0c;MyBatis又该如何获取获取参数呢&#xff1f; Mybatis提供了好几种方式&#xff0c;可以获取多…

Lenovo SR850服务器亮黄灯维修和升级CPU扩展模块

佛山市三水区某高校1台Lenovo Thinksystem SR850服务器黄灯故障到现场检修 和 升级3号和4号CPU。加强服务器的计算性能&#xff1b; 故障情况是该学校it管理员这一天看到这台SR850服务器前面板亮了一个黄灯&#xff0c;但是目前系统运行正常&#xff0c;出于安全考虑&#xff0…

Redis 五大基本数据类型及其应用场景进阶(缓存预热、雪崩 、穿透 、击穿)

Redis 数据类型及其应用场景 Redis 是什么? Redis是一个使用C语言编写的高性能的基于内存的非关系型数据库&#xff0c;基于Key/Value结构存储数据&#xff0c;通常用来 缓解高并发场景下对某一资源的频繁请求 &#xff0c;减轻数据库的压力。它支持多种数据类型,如字符串、…

T8:猫狗识别

T8周&#xff1a;猫狗识别 **一、前期工作**1.设置GPU,导入库2.导入数据3.查看数据 **二、数据预处理**1.加载数据2.可视化数据3.配置数据集 **三、构建CNN网络模型****四、编译模型****五、训练模型****六、模型评估****七、预测**八、总结1、[train_on_batch 和 test_on_batc…

QCommandLineParser简介

QCommandLineParser QCommandLineParser 是 Qt 提供的一个类&#xff0c;用于解析命令行参数。它使得处理命令行参数变得简单和高效&#xff0c;适用于需要从命令行获取输入的控制台应用程序或需要支持命令行选项的 GUI 应用程序。 主要功能和用途 定义命令行选项&#xff1…

docker进入容器命令

文章目录 什么是 Docker 容器&#xff1f;为什么要进入 Docker 容器&#xff1f;如何进入 Docker 容器&#xff1f;步骤 1&#xff1a;查看正在运行的容器步骤 2&#xff1a;进入容器步骤 3&#xff1a;在容器内工作 小贴士 什么是 Docker 容器&#xff1f; 首先&#xff0c;让…