使用Apache SeaTunnel高效集成和管理SftpFile数据源

ops/2024/9/23 3:54:28/

file

本文为Apache SeaTunnel已经支持的SftpFile Source Connector使用文档,旨在帮助读者理解如何高效地使用SFTP文件源连接器,以便轻松地使用Apache SeaTunnel集成和管理您的SftpFil数据源。

SftpFile 是指通过 SFTP(Secure File Transfer Protocol)协议进行文件操作的对象或组件。在网络编程和数据集成中,SFTPFile 通常用来表示和操作存储在远程 SFTP 服务器上的文件。SFTP 是一种安全的文件传输协议,基于 SSH(Secure Shell)协议,提供了加密的文件传输和远程文件操作功能。

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

  • 批处理
  • 列投影
  • 并行处理
  • 文件格式类型
    • 文本
    • CSV
    • JSON
    • Excel

描述

从 SFTP 文件服务器读取数据。

支持的数据源信息

使用 SftpFile 连接器,需要以下依赖项。可以通过 install-plugin.sh 下载,也可以从 Maven 中央仓库获取。

数据源支持的版本依赖项
SftpFile通用下载
  • 提示

如果你使用的是 Spark/Flink,请确保 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。

如果使用 SeaTunnel 引擎,安装 SeaTunnel 引擎时会自动集成 Hadoop JAR 包。可以在 ${SEATUNNEL_HOME}/lib 目录下检查这个 JAR 包是否存在。

为了支持更多的文件类型,我们做了一些妥协,所以在内部访问 Sftp 时我们使用了 HDFS 协议,这个连接器需要一些 Hadoop 依赖项,且仅支持 Hadoop 版2.9.X+ 版本。

数据类型映射

文件没有特定的类型列表,我们可以通过在配置中指定模式来指示要将哪个 SeaTunnel 数据类型转换为相应的数据。

SeaTunnel 数据类型
STRING
SHORT
INT
BIGINT
BOOLEAN
DOUBLE
DECIMAL
FLOAT
DATE
TIME
TIMESTAMP
BYTES
ARRAY
MAP

Source选项

名称类型必填默认值描述
host字符串-目标 SFTP 主机地址
port整数-目标 SFTP 端口号
user字符串-目标 SFTP 用户名
password字符串-目标 SFTP 密码
path字符串-源文件路径
file_format_type字符串-请查看下文的 #file_format_type
file_filter_pattern字符串-用于文件过滤的过滤器模式。
delimiter字符串\001字段分隔符,用于告诉连接器如何在读取文本文件时切割字段。默认 \001,与 Hive 的默认分隔符相同。
parse_partition_from_path布尔型true控制是否从文件路径中解析分区键和值。
例如,如果从路径中读取文件 oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26
那么文件中的每条记录将添加这两个字段:
name age
tyrantlucifer 26
提示:不要在模式选项中定义分区字段
date_format字符串yyyy-MM-dd日期类型的格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:
yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd,默认 yyyy-MM-dd
datetime_format字符串yyyy-MM-dd HH:mm:ss日期时间类型的格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:
yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss,默认 yyyy-MM-dd HH:mm:ss
time_format字符串HH:mm:ss时间类型的格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:
HH:mm:ss HH:mm:ss.SSS,默认 HH:mm:ss
skip_header_row_number长整型0跳过前几行,仅对 txt 和 csv 文件有效。
例如,设置如下:
skip_header_row_number = 2
那么 SeaTunnel 将跳过源文件的前两行。
sheet_name字符串-读取工作簿的工作表名称,仅在文件格式为 Excel 时使用。
schema配置项-请查看下文的 #schema
通用选项-Source 插件通用参数,请参考 Source通用选项 获取详细信息。

file_format_type [字符串]

支持以下文件类型: text csv parquet orc json excel 如果将文件类型指定为 json,需要配置 Schema 模式选项,向连接器说明如何解析数据为你需要所需的 Row。 例如:上游数据如下:

{"code":  200, "data":  "get success", "success":  true}

也可以将多个数据保存在一个文件中,并通过换行符进行分隔:

{"code":  200, "data":  "get success", "success":  true}
{"code":  300, "data":  "get failed", "success":  false}

需要按照以下方式配置 Schema:

schema {fields {code = intdata = stringsuccess = boolean}
}

连接器将生成以下数据:

codedatasuccess
200获取成功true
如果将文件类型指定为 parquetorc,则无需指定模式选项,连接器可以自动查找上游数据的模式。
如果将文件类型指定为 textcsv,则可以选择是否指定模式信息或不指定。

例如,上游数据如下:

tyrantlucifer#26#male

如果不配置 Schema,Connector 将这样处理上游数据:

如果分配数据模式,则除了 CSV 文件类型外,还应分配选项 delimiter。

内容
tyrantlucifer#26#male
如果配置了数据 Schema,除了CSV文件类型,还需要配置选项分隔符。

