2. Doris数据导入与导出

embedded/2025/1/14 18:16:50/

一. Doris数据导入

导入方式使用场景支持的文件格式导入模式
Stream Load导入本地文件或者应用程序写入csv、json、parquet、orc同步
Broker Load从对象存储、HDFS等导入csv、json、parquet、orc异步
Routine Load从kakfa实时导入csv、json异步

1. Stream Load

基本原理

在使用 Stream Load 时,需要通过 HTTP 协议发起导入作业给 FE 节点,FE 会以轮询方式,重定向(redirect)请求给一个 BE 节点以达到负载均衡的效果。也可以直接发送 HTTP 请求作业给指定的 BE 节点。在 Stream Load 中,Doris 会选定一个节点作为 Coordinator 节点。Coordinator 节点负责接受数据并分发数据到其他节点上。

下图展示了 Stream Load 的主要流程:
在这里插入图片描述

  1. Client 向 FE 提交 Stream Load 导入作业请求。
  2. FE 会轮询选择一台 BE 作为 Coordinator 节点,负责导入作业调度,然后返回给 Client 一个 HTTP 重定向。
  3. Client 连接 Coordinator BE 节点,提交导入请求。
  4. Coordinator BE 会分发数据给相应 BE 节点,导入完成后会返回导入结果给 Client。
  5. Client 也可以直接通过指定 BE 节点作为 Coordinator,直接分发导入作业。
数据导入
  1. 本次导入文件为CSV格式,共9个字段,文件部分内容如下:

    [root@hadoop3 dns_data]# head -5 input.csv
    85.0.144.47,V2-vOd.kwAiCDn.cOM.,20220729005737,106.120.158.110,0,1,v2-vod.kwaicdn.com.w.cdngslb.cOM.,"",123.59.182.42
    111.0.40.49,apPle.COm.,20220729005737,17.253.144.10,0,1,"","",123.59.182.42
    211.0.172.212,SzMINORSHORT.WEIxin.Qq.com.,20220729005737,157.148.59.242,0,1,"","",123.59.182.42
    111.0.68.81,WWW.BILIbIlI.CoM.,20220729005737,61.156.196.6,0,1,a.w.bilicdn1.CoM.,"",123.59.182.42
    211.0.21.16,www.wAsU.cn.,20220729005737,103.15.99.89,0,1,www.wasu.cn.w.kunlunpi.com.,"",123.59.182.42
    
  2. 在Doris创建目标表

    mysql> use testdb;
    Database changed
    mysql> create table dns_data(client_ip varchar(1000),domain varchar(1000),time varchar(1000),target_ip varchar(1000),rcode varchar(1000),query_type varchar(1000),authority_record varchar(10000),add_msg varchar(1000),dns_ip varchar(1000)) DUPLICATE KEY(client_ip, domain, time, target_ip)DISTRIBUTED BY HASH(client_ip) BUCKETS 20;
    
  3. 启动导入作业

    # 本次导入的数据量为500w
    [root@hadoop3 dns_data]# wc -l sample.csv
    5000000 sample.csv[root@hadoop3 dns_data]# curl --location-trusted -u admin:admin123 \-H "Expect:100-continue" \-H "column_separator:," \-H "columns:client_ip,domain,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip" \-T sample.csv \-XPUT http://10.0.49.2:8050/api/testdb/dns_data/_stream_load
    
  4. 查看导入结果

    mysql> select count(*) from dns_data;
    +----------+
    | count(*) |
    +----------+
    |  5000000 |
    +----------+
    1 row in set (0.32 sec)
    

2. Broker Load

基本原理

用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。

BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。

在这里插入图片描述
从上图中可以看到,BE 会依赖 Broker 进程来读取相应远程存储系统的数据。之所以引入 Broker 进程,主要是用来针对不同的远程存储系统,用户可以按照 Broker 进程的标准开发其相应的 Broker 进程,Broker 进程可以使用 Java 程序开发,更好的兼容大数据生态中的各类存储系统。由于 broker 进程和 BE 进程的分离,也确保了两个进程的错误隔离,提升 BE 的稳定性。

