doris:Kafka 导入数据

news/2025/1/21 15:20:58/

Doris 提供以下方式从 Kafka 导入数据:

  • 使用 Routine Load 消费 Kafka 数据

Doris 通过 Routine Load 持续消费 Kafka Topic 中的数据。提交 Routine Load 作业后,Doris 会实时生成导入任务,消费 Kafka 集群中指定 Topic 的消息。Routine Load 支持 CSV 和 JSON 格式,具备 Exactly-Once 语义,确保数据不丢失且不重复。更多文档请参考 Routine Load。

  • Doris Kafka Connector 消费 Kafka 数据

Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSON、Avro、Protobuf),并支持解析 Debezium 组件的数据格式。更多文档请参考 Doris Kafka Connector。

使用 Routine Load 消费 Kafka 数据​

使用限制​

  1. 支持的消息格式为 CSV 和 JSON。CSV 每个消息为一行,且行尾不包含换行符;
  2. 默认支持 Kafka 0.10.0.0 及以上版本。若需使用旧版本(如 0.9.0,0.8.2,0.8.1,0.8.0),需修改 BE 配置,将 kafka_broker_version_fallback 设置为兼容的旧版本,或在创建 Routine Load 时设置 property.broker.version.fallback。使用旧版本可能导致部分新特性无法使用,如根据时间设置 Kafka 分区的 offset。

操作示例​

在 Doris 中通过 CREATE ROUTINE LOAD 命令创建常驻 Routine Load 导入任务,分为单表导入和多表导入。详细语法请参考 CREATE ROUTINE LOAD。

单表导入​

第 1 步:准备数据

在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-routine-load-csv --from-beginning
1,Emily,25

第 2 步:在库中创建表

在 Doris 中创建被导入的表,具体语法如下:

