kafka不同的消费场景

embedded/2024/11/20 15:09:06/

kafka_0">kafka常见的消费场景

  • 从头开始消费
  • 从最新偏移量消费
  • 从特定时间戳
  • 偏移量消费

kafka_7">kafka消费场景详细配置方法

消费模式配置参数场景
从头开始消费scan.startup.mode=earliest-offset回放所有历史数据
从最新偏移量消费scan.startup.mode=latest-offset实时消费新数据
从特定时间戳消费scan.startup.mode=timestamp + 时间戳按时间点重放历史数据 (从某一业务时间点开始消费
从指定偏移量消费scan.startup.mode=specific-offsets从特定数据点进行消费或故障恢复(手动设置)。
从提交的偏移量消费scan.startup.mode = group-offsets(默认)断点续传数据(继续上一次的消费进度,ck自动重试时)

0.生产数据,用于测试

cd apps/kafka/bin

#1. 创建Kafka主题 test_topic,指定分区数量为1,副本数量为1
./kafka-topics.sh \
--create \
--topic test_topic\
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
# 2.向 test_k1 中写入JSON格式的样例数据
./kafka-console-producer.sh \
--topic test_topic \
--bootstrap-server localhost:9092插入的数据:
{"id": "1", "event_time": "2024-11-19T10:00:00", "action": "view"}
{"id": "2", "event_time": "2024-11-19T10:01:00", "action": "click"}
{"id": "3", "event_time": "2024-11-19T10:02:00", "action": "view"}
{"id": "4", "event_time": "2024-11-19T10:03:00", "action": "click"}# 3.创建一个消费者组 group_k1 来消费 test_k1 数据验证
./kafka-console-consumer.sh \
--topic test_topic \
--bootstrap-server localhost:9092 \
--group group_k1 \
--from-beginning

1.从头开始消费

从 Kafka 的最早偏移量(earliest-offset)开始消费:

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'earliest-offset','format' = 'json'
);
CREATE TABLE print_sink (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'print'
);INSERT INTO print_sink
SELECT id, event_time, action
FROM kafka_source输出结果:
+I[1, 2024-11-19T10:00:00, view]
+I[2, 2024-11-19T10:01:00, click]
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

1.从最新偏移量消费

只消费从启动后产生的新数据(latest-offset)。

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'latest-offset','format' = 'json'
);
启动Kafka
./kafka-console-producer.sh \
--topic test_topic \
--bootstrap-server localhost:9092
输入新数据(例如):
{"id": "5", "event_time": "2024-11-19T10:04:00", "action": "click"}结果输出:
+I[5, 2024-11-19T10:04:00, click]

3.从特定时间戳消费

消费从指定时间戳之后的 Kafka 消息

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '1700000100000', -- 具体时间戳'format' = 'json'
);假设 1700000100000 对应的时间为 "2024-11-19T10:01:40"。输出结果是:从符合时间条件的偏移量开始消费的
+I[2, 2024-11-19T10:01:00, click]
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

4.从指定偏移量消费

从 Kafka 的特定分区和偏移量开始消费。

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'specific-offsets','scan.startup.specific-offsets' = 'partition:0,offset:2', -- 指定分区和偏移量'format' = 'json'
);输出结果
从 partition:0 偏移量为 2 的消息开始消费:
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

5.从提交的偏移量消费

默认消费模式,从 Kafka 提交的偏移量继续消费。

CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'group-offsets','format' = 'json'
);输出结果
假设消费者上一次消费到的偏移量为 2,则会从偏移量 2 开始继续消费:
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]

选择消费场景的考虑因素

  1. 数据完整性:需要处理历史数据时,选择 earliest-offset 或特定时间戳。
  2. 实时性:实时处理当前流式数据时,选择 latest-offset。
  3. 容错性:希望程序重启后从断点继续消费时,选择 group-offsets。
  4. 灵活性:复杂场景下选择 specific-offsets 或时间戳消费。
示例对比
以 Kafka 主题 test_topic 为例,不同消费模式的运行效果:(1)从头开始消费
Kafka 消费到所有数据:Message 123...(包括历史消息)。
(2)从最新偏移量消费
Kafka 消费从当前最新的偏移量开始:Message 456...(历史消息跳过)。
(3)从特定时间戳消费
Kafka 消费从时间戳对应的偏移量开始:Message 234...(4)从指定偏移量消费
Kafka 消费从手动设置的偏移量:从偏移量 42 开始,例如 Message 424344...

http://www.ppmy.cn/embedded/139103.html

相关文章

微信小程序-prettier 格式化

一.安装prettier插件 二.配置开发者工具的设置 配置如下代码在setting.json里: "editor.formatOnSave": true,"editor.defaultFormatter": "esbenp.prettier-vscode","prettier.documentSelectors": ["**/*.wxml"…

PySpark3:Row对象常见操作以及Row、RDD、DataFrame互相转换

目录 一、Row对象常见操作 二、Row、RDD、DataFrame互相转换 1、RDD—>DataFrame 2、DataFrame—>RDD 3、DataFrame—>Row 4、Row—>DataFrame 一、Row对象常见操作 from pyspark.sql import Row# 创建一个Row对象 row Row(name"张三", age25)# …

使用dataGrip连接spark

概述: spark的配置共有5种 1、本地模式 2、集群模式:standalone, yarn,k8s,mesos四种集群模式 spark本身只是一个计算引擎,是没有数据库的,所以说数据需要在hdfs上存放,而数据库就…

高质量发展统计监测

我国经济迈向高质量发展,在统计标准和统计制度上采取了哪些措施及时监测反映经济转型发展状况? 随着我国经济从高速增长阶段转向高质量发展阶段,经济转型发展也对统计工作提出了新的要求。为服务我国高质量发展要求,为经济社会转…

Java基于微信小程序+SSM的校园失物招领小程序

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

【单片机基础】通信接口(UART, SPI, I2C等)的工作方式

单片机(Microcontroller Unit, MCU)中的通信接口用于与外部设备进行数据交换。常见的通信接口包括UART(Universal Asynchronous Receiver/Transmitter)、SPI(Serial Peripheral Interface)和I2C&#xff08…

solana链上智能合约开发案例一则

环境搭建 安装Solana CLI:Solana CLI是开发Solana应用的基础工具。你可以通过官方文档提供的安装步骤,在本地环境中安装适合你操作系统的Solana CLI版本。安装完成后,使用命令行工具进行配置,例如设置网络环境(如开发网…

【ArcGISPro】使用AI模型提取要素-提取车辆(目标识别)

示例数据下载 栅格数据从网上随便找一个带有车辆的栅格数据 f094a6b1e205cd4d30a2e0f816f0c6af.jpg (1200799) (588ku.com) 添加数据