数据导入
  1. 清理dns_data表的数据,并将要导入的文件上传到HDFS

    mysql> truncate table dns_data;
    Query OK, 0 rows affected (4.32 sec)
    # 本次待导入的数据共计1.2亿
    [root@hadoop3 dns_data]# wc -l input.csv
    121936657 input.csv[root@hadoop3 dns_data]# hdfs dfs -put input.csv /test
    
  2. 启动导入作业

    mysql> LOAD LABEL hdfs_load_2025_01_13(DATA INFILE("hdfs://10.0.49.4:9000/test/input.csv")INTO TABLE dns_dataCOLUMNS TERMINATED BY ","FORMAT AS "CSV"(client_ip,domain,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip))with HDFS("fs.defaultFS" = "hdfs://10.0.49.4:9000","hadoop.username" = "root")PROPERTIES("timeout" = "3600");
    
  3. 查看导入任务

    mysql> show load;
    +-------+----------------------+---------+-------------+--------+---------+------------------------------------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+----------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+--------------+-------+---------+
    | JobId | Label                | State   | Progress    | Type   | EtlInfo | TaskInfo                                                                     | ErrorMsg | CreateTime          | EtlStartTime        | EtlFinishTime       | LoadStartTime       | LoadFinishTime | URL  | JobDetails                                                                                                                                                                                                                                                | TransactionId | ErrorTablets | User  | Comment |
    +-------+----------------------+---------+-------------+--------+---------+------------------------------------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+----------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+--------------+-------+---------+
    | 16265 | hdfs_load_2025_01_13 | LOADING | 0.00% (0/3) | BROKER | NULL    | cluster:hdfs_cluster; timeout(s):3600; max_filter_ratio:0.0; priority:NORMAL | NULL     | 2025-01-13 07:46:53 | 2025-01-13 07:47:20 | 2025-01-13 07:47:20 | 2025-01-13 07:47:20 | NULL           | NULL | {"Unfinished backends":{"58e66f688844d74-afb60b4ad54e4987":[10040,10059,10078]},"ScannedRows":3255264,"TaskNumber":1,"LoadBytes":546907629,"All backends":{"58e66f688844d74-afb60b4ad54e4987":[10040,10059,10078]},"FileNumber":1,"FileSize":16123485004} | 2290          | {}           | admin |         |
    +-------+----------------------+---------+-------------+--------+---------+------------------------------------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+----------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+--------------+-------+---------+
    1 row in set (0.12 sec)
    
  4. 查看导入结果

    mysql> select count(*) from dns_data;
    +----------+
    | count(*) |
    +----------+
    |121936657 |
    +----------+
    1 row in set (0.04 sec)
    

3. Routine Load

基本原理

Routine Load 会持续消费 Kafka Topic 中的数据,写入 Doris 中。
在 Doris 中,创建 Routine Load 作业后会生成一个常驻的导入作业,包括若干个导入任务:

  • 导入作业(load job):一个 Routine Load Job 是一个常驻的导入作业,会持续不断地消费数据源中的数据。
  • 导入任务(load task):一个导入作业会被拆解成若干个导入任务进行实际消费,每个任务都是一个独立的事务。

Routine Load 的导入具体流程如下图展示:
在这里插入图片描述

  1. Client 向 FE 提交创建 Routine Load 作业请求,FE 通过 Routine Load Manager 生成一个常驻的导入作业(Routine Load Job)。

  2. FE 通过Job Scheduler 将Routine Load Job 拆分成若干个 Routine Load Task,由 Task Scheduler 进行调度,下发到 BE 节点。

  3. 在 BE 上,一个 Routine Load Task 导入完成后向 FE 提交事务,并更新 Job 的元数据。

  4. 一个 Routine Load Task 提交后,会继续生成新的 Task,或对超时的 Task 进行重试。

  5. 新生成的 Routine Load Task 由 Task Scheduler 继续调度,不断循环。