需要配置 Schema 和分隔符如下:

delimiter = "#"
schema {fields {name = stringage = intgender = string }
}

连接器将生成以下数据: | 姓名 | 年龄 | 性别 | |---------------|-----|------| | tyrantlucifer | 26 | 男 |

Schema [配置项]

fields [配置项] 上游数据的 Schema。

如何创建 Sftp 数据同步任务

以下示例演示了如何创建一个数据同步任务,从 Sftp 读取数据并在本地客户端上打印出来:

# 设置要执行的任务的基本配置
env {execution.parallelism = 1job.mode = "BATCH"
}# 创建连接到 Sftp 的源
source {SftpFile {host = "sftp"port = 22user = seatunnelpassword = passpath = "tmp/seatunnel/read/json"file_format_type = "json"result_table_name = "sftp"schema = {fields {c_map = "map<string, string>"c_array = "array<int>"c_string = stringc_boolean = booleanc_tinyint = tinyintc_smallint = smallintc_int = intc_bigint = bigintc_float = floatc_double = doublec_bytes = bytesc_date = datec_decimal = "decimal(38, 18)"c_timestamp = timestampc_row = {C_MAP = "map<string, string>"C_ARRAY = "array<int>"C_STRING = stringC_BOOLEAN = booleanC_TINYINT = tinyintC_SMALLINT = smallintC_INT = intC_BIGINT = bigintC_FLOAT = floatC_DOUBLE = doubleC_BYTES = bytesC_DATE = dateC_DECIMAL = "decimal(38, 18)"C_TIMESTAMP = timestamp}}}}
}# 控制台打印读取的 Sftp 数据
sink {Console {parallelism = 1}
}

本文由 白鲸开源科技 提供发布支持!


http://www.ppmy.cn/ops/114568.html

相关文章

LeetCode题练习与总结:回文链表--234

一、题目描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true示例 2&#xff1a; 输入&#x…

python函数的一些介绍

函数的多返回值 def 函数(): return 1,2,3 x,y,z 函数&#xff08;&#xff09;#对应1&#xff0c;2&#xff0c;3 有几个就要有对应的几个变量存储&#xff0c;不然会报错 函数的关键字参数 def 函数&#xff08;name,id&#xff09;&#xff1a; 打印输出name和id 函数…

CSAPP Bomb Lab

本 Lab 可以说是 CSAPP 的几个 Lab 中最为人津津乐道的一个&#xff0c;对应知识点为书中的第 3 章&#xff08;程序的机器级表示&#xff09;&#xff0c;要求使用 GDB 调试器&#xff0c;对汇编语言进行调试&#xff0c;从而得出正确的“拆弹密码”。共分为 6 个关卡和一个隐…

Android 命令行关机

在 Android 设备上&#xff0c;可以通过以下命令行命令来关机&#xff1a; adb shell reboot -p其中&#xff1a; adb shell&#xff1a;通过 ADB 进入设备的命令行环境。reboot -p&#xff1a;执行关机操作&#xff0c;-p 表示关机而不是重启。 如果你是在设备本地的终端上而…

Linux(Centos7)系统下给已有分区进行扩容

本文详细介绍了&#xff0c;如何给Centos中已有分区进行扩容&#xff0c;简单的几条命令即可完成。 文章目录 1. 创建物理卷 (PV)2. 将新的物理卷添加到卷组 (VG)3. 扩展逻辑卷 (LV)4. 扩展文件系统4.1 查看文件系统类型4.2 扩展文件系统 完成 1、首先把vmware中的linux关机&am…

第三章 掌握MySQL数据库的基本操作

文章目录 一、关系数据库标准语言SQL1.1 SQL的发展历史与特点1.2 SQL的分类 二、数据库的管理2.1 创建数据库2.2 查看数据库2.3 选择数据库2.4 删除数据库 三、MySQL存储引擎3.1 MySQL支持的存储引擎3.2 InnoDB存储引擎3.3 MyISAM存储引擎3.4 选择存储引擎 四、表的管理4.1 数据…

AUTOSAR UDS NRC

UDS NRC NRC 含义如表格所示 NRC代码描述含义0x00Ok没有错误,请求已成功执行0x01~0x0FISOSAEReservedISO 保留,暂时未定义0x10General reject服务请求被拒绝,原因不明确0x11Service not supported请求的服务不被支持0x12Sub-function not supported请求的子功能不被支持0x13…

Linux bash 关联数组

目录 一. 关联数组定义二. 访问关联数组三. 元素的添加与删除四. 键值对的获取与遍历五. 实际应用5.1 读取封装配置文件内容5.2 收集系统信息 一. 关联数组定义 从 Bash 4.0 开始&#xff0c;Bash 支持关联数组。关联数组允许你将键和值配对&#xff0c;并通过键来访问值&…