kafka不同的消费场景

news/2024/11/20 19:55:15/

kafka_0">kafka常见的消费场景

  • 从头开始消费
  • 从最新偏移量消费
  • 从特定时间戳
  • 偏移量消费

kafka_7">kafka消费场景详细配置方法

消费模式配置参数场景
从头开始消费scan.startup.mode=earliest-offset回放所有历史数据
从最新偏移量消费scan.startup.mode=latest-offset实时消费新数据
从特定时间戳消费scan.startup.mode=timestamp + 时间戳按时间点重放历史数据 (从某一业务时间点开始消费
从指定偏移量消费scan.startup.mode=specific-offsets从特定数据点进行消费或故障恢复(手动设置)。
从提交的偏移量消费scan.startup.mode = group-offsets(默认)断点续传数据(继续上一次的消费进度,ck自动重试时)

0.生产数据,用于测试

cd apps/kafka/bin

#1. 创建Kafka主题 test_topic,指定分区数量为1,副本数量为1
./kafka-topics.sh \
--create \
--topic test_topic\
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
# 2.向 test_k1 中写入JSON格式的样例数据
./kafka-console-producer.sh \
--topic test_topic \
--bootstrap-server localhost:9092插入的数据:
{"id": "1", "event_time": "2024-11-19T10:00:00", "action": "view"}
{"id": "2", "event_time": "2024-11-19T10:01:00", "action": "click"}
{"id": "3", "event_time": "2024-11-19T10:02:00", "action": "view"}
{"id": "4", "event_time": "2024-11-19T10:03:00", "action": "click"}# 3.创建一个消费者组 group_k1 来消费 test_k1 数据验证
./kafka-console-consumer.sh \
--topic test_topic \
--bootstrap-server localhost:9092 \
--group group_k1 \
--from-beginning

1.从头开始消费

从 Kafka 的最早偏移量(earliest-offset)开始消费:

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'earliest-offset','format' = 'json'
);
CREATE TABLE print_sink (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'print'
);INSERT INTO print_sink
SELECT id, event_time, action
FROM kafka_source输出结果:
+I[1, 2024-11-19T10:00:00, view]
+I[2, 2024-11-19T10:01:00, click]
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

1.从最新偏移量消费

只消费从启动后产生的新数据(latest-offset)。

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'latest-offset','format' = 'json'
);
启动Kafka
./kafka-console-producer.sh \
--topic test_topic \
--bootstrap-server localhost:9092
输入新数据(例如):
{"id": "5", "event_time": "2024-11-19T10:04:00", "action": "click"}结果输出:
+I[5, 2024-11-19T10:04:00, click]

3.从特定时间戳消费

消费从指定时间戳之后的 Kafka 消息

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '1700000100000', -- 具体时间戳'format' = 'json'
);假设 1700000100000 对应的时间为 "2024-11-19T10:01:40"。输出结果是:从符合时间条件的偏移量开始消费的
+I[2, 2024-11-19T10:01:00, click]
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

4.从指定偏移量消费

从 Kafka 的特定分区和偏移量开始消费。

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'specific-offsets','scan.startup.specific-offsets' = 'partition:0,offset:2', -- 指定分区和偏移量'format' = 'json'
);输出结果
从 partition:0 偏移量为 2 的消息开始消费:
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

5.从提交的偏移量消费

默认消费模式,从 Kafka 提交的偏移量继续消费。

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'group-offsets','format' = 'json'
);输出结果
假设消费者上一次消费到的偏移量为 2,则会从偏移量 2 开始继续消费:
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

选择消费场景的考虑因素

  1. 数据完整性:需要处理历史数据时,选择 earliest-offset 或特定时间戳。
  2. 实时性:实时处理当前流式数据时,选择 latest-offset。
  3. 容错性:希望程序重启后从断点继续消费时,选择 group-offsets。
  4. 灵活性:复杂场景下选择 specific-offsets 或时间戳消费。
示例对比
以 Kafka 主题 test_topic 为例,不同消费模式的运行效果:(1)从头开始消费
Kafka 消费到所有数据:Message 123...(包括历史消息)。
(2)从最新偏移量消费
Kafka 消费从当前最新的偏移量开始:Message 456...(历史消息跳过)。
(3)从特定时间戳消费
Kafka 消费从时间戳对应的偏移量开始:Message 234...(4)从指定偏移量消费
Kafka 消费从手动设置的偏移量:从偏移量 42 开始,例如 Message 424344...

http://www.ppmy.cn/news/1548555.html

相关文章

LLM学习笔记(5)微调 Fine-tuning

什么是微调(Fine-tuning)? 微调(Fine-tuning)是指在预训练模型(如 GPT)基础上,通过加入特定的数据对模型进行进一步训练,以优化其在某一特定任务或领域上的表现。它的主…

0x00基础算法 -- 0x06 倍增

资料来源:算法竞赛进阶指南活动 - AcWing 1、倍增 倍增:"成倍增长",指进行递推时,如果状态空间很大,通常的线性递推无法满足时间和空间复杂度的要求,就可以通过成倍增长的方式,只递推…

如何使用虚拟机 打开另一个虚拟机的 硬盘

问题: 我在鼓捣一个虚拟机的 nvim 的时候,突然搞成 segment fault , 系统无法启动,但是,我编译的源码还在 这个虚拟机中。 解决过程: 直接使用 另一个 虚拟机, 添加这个虚拟机的硬盘。 我之前的虚拟机是这…

Python设计模式详解之5 —— 原型模式

Prototype 设计模式是一种创建型设计模式,它通过复制已有的实例来创建新对象,而不是通过从头实例化。这种模式非常适合对象的创建成本较高或者需要避免复杂的构造过程时使用。Prototype 模式提供了一种通过克隆来快速创建对象的方式。 1. Prototype 模式…

Mac 修改默认jdk版本

当前会话生效 这里演示将 Java 17 版本降低到 Java 8 查看已安装的 Java 版本: 在终端(Terminal)中运行以下命令,查看已安装的 Java 版本列表 /usr/libexec/java_home -V设置默认 Java 版本: 找到 Java 8 的安装路…

鸿蒙NEXT开发案例:计数器

【引言】(完整代码在最后面) 本文将通过一个简单的计数器应用案例,介绍如何利用鸿蒙NEXT的特性开发高效、美观的应用程序。我们将涵盖计数器的基本功能实现、用户界面设计、数据持久化及动画效果的添加。 【环境准备】 电脑系统&#xff1…

STM32单片机CAN总线汽车线路通断检测-分享

目录 目录 前言 一、本设计主要实现哪些很“开门”功能? 二、电路设计原理图 1.电路图采用Altium Designer进行设计: 2.实物展示图片 三、程序源代码设计 四、获取资料内容 前言 随着汽车电子技术的不断发展,车辆通信接口在汽车电子控…

openeuler设置IP

1 编辑网络配置文件:通常在/etc/sysconfig/network-scripts/目录下,对应的网络接口配置文件名为ifcfg-,例如ifcfg-eth0。 2. 修改配置文件:将BOOTPROTO的值改为static,并设置IPADDR、NETMASK、GATEWAY和DNS等参数。…