数据导入
  1. 清理dns_data表的数据

    mysql> truncate table dns_data;
    Query OK, 0 rows affected (4.32 sec)
    
  2. 在kafka中创建topic

    [root@hadoop1 kafka-3.6.0]# bin/kafka-topics.sh --create --bootstrap-server 10.0.49.4:9092 --replication-factor 3 --partitions 3 --topic test
    
  3. 启动导入作业

    mysql> CREATE ROUTINE LOAD testdb.example_routine_load_csv ON dns_data
    COLUMNS TERMINATED BY ",",
    COLUMNS(client_ip, domain, time, target_ip, rcode, query_type,authority_record,add_msg,dns_ip)
    FROM KAFKA("kafka_broker_list" = "10.0.49.4:9092","kafka_topic" = "test","property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    
  4. 向kafka topic中注入测试数据(自己写程序实现,很简单)

    # 导入数据500w
    [root@hadoop3 dns_data]# wc -l sample.csv
    5000000 sample.csv# 调用程序向topic test写入测试数据
    [root@hadoop3 dns_data]# java -jar mock_data.jar 10.0.49.2:9092,10.0.49.3:9092,10.0.49.4:9092 test  sample.csv
    
  5. 查看导入任务

    mysql> show routine load\G;
    *************************** 1. row ***************************Id: 16361Name: example_routine_load_csvCreateTime: 2025-01-13 08:17:23PauseTime: NULLEndTime: NULLDbName: default_cluster:testdbTableName: dns_dataIsMultiTable: falseState: RUNNINGDataSourceType: KAFKACurrentTaskNum: 3JobProperties: {"max_batch_rows":"200000","timezone":"Europe/London","send_batch_parallelism":"1","load_to_single_tablet":"false","column_separator":"','","line_delimiter":"\n","current_concurrent_number":"3","delete":"*","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"client_ip,domain,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"csv","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
    DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.49.4:9092"}CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"example_routine_load_csv_df6ba034-d178-4d05-bfb1-ad3635da7231"}Statistic: {"receivedBytes":0,"runningTxns":[2339,2340,2341],"errorRows":0,"committedTaskNum":3,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":30309}Progress: {"0":"OFFSET_BEGINNING","1":"OFFSET_BEGINNING","2":"OFFSET_BEGINNING"}Lag: {"0":2,"1":2,"2":2}
    ReasonOfStateChanged:ErrorLogUrls:OtherMsg:User: adminComment:
    1 row in set (0.07 sec)ERROR:
    No query specified
    
  6. 查看导入结果

    mysql> select count(*) from dns_data;
    +----------+
    | count(*) |
    +----------+
    |  5000000 |
    +----------+
    1 row in set (0.04 sec)
    

二. Doris数据导出

SELECT INTO OUTFILEEXPORT
同步/异步同步异步(提交 EXPORT 任务后通过 SHOW EXPORT 命令查看任务进度)
支持任意 SQL支持不支持
导出指定分区支持支持
导出指定 Tablets支持不支持
并发导出支持且并发高(但取决于 SQL 语句是否有 ORDER BY 等需要单机处理的算子)支持且并发高(支持 Tablet 粒度的并发导出)
支持导出的数据格式Parquet、ORC、CSVParquet、ORC、CSV
是否支持导出外表支持部分支持
是否支持导出 View支持支持
支持的导出位置S3、HDFSS3、HDFS

SELECT INTO OUTFILE
适用于以下场景:

  • 导出数据需要经过复杂计算逻辑的,如过滤、聚合、关联等。
  • 适合执行同步任务的场景。

EXPORT
适用于以下场景:

  • 大数据量的单表导出、仅需简单的过滤条件。
  • 需要异步提交任务的场景。

1. SELECT INTO OUTFILE

  1. 查看待导出表的数据

    mysql> select count(*) from dns_data;
    +----------+
    | count(*) |
    +----------+
    |  5000000 |
    +----------+
    1 row in set (0.04 sec)
    
  2. 启动导出作业

    mysql> SELECT * FROM dns_data
    INTO OUTFILE "hdfs://10.0.49.4:9000/doris/result_"
    FORMAT AS CSV
    PROPERTIES
    ("fs.defaultFS" = "hdfs://10.0.49.4:9000","hadoop.username" = "root","column_separator" = ","
    );+------------+-----------+-----------+-----------------------------------------------------------------------+
    | FileNumber | TotalRows | FileSize  | URL                                                                   |
    +------------+-----------+-----------+-----------------------------------------------------------------------+
    |          1 |   5000000 | 660546528 | hdfs://10.0.49.4:9000/doris/result_c1f1268d26b547f6-a906f9570e7bfa04_ |
    +------------+-----------+-----------+-----------------------------------------------------------------------+
    1 row in set (23.04 sec)
  3. 查看导出结果

    [root@hadoop3 dns_data]# hdfs dfs -get /doris/result_c1f1268d26b547f6-a906f9570e7bfa04_0.csv  a.csv
    [root@hadoop3 dns_data]# wc -l a.csv
    5000000 a.csv
    

2. EXPORT

  1. 查看待导出表的数据

    mysql> select count(*) from dns_data;
    +----------+
    | count(*) |
    +----------+
    |  5000000 |
    +----------+
    1 row in set (0.04 sec)
    
  2. 启动导出作业

    mysql> EXPORT TABLE dns_data
    TO "hdfs://10.0.49.4:9000/doris/export_" 
    PROPERTIES
    ("line_delimiter" = "\n","column_separator" = ","
    )
    with HDFS ("fs.defaultFS"="hdfs://10.0.49.4:9000","hadoop.username" = "root");
    
  3. 查看导出作业

    mysql> show export \G;
    *************************** 1. row ***************************JobId: 15099Label: export_39314fdb-3962-40a5-aefa-463a56b33675State: CANCELLEDProgress: 0%TaskInfo: {"partitions":["*"],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","broker":"HDFS","column_separator":"\t","line_delimiter":",","db":"default_cluster:testdb","tbl":"tbl","tablet_num":0}Path: hdfs://doris/export/export_CreateTime: 2025-01-12 15:44:57StartTime: 2025-01-12 15:45:21FinishTime: 2025-01-12 15:45:51Timeout: 7200ErrorMsg: type:RUN_FAIL; msg:errCode = 2, detailMessage = (10.0.49.2)[INTERNAL_ERROR]create dir failed. (BE: 10.0.49.2) namenode: hdfs://hadoop1:9000 path: hdfs://doris/export, err: (22), Invalid argument), reason: IllegalArgumentException: Wrong FS: hdfs://doris/export, expected: hdfs://hadoop1:9000
    OutfileInfo: NULL
    *************************** 2. row ***************************JobId: 15100Label: export_7e593913-3369-4138-91b1-b219d011e55cState: CANCELLEDProgress: 0%TaskInfo: {"partitions":["*"],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","broker":"HDFS","column_separator":"\t","line_delimiter":",","db":"default_cluster:testdb","tbl":"tbl","tablet_num":0}Path: hdfs://doris/export/export_CreateTime: 2025-01-12 15:49:12StartTime: 2025-01-12 15:49:21FinishTime: 2025-01-12 15:49:49Timeout: 7200ErrorMsg: type:RUN_FAIL; msg:errCode = 2, detailMessage = (10.0.49.4)[INTERNAL_ERROR]create dir failed. (BE: 10.0.49.4) namenode: hdfs://10.0.49.4:9000 path: hdfs://doris/export, err: (22), Invalid argument), reason: IllegalArgumentException: Wrong FS: hdfs://doris/export, expected: hdfs://10.0.49.4:9000
    OutfileInfo: NULL
    *************************** 3. row ***************************
    ...
    
  4. 查看导出结果

    [root@hadoop3 dns_data]# hdfs dfs -ls /doris
    Found 1 items
    -rw-r--r--   3 root supergroup  660546528 2025-01-13 08:47 /doris/export_27d22e086e5e4ba8-b7d27eb2428844e2_0.csv
    [root@hadoop3 dns_data]# hdfs dfs -get /doris/export_27d22e086e5e4ba8-b7d27eb2428844e2_0.csv a.csv
    [root@hadoop3 dns_data]# wc -l a.csv
    5000000 a.csv
    

http://www.ppmy.cn/embedded/153902.html

相关文章

英语互助小程序springboot+论文源码调试讲解

第2章 开发环境与技术 英语互助小程序的编码实现需要搭建一定的环境和使用相应的技术,接下来的内容就是对英语互助小程序用到的技术和工具进行介绍。 2.1 MYSQL数据库 本课题所开发的应用程序在数据操作方面是不可预知的,是经常变动的,没有…

No. 31 笔记 | Web安全-SQL手工注入技术学习 Part 2

一、研究背景 背景介绍 SQL注入是一种常见且高危的Web安全漏洞。攻击者可以通过构造恶意SQL查询语句来绕过验证机制,执行未授权操作,如获取敏感信息、篡改数据库内容甚至控制服务器。 研究内容 本笔记探讨以下数据库的手工注入技术: MySQLAc…

PostgreSQL 超级管理员详解

1. 什么是 PostgreSQL 超级管理员 PostgreSQL 超级管理员(superuser)是拥有数据库系统最高权限的用户。他们可以执行任何数据库操作,包括但不限于创建和删除数据库、用户、表空间、模式等。超级管理员权限是 PostgreSQL 中权限的最高级别。 …

实现Windows云服务器文件共享

实现Windows云服务器文件共享 操作前提确认服务器配置修改网络和共享中心配置文件共享 操作前提 需要实现文件共享的两台云服务器都在同一子网下,且网络互通 确认服务器配置 确保“Tcp/IP NetBIOS Helper”服务状态为“已启动”: 右键“win”键&#…

基于Android的嵌入式车载导航系统(源码+lw+部署文档+讲解),源码可白嫖!

摘要 嵌入式车载导航系统设计的目的是为用户提供导航公告、地图,进行导航的一个导航系统APP。 与PC端应用程序相比,嵌入式车载导航系统的设计主要面向于广大用户,旨在为用户提供一个嵌入式车载导航系统平台。用户可以通过APP查看导航公告&am…

Qt天气预报系统获取天气数据

Qt天气预报系统获取天气数据 1、获取天气数据1.1添加天气类头文件1.2定义今天和未来几天天气数据类1.3定义一个解析JSON数据的函数1.4在mainwindow中添加weatherData.h1.5创建今天天气数据和未来几天天气数据对象1.6添加parseJson定义1.7把解析JSON数据添加进去1.8添加错误1.9解…

maven 项目怎么指定打包后名字

在 Spring Boot 的 Maven 项目中,你可以通过配置 pom.xml 文件来指定打包后的文件名。具体步骤如下: 打开 pom.xml 文件:找到你的项目根目录下的 pom.xml 文件。 配置 finalName 属性:在 标签下,添加 属性来指定打包后…

系统看门狗配置--以ubuntu为例

linux系统配置看门狗 以 ubuntu 系统配置看门狗为例 配置看门狗使用的脚本文件,需要使用管理员权限来执行: 配置是:系统每 30S 喂一次狗,超过 60S 不进行投喂,就会自动重启。 1. 系统脚本内容: #!/bin/b…