kafka入门(二): 位移提交

news/2024/11/7 12:42:32/

位移提交:

Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。

消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,

因此,需要记录上一次消费时的消费位移,并且持久化。

消费者在消费完消息之后,需要执行消费位移的提交。

自动位移提交:

Kafka默认的消费位移的提交方式是 自动提交。

自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。

默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。

自动位移提交,有可能会重复消费和消息丢失。

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。

手动位移提交:

手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。

手动位移提交,可以分为 同步提交、异步提交。

commitSync() 同步提交

同步提交,会阻塞消费者线程直到位移提交完成。

示例代码:

public class OffsetCommitSync {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//手动提交位移consumer.commitSync();System.out.println("手动提交位移成功.");}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//不自动提交,采用手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

commitAsync() 异步提交 :

异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。

异步提交,将 consumer.commitSync(); 换成 commitAsync。

如果还需要回调,就用 OffsetCommitCallback对象作为参数。

示例如下:

public class OffsetCommitAsyncCallback {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//异步回调,如果不需要回调,就采用无参的方法consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

参考资料:

《深入理解kafka:核心设计与实践原理》


http://www.ppmy.cn/news/1241528.html

相关文章

Android设计模式--外观模式

弈之为术&#xff0c;在人自悟 一&#xff0c;定义 外观模式要求一个子系统的外部与其内部的通信必须通过一个统一的对象进行。提供一个高层次的接口&#xff0c;使得子系统更易于使用。 外观模式在开发中的使用频率是非常高的&#xff0c;尤其是在第三方的SDK里面&#xff0…

TCP 重传、滑动窗口、流量控制、拥塞控制的剖析

TCP 是一个可靠传输的协议&#xff0c;那它是如何保证可靠的呢&#xff1f; 为了实现可靠性传输&#xff0c;需要考虑很多事情&#xff0c;例如数据的破坏、丢包、重复以及分片顺序混乱等问题。如不能解决这些问题&#xff0c;也就无从谈起可靠传输。 那么&#xff0c;TCP 是…

2. 寄存器

锁存器,用于存储1位的电路 只有当 可写位(write enable)开启,才会把输入写到输出,同时保存输出 使用锁存器 带时钟的锁存器 带时钟带可写控制的完整版锁存器 下面的时钟使用按钮来代替, 只有按钮为1时,相连的电路才工作时钟的作用在于协同所有电路共同工作,也是一切电路自动化…

MySQL- 创建可以远程访问的root账户

创建用户 默认的root用户只能当前节点localhost访问&#xff0c;是无法远程访问的&#xff0c;所以&#xff0c;我们要创建一个root账户&#xff0c;帮助用户远程访问。 create user root% IDENTIFIED WITH mysql_native_password BY 1234;这个命令是在MySQL数据库管理系统中…

每日OJ题_算法_双指针_力扣11. 盛最多水的容器

力扣11. 盛最多水的容器 11. 盛最多水的容器 - 力扣&#xff08;LeetCode&#xff09; 难度 中等 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成…

owl文件导入neo4j教程(win11)

一、前置条件 下载安装好neo4j 我是在官网下载的社区版4.4.28 下载neosemantics-4.4.0.3.jar 直接贴下载网址 https://objects.githubusercontent.com 下载rdf2rdf https://sourceforge.net/projects/rdf2rdf/ 下载并安装配置好jdk8 下载jdk11 二、步骤 1.使用如下命令将owl转…

基于STC12C5A60S2系列1T 8051单片读写掉电保存数据IIC总线器件24C02多字节并显示在液晶显示器LCD1602上应用

基于STC12C5A60S2系列1T 8051单片多字节读写掉电保存数据IIC总线器件24C02多字节并显示在液晶显示器LCD1602上应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式及配置STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式介绍IIC通…

论文笔记:详解NEUPSL DSI

《Using Domain Knowledge to Guide Dialog Structure Induction via Neural Probabilistic 》 名词解释 Dialog Structure Induction&#xff08;DSI&#xff09;是推断给定目标导向对话的潜在对话结构&#xff08;即一组对话状态及其时间转换&#xff09;的任务。它是现代对…