CREATE TABLE testdb.test_routineload_tbl(user_id            BIGINT       NOT NULL COMMENT "user id",name               VARCHAR(20)           COMMENT "name",age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至单表

在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

CREATE ROUTINE LOAD testdb.example_routine_load_csv ON test_routineload_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, name, age)
FROM KAFKA("kafka_broker_list" = "192.168.88.62:9092","kafka_topic" = "test-routine-load-csv","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

第 4 步:检查导入数据

mysql> select * from test_routineload_tbl;
+-----------+----------------+------+
| user_id   | name           | age  |
+-----------+----------------+------+
|  1        | Emily          | 25   |
+-----------+----------------+------+
1 rows in set (0.01 sec)

多表导入​

对于需要同时导入多张表的场景,Kafka 中的数据需包含表名信息。支持从 Kafka 的 Value 中获取动态表名,格式为:table_name|{"col1": "val1", "col2": "val2"}。CSV 格式类似:table_name|val1,val2,val3。注意,表名必须与 Doris 中的表名一致,否则导入失败,且动态表不支持后面介绍的 column_mapping 配置。

第 1 步:准备数据

在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-multi-table-load --from-beginning
test_multi_table_load1|1,Emily,25
test_multi_table_load2|2,Benjamin,35

第 2 步:在库中创建表

在 Doris 中创建被导入的表,具体语法如下:

表 1:

CREATE TABLE test_multi_table_load1(user_id            BIGINT       NOT NULL COMMENT "用户 ID",name               VARCHAR(20)           COMMENT "用户姓名",age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

表 2:

CREATE TABLE test_multi_table_load2(user_id            BIGINT       NOT NULL COMMENT "用户 ID",name               VARCHAR(20)           COMMENT "用户姓名",age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至多表

在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

CREATE ROUTINE LOAD example_multi_table_load
COLUMNS TERMINATED BY ","
FROM KAFKA("kafka_broker_list" = "192.168.88.62:9092","kafka_topic" = "test-multi-table-load","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

第 4 步:检查导入数据

mysql> select * from test_multi_table_load1;
+------+----------------+------+
| id   | name           | age  |
+------+----------------+------+
|  1   | Emily          | 25   |
+------+----------------+------+
1 rows in set (0.01 sec)mysql> select * from test_multi_table_load2;
+------+----------------+------+
| id   | name           | age  |
+------+----------------+------+
|  2   | Benjamin       | 35   |
+------+----------------+------+
1 rows in set (0.01 sec)

配置安全认证

有关带有认证的 Kafka 配置方法,请参见 Kafka 安全认证。

doris-kafka-connector-消费-kafka-数据">使用 Doris Kafka Connector 消费 Kafka 数据​

Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSON、Avro、Protobuf),并支持解析 Debezium 组件的数据格式。

以 Distributed 模式启动​

Distributed 模式为 Kafka Connect 提供可扩展性和自动容错功能。在此模式下,可以使用相同的 group.id 启动多个工作进程,它们会协调在所有可用工作进程中安排连接器和任务的执行。

  1. 在 $KAFKA_HOME 下创建 plugins 目录,将下载好的 doris-kafka-connector jar 包放入其中。
  2. 配置 config/connect-distributed.properties
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092# 修改 group.id,同一集群的需要一致
group.id=connect-cluster# 修改为创建的 plugins 目录
# 注意:此处请填写 Kafka 的直接路径。例如:plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins# 建议将 Kafka 的 max.poll.interval.ms 时间调大到 30 分钟以上,默认 5 分钟
# 避免 Stream Load 导入数据消费超时,消费者被踢出消费群组
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

  1. 启动:
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

  1. 消费 Kafka 数据:
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{"name":"test-doris-sink-cluster","config":{"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector","topics":"topic_test","doris.topic2table.map": "topic_test:test_kafka_tbl","buffer.count.records":"10000","buffer.flush.time":"120","buffer.size.bytes":"5000000","doris.urls":"10.10.10.1","doris.user":"root","doris.password":"","doris.http.port":"8030","doris.query.port":"9030","doris.database":"test_db","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter"}
}'

操作 Kafka Connect

# 查看 connector 状态
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
# 删除当前 connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster -X DELETE
# 暂停当前 connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
# 重启当前 connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
# 重启 connector 内的 tasks
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/tasks/0/restart -X POST

关于 Distributed 模式的介绍请参见 Distributed Workers。

消费普通数据​

  1. 导入数据样本:

在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
{"user_id":1,"name":"Emily","age":25}
{"user_id":2,"name":"Benjamin","age":35}
{"user_id":3,"name":"Olivia","age":28}
{"user_id":4,"name":"Alexander","age":60}
{"user_id":5,"name":"Ava","age":17}
{"user_id":6,"name":"William","age":69}
{"user_id":7,"name":"Sophia","age":32}
{"user_id":8,"name":"James","age":64}
{"user_id":9,"name":"Emma","age":37}
{"user_id":10,"name":"Liam","age":64}

  1. 创建需要导入的表:

在 Doris 中创建被导入的表,具体语法如下:

CREATE TABLE test_db.test_kafka_connector_tbl(user_id            BIGINT       NOT NULL COMMENT "user id",name               VARCHAR(20)           COMMENT "name",age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;

  1. 创建导入任务:

在部署 Kafka Connect 的机器上,通过 curl 命令提交如下导入任务:

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{"name":"test-doris-sink-cluster","config":{"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector","tasks.max":"10","topics":"test-data-topic","doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl","buffer.count.records":"10000","buffer.flush.time":"120","buffer.size.bytes":"5000000","doris.urls":"10.10.10.1","doris.user":"root","doris.password":"","doris.http.port":"8030","doris.query.port":"9030","doris.database":"test_db","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter"}
}'

消费 Debezium 组件采集的数据​

  1. MySQL 数据库中有如下表:
CREATE TABLE test.test_user (user_id int NOT NULL ,name varchar(20),age int,PRIMARY KEY (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;insert into test.test_user values(1,'zhangsan',20);
insert into test.test_user values(2,'lisi',21);
insert into test.test_user values(3,'wangwu',22);

  1. 在 Doris 创建被导入的表:
CREATE TABLE test_db.test_user(user_id            BIGINT       NOT NULL COMMENT "user id",name               VARCHAR(20)           COMMENT "name",age                INT                   COMMENT "age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;

  1. 部署 Debezium connector for MySQL 组件,参考:Debezium connector for MySQL。

  2. 创建 doris-kafka-connector 导入任务:

假设通过 Debezium 采集到的 MySQL 表数据在 mysql_debezium.test.test_user Topic 中:

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{"name":"test-debezium-doris-sink","config":{"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector","tasks.max":"10","topics":"mysql_debezium.test.test_user","doris.topic2table.map": "mysql_debezium.test.test_user:test_user","buffer.count.records":"10000","buffer.flush.time":"120","buffer.size.bytes":"5000000","doris.urls":"10.10.10.1","doris.user":"root","doris.password":"","doris.http.port":"8030","doris.query.port":"9030","doris.database":"test_db","converter.mode":"debezium_ingestion","enable.delete":"true","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter"}
}'

消费 AVRO 序列化格式数据​

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ "name":"doris-avro-test", "config":{ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", "topics":"avro_topic", "tasks.max":"10","doris.topic2table.map": "avro_topic:avro_tab", "buffer.count.records":"100000", "buffer.flush.time":"120", "buffer.size.bytes":"10000000", "doris.urls":"10.10.10.1", "doris.user":"root", "doris.password":"", "doris.http.port":"8030", "doris.query.port":"9030", "doris.database":"test", "load.model":"stream_load","key.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://127.0.0.1:8081","value.converter":"io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url":"http://127.0.0.1:8081"} 
}'

消费 Protobuf 序列化格式数据​

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ "name":"doris-protobuf-test", "config":{ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", "topics":"proto_topic", "tasks.max":"10","doris.topic2table.map": "proto_topic:proto_tab", "buffer.count.records":"100000", "buffer.flush.time":"120", "buffer.size.bytes":"10000000", "doris.urls":"10.10.10.1", "doris.user":"root", "doris.password":"", "doris.http.port":"8030", "doris.query.port":"9030", "doris.database":"test", "load.model":"stream_load","key.converter":"io.confluent.connect.protobuf.ProtobufConverter","key.converter.schema.registry.url":"http://127.0.0.1:8081","value.converter":"io.confluent.connect.protobuf.ProtobufConverter","value.converter.schema.registry.url":"http://127.0.0.1:8081"} 
}'


http://www.ppmy.cn/news/1564984.html

相关文章

matlab函数的主要目的是对包含在 Excel 电子表格中的实验数据进行模型拟合

function Latex_Fitting_Sample_Code% clear screen and all variablesclc; clear;% 包含恒定通量横流结垢实验数据的 Excel 电子表格filename = Experimental Data.xlsx;% 包含模型拟合数据的 Excel 电子表格filename2 = Model Fit.xlsx; % 下面的循环采用不规则滤饼模型拟合 …

3.3 OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南

OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南 OpenAI 的 GPT 系列语言模型,包括 GPT-4、GPT-3.5 和 GPT-3,已经成为自然语言处理领域的标杆。无论是文本生成、对话系统,还是自动化任务,开发者都可以通过 API 调用这些强大的模型来增强他们的应用。本文将为您详细介…

Golang学习笔记_28——工厂方法模式

Golang学习笔记_25——协程 Golang学习笔记_26——通道 Golang学习笔记_27——单例模式 文章目录 工厂方法模式1. 介绍2. 优点3. 类图4. 实现 源码 工厂方法模式 1. 介绍 工厂方法模式(Factory Method)是一种创建型设计模式,它提供了一种创…

Node.js 与 JavaScript 是什么关系

JavaScript 是一种编程语言,而 Node.js 是 JavaScript 的一个运行环境,它们在不同的环境中使用,具有一些共同的语言基础,但也有各自独特的 API 和模块,共同推动着 JavaScript 在前后端开发中的广泛应用。 一、基础语言…

【Linux】15.Linux进程概念(4)

文章目录 程序地址空间前景回顾C语言空间布局图:代码1代码2代码3代码4代码5代码6代码7 程序地址空间前景回顾 历史核心问题: pid_t id fork(); if(id 0) else if(id>0) 为什么一个id可以放两个值呢?之前没有仔细讲。 C语言空间布局图&am…

北京市房屋建筑物轮廓shp数据arcgis高度字段内容下载分析

标题中的“北京市房屋建筑物轮廓shp数据arcgis高度字段”涉及到的是地理信息系统(GIS)中的数据格式和属性字段。在GIS领域,SHP(Shapefile)是一种常见的矢量数据格式,用于存储地理空间特征,如点、…

【PCIe 总线及设备入门学习专栏 6.1 -- PCIe MCTP】

文章目录 1 什么是 MCTP?2 MCTP 消息在 PCIe 中的传输特点3 PCIe MCTP 的局限性(1) 出站(Outbound)MCTP 消息分解的限制(2) 入站(Inbound)MCTP 消息组装的限制4 MCTP 消息的实际使用流程发送端处理流程接收端处理流程5 实际使用场景例 1:管理命令传输例 2:监控数据报告例…

实施工程师:面试基础宝典

一.运维工程师和实施工程师的区别:工作内容不同、职能不同、工作形式不同 1.工作内容不同: 运维工程师要对公司硬件和软件进行维护。 硬件包括:机房、机柜、网线光纤、PDU、服 务器、网络设备、安全设备等。 实施工程师包括常用操作系统、应…