kafka_0">kafka常见的消费场景
- 从头开始消费
- 从最新偏移量消费
- 从特定时间戳
- 偏移量消费
kafka_7">kafka消费场景详细配置方法
消费模式 | 配置参数 | 场景 |
---|---|---|
从头开始消费 | scan.startup.mode=earliest-offset | 回放所有历史数据 |
从最新偏移量消费 | scan.startup.mode=latest-offset | 实时消费新数据 |
从特定时间戳消费 | scan.startup.mode=timestamp + 时间戳 | 按时间点重放历史数据 (从某一业务时间点开始消费 |
从指定偏移量消费 | scan.startup.mode=specific-offsets | 从特定数据点进行消费或故障恢复(手动设置)。 |
从提交的偏移量消费 | scan.startup.mode = group-offsets(默认) | 断点续传数据(继续上一次的消费进度,ck自动重试时) |
0.生产数据,用于测试
cd apps/kafka/bin
#1. 创建Kafka主题 test_topic,指定分区数量为1,副本数量为1
./kafka-topics.sh \
--create \
--topic test_topic\
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1
# 2.向 test_k1 中写入JSON格式的样例数据
./kafka-console-producer.sh \
--topic test_topic \
--bootstrap-server localhost:9092插入的数据:
{"id": "1", "event_time": "2024-11-19T10:00:00", "action": "view"}
{"id": "2", "event_time": "2024-11-19T10:01:00", "action": "click"}
{"id": "3", "event_time": "2024-11-19T10:02:00", "action": "view"}
{"id": "4", "event_time": "2024-11-19T10:03:00", "action": "click"}# 3.创建一个消费者组 group_k1 来消费 test_k1 数据验证
./kafka-console-consumer.sh \
--topic test_topic \
--bootstrap-server localhost:9092 \
--group group_k1 \
--from-beginning
1.从头开始消费
从 Kafka 的最早偏移量(earliest-offset)开始消费:
CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'earliest-offset','format' = 'json'
);
CREATE TABLE print_sink (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'print'
);INSERT INTO print_sink
SELECT id, event_time, action
FROM kafka_source输出结果:
+I[1, 2024-11-19T10:00:00, view]
+I[2, 2024-11-19T10:01:00, click]
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]
1.从最新偏移量消费
只消费从启动后产生的新数据(latest-offset)。
CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'latest-offset','format' = 'json'
);
启动Kafka
./kafka-console-producer.sh \
--topic test_topic \
--bootstrap-server localhost:9092
输入新数据(例如):
{"id": "5", "event_time": "2024-11-19T10:04:00", "action": "click"}结果输出:
+I[5, 2024-11-19T10:04:00, click]
3.从特定时间戳消费
消费从指定时间戳之后的 Kafka 消息
CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'timestamp','scan.startup.timestamp-millis' = '1700000100000', -- 具体时间戳'format' = 'json'
);假设 1700000100000 对应的时间为 "2024-11-19T10:01:40"。输出结果是:从符合时间条件的偏移量开始消费的
+I[2, 2024-11-19T10:01:00, click]
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]
4.从指定偏移量消费
从 Kafka 的特定分区和偏移量开始消费。
CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'specific-offsets','scan.startup.specific-offsets' = 'partition:0,offset:2', -- 指定分区和偏移量'format' = 'json'
);输出结果
从 partition:0 偏移量为 2 的消息开始消费:
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]
5.从提交的偏移量消费
默认消费模式,从 Kafka 提交的偏移量继续消费。
CREATE TABLE kafka_source (id STRING,event_time TIMESTAMP(3),action STRING
) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'test_group','scan.startup.mode' = 'group-offsets','format' = 'json'
);输出结果
假设消费者上一次消费到的偏移量为 2,则会从偏移量 2 开始继续消费:
+I[3, 2024-11-19T10:02:00, view]
+I[4, 2024-11-19T10:03:00, click]
选择消费场景的考虑因素
- 数据完整性:需要处理历史数据时,选择 earliest-offset 或特定时间戳。
- 实时性:实时处理当前流式数据时,选择 latest-offset。
- 容错性:希望程序重启后从断点继续消费时,选择 group-offsets。
- 灵活性:复杂场景下选择 specific-offsets 或时间戳消费。
示例对比
以 Kafka 主题 test_topic 为例,不同消费模式的运行效果:(1)从头开始消费
Kafka 消费到所有数据:Message 1、2、3...(包括历史消息)。
(2)从最新偏移量消费
Kafka 消费从当前最新的偏移量开始:Message 4、5、6...(历史消息跳过)。
(3)从特定时间戳消费
Kafka 消费从时间戳对应的偏移量开始:Message 2、3、4...。
(4)从指定偏移量消费
Kafka 消费从手动设置的偏移量:从偏移量 42 开始,例如 Message 42、43、44...。