doris:Kafka 导入数据

server/2025/1/21 14:39:27/

Doris 提供以下方式从 Kafka 导入数据:

  • 使用 Routine Load 消费 Kafka 数据

Doris 通过 Routine Load 持续消费 Kafka Topic 中的数据。提交 Routine Load 作业后,Doris 会实时生成导入任务,消费 Kafka 集群中指定 Topic 的消息。Routine Load 支持 CSV 和 JSON 格式,具备 Exactly-Once 语义,确保数据不丢失且不重复。更多文档请参考 Routine Load。

  • Doris Kafka Connector 消费 Kafka 数据

Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSON、Avro、Protobuf),并支持解析 Debezium 组件的数据格式。更多文档请参考 Doris Kafka Connector。

使用 Routine Load 消费 Kafka 数据​

使用限制​

  1. 支持的消息格式为 CSV 和 JSON。CSV 每个消息为一行,且行尾不包含换行符;
  2. 默认支持 Kafka 0.10.0.0 及以上版本。若需使用旧版本(如 0.9.0,0.8.2,0.8.1,0.8.0),需修改 BE 配置,将 kafka_broker_version_fallback 设置为兼容的旧版本,或在创建 Routine Load 时设置 property.broker.version.fallback。使用旧版本可能导致部分新特性无法使用,如根据时间设置 Kafka 分区的 offset。

操作示例​

在 Doris 中通过 CREATE ROUTINE LOAD 命令创建常驻 Routine Load 导入任务,分为单表导入和多表导入。详细语法请参考 CREATE ROUTINE LOAD。

单表导入​

第 1 步:准备数据

在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-routine-load-csv --from-beginning
1,Emily,25

第 2 步:在库中创建表

在 Doris 中创建被导入的表,具体语法如下:

CREATE TABLE testdb.test_routineload_tbl(user_id            BIGINT       NOT NULL COMMENT "user id",name               VARCHAR(20)           COMMENT "name",age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至单表

在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

CREATE ROUTINE LOAD testdb.example_routine_load_csv ON test_routineload_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, name, age)
FROM KAFKA("kafka_broker_list" = "192.168.88.62:9092","kafka_topic" = "test-routine-load-csv","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

第 4 步:检查导入数据

mysql> select * from test_routineload_tbl;
+-----------+----------------+------+
| user_id   | name           | age  |
+-----------+----------------+------+
|  1        | Emily          | 25   |
+-----------+----------------+------+
1 rows in set (0.01 sec)

多表导入​

对于需要同时导入多张表的场景,Kafka 中的数据需包含表名信息。支持从 Kafka 的 Value 中获取动态表名,格式为:table_name|{"col1": "val1", "col2": "val2"}。CSV 格式类似:table_name|val1,val2,val3。注意,表名必须与 Doris 中的表名一致,否则导入失败,且动态表不支持后面介绍的 column_mapping 配置。

第 1 步:准备数据

在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-multi-table-load --from-beginning
test_multi_table_load1|1,Emily,25
test_multi_table_load2|2,Benjamin,35

第 2 步:在库中创建表

在 Doris 中创建被导入的表,具体语法如下:

表 1:

CREATE TABLE test_multi_table_load1(user_id            BIGINT       NOT NULL COMMENT "用户 ID",name               VARCHAR(20)           COMMENT "用户姓名",age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

表 2:

CREATE TABLE test_multi_table_load2(user_id            BIGINT       NOT NULL COMMENT "用户 ID",name               VARCHAR(20)           COMMENT "用户姓名",age                INT                   COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

第 3 步:创建 Routine Load job 导入数据至多表

在 Doris 中,使用 CREATE ROUTINE LOAD 命令创建导入作业:

CREATE ROUTINE LOAD example_multi_table_load
COLUMNS TERMINATED BY ","
FROM KAFKA("kafka_broker_list" = "192.168.88.62:9092","kafka_topic" = "test-multi-table-load","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

第 4 步:检查导入数据

mysql> select * from test_multi_table_load1;
+------+----------------+------+
| id   | name           | age  |
+------+----------------+------+
|  1   | Emily          | 25   |
+------+----------------+------+
1 rows in set (0.01 sec)mysql> select * from test_multi_table_load2;
+------+----------------+------+
| id   | name           | age  |
+------+----------------+------+
|  2   | Benjamin       | 35   |
+------+----------------+------+
1 rows in set (0.01 sec)

配置安全认证

有关带有认证的 Kafka 配置方法,请参见 Kafka 安全认证。

doris-kafka-connector-消费-kafka-数据">使用 Doris Kafka Connector 消费 Kafka 数据​

Doris Kafka Connector 是将 Kafka 数据流导入 Doris 数据库的工具。用户可通过 Kafka Connect 插件轻松导入多种序列化格式(如 JSON、Avro、Protobuf),并支持解析 Debezium 组件的数据格式。

以 Distributed 模式启动​

Distributed 模式为 Kafka Connect 提供可扩展性和自动容错功能。在此模式下,可以使用相同的 group.id 启动多个工作进程,它们会协调在所有可用工作进程中安排连接器和任务的执行。

  1. 在 $KAFKA_HOME 下创建 plugins 目录,将下载好的 doris-kafka-connector jar 包放入其中。
  2. 配置 config/connect-distributed.properties
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092# 修改 group.id,同一集群的需要一致
group.id=connect-cluster# 修改为创建的 plugins 目录
# 注意:此处请填写 Kafka 的直接路径。例如:plugin.path=/opt/kafka/plugins
plugin.path=$KAFKA_HOME/plugins# 建议将 Kafka 的 max.poll.interval.ms 时间调大到 30 分钟以上,默认 5 分钟
# 避免 Stream Load 导入数据消费超时,消费者被踢出消费群组
max.poll.interval.ms=1800000
consumer.max.poll.interval.ms=1800000

  1. 启动:
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

  1. 消费 Kafka 数据:
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{"name":"test-doris-sink-cluster","config":{"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector","topics":"topic_test","doris.topic2table.map": "topic_test:test_kafka_tbl","buffer.count.records":"10000","buffer.flush.time":"120","buffer.size.bytes":"5000000","doris.urls":"10.10.10.1","doris.user":"root","doris.password":"","doris.http.port":"8030","doris.query.port":"9030","doris.database":"test_db","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter"}
}'

操作 Kafka Connect

# 查看 connector 状态
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/status -X GET
# 删除当前 connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster -X DELETE
# 暂停当前 connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/pause -X PUT
# 重启当前 connector
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/resume -X PUT
# 重启 connector 内的 tasks
curl -i http://127.0.0.1:8083/connectors/test-doris-sink-cluster/tasks/0/restart -X POST

关于 Distributed 模式的介绍请参见 Distributed Workers。

消费普通数据​

  1. 导入数据样本:

在 Kafka 中,样本数据如下:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-data-topic --from-beginning
{"user_id":1,"name":"Emily","age":25}
{"user_id":2,"name":"Benjamin","age":35}
{"user_id":3,"name":"Olivia","age":28}
{"user_id":4,"name":"Alexander","age":60}
{"user_id":5,"name":"Ava","age":17}
{"user_id":6,"name":"William","age":69}
{"user_id":7,"name":"Sophia","age":32}
{"user_id":8,"name":"James","age":64}
{"user_id":9,"name":"Emma","age":37}
{"user_id":10,"name":"Liam","age":64}

  1. 创建需要导入的表:

在 Doris 中创建被导入的表,具体语法如下:

CREATE TABLE test_db.test_kafka_connector_tbl(user_id            BIGINT       NOT NULL COMMENT "user id",name               VARCHAR(20)           COMMENT "name",age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;

  1. 创建导入任务:

在部署 Kafka Connect 的机器上,通过 curl 命令提交如下导入任务:

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{"name":"test-doris-sink-cluster","config":{"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector","tasks.max":"10","topics":"test-data-topic","doris.topic2table.map": "test-data-topic:test_kafka_connector_tbl","buffer.count.records":"10000","buffer.flush.time":"120","buffer.size.bytes":"5000000","doris.urls":"10.10.10.1","doris.user":"root","doris.password":"","doris.http.port":"8030","doris.query.port":"9030","doris.database":"test_db","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter"}
}'

消费 Debezium 组件采集的数据​

  1. MySQL 数据库中有如下表:
CREATE TABLE test.test_user (user_id int NOT NULL ,name varchar(20),age int,PRIMARY KEY (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;insert into test.test_user values(1,'zhangsan',20);
insert into test.test_user values(2,'lisi',21);
insert into test.test_user values(3,'wangwu',22);

  1. 在 Doris 创建被导入的表:
CREATE TABLE test_db.test_user(user_id            BIGINT       NOT NULL COMMENT "user id",name               VARCHAR(20)           COMMENT "name",age                INT                   COMMENT "age"
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 12;

  1. 部署 Debezium connector for MySQL 组件,参考:Debezium connector for MySQL。

  2. 创建 doris-kafka-connector 导入任务:

假设通过 Debezium 采集到的 MySQL 表数据在 mysql_debezium.test.test_user Topic 中:

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{"name":"test-debezium-doris-sink","config":{"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector","tasks.max":"10","topics":"mysql_debezium.test.test_user","doris.topic2table.map": "mysql_debezium.test.test_user:test_user","buffer.count.records":"10000","buffer.flush.time":"120","buffer.size.bytes":"5000000","doris.urls":"10.10.10.1","doris.user":"root","doris.password":"","doris.http.port":"8030","doris.query.port":"9030","doris.database":"test_db","converter.mode":"debezium_ingestion","enable.delete":"true","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter"}
}'

消费 AVRO 序列化格式数据​

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ "name":"doris-avro-test", "config":{ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", "topics":"avro_topic", "tasks.max":"10","doris.topic2table.map": "avro_topic:avro_tab", "buffer.count.records":"100000", "buffer.flush.time":"120", "buffer.size.bytes":"10000000", "doris.urls":"10.10.10.1", "doris.user":"root", "doris.password":"", "doris.http.port":"8030", "doris.query.port":"9030", "doris.database":"test", "load.model":"stream_load","key.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://127.0.0.1:8081","value.converter":"io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url":"http://127.0.0.1:8081"} 
}'

消费 Protobuf 序列化格式数据​

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{ "name":"doris-protobuf-test", "config":{ "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector", "topics":"proto_topic", "tasks.max":"10","doris.topic2table.map": "proto_topic:proto_tab", "buffer.count.records":"100000", "buffer.flush.time":"120", "buffer.size.bytes":"10000000", "doris.urls":"10.10.10.1", "doris.user":"root", "doris.password":"", "doris.http.port":"8030", "doris.query.port":"9030", "doris.database":"test", "load.model":"stream_load","key.converter":"io.confluent.connect.protobuf.ProtobufConverter","key.converter.schema.registry.url":"http://127.0.0.1:8081","value.converter":"io.confluent.connect.protobuf.ProtobufConverter","value.converter.schema.registry.url":"http://127.0.0.1:8081"} 
}'


http://www.ppmy.cn/server/160200.html

相关文章

Web前端开发技术之HTMLCSS知识点总结

学习路线 一、新闻网界面1. 代码示例2. 效果展示3. 知识点总结3.1 HTML标签和字符实体3.2 超链接、颜色描述与标题元素3.3 关于图片和视频标签:3.4 CSS引入方式3.5 CSS选择器优先级 二、flex布局1. 代码示例2. 效果展示3. 知识点总结3.1 span标签和flex容器的区别3.…

react中的hook

在 React 中,Hooks 是一种在函数组件中使用状态和其他 React 特性(如生命周期方法)的新方式。它们在 React 16.8 中被引入,并且极大简化了组件的状态管理和副作用处理。 常见的 React Hook useStateuseEffectuseContextuseReduc…

电气防火保护器为高校学生宿舍提供安全保障

摘 要:3月2日,清华大学紫荆学生公寓发生火情,无人员伤亡。推断起火原因系中厅内通电电器发生故障引燃周边可燃物所致。2月27日,贵州某高校女生宿舍发生火灾,现场明火得到有效控制,无人员受伤。2月19日&…

得物App亮相第七届进博会,科技赋能打造消费新热点

在2024年11月5日至11月10日举办的第七届进博会舞台上,上海交易团虹口分团表现亮眼,其中得物作为来自虹口品质电商的践行者,备受众多参观者关注。 上海得物信息集团有限公司自2015年于上海虹口创立以来,始终坚守“满足年轻人对美好…

第4章 Kafka核心API——Kafka客户端操作

Kafka客户端操作 一. 客户端操作1. AdminClient API 一. 客户端操作 1. AdminClient API

JAVA-Exploit编写(6)--http-request库文件上传使用

目录 1.http-request简介 2. 依赖导入 3.文件上传页面代码 4. http-request文件上传简单使用 5.请求https的网站解决SSL证书的问题 5. 1 直接请求带https域名的网站 5.2 信任所有证书 1.http-request简介 http-request 是一个库 里面提供很多方法,使得很容易就…

QT+VS2022 应用程序无法启动0x000007b问题记录

不知道怎么搞的出现了这个问题。记录一下我做的工作; 1.检查环境变量 我安装了两个版本的Qt,环境变量的顺序很重要,要把程序使用的那个版本靠前放; 可以参考: 彻底解决Qt报错:无法定位程序输入点于动态…

Linux -- HTTP 请求 与 响应 报文

目录 请求报文: 请求方法 响应报文: 状态码 与 状态码描述 共性 常见的报头 请求报文: 请求方法 方法说明GET获取资源POST传输实体主体PUT传输文件HEAD获得报文首部DELETE删除文件OPTIONS询问支持的方法TRACE追踪路径CONNECT要求用…