目录
安装部署
解压
环境变量
安装plugin
添加资源jar包
SEATUNNEL 配置文件
env:环境设置
source:数据源设置
sink:数据去向设置
transform: 数据转换设置
运行方式
seatunnel 引擎(zeta)
本地模式
集群模式
安装部署
解压
tar -zxvf apache-SeaTunnel-incubating-2.3.1-bin.tar.gz
环境变量
vi /etc/profileexport SEATUNNEL_HOME=/data/soft/seatunnel-2.3.1
export PATH=$PATH:${SEATUNNEL_HOME}/bin
安装plugin
# 备份原有的组件配置
cp ${SEATUNNEL_HOME}/config/plugin_config ${SEATUNNEL_HOME}/config/plugin_config_bak
# 创建一个新的
vi ${SEATUNNEL_HOME}/config/plugin_config
# 只安装这三种插件,因为他的插件太多了,下载很慢,用哪个下哪个
--connectors-v2--
connector-jdbc
connector-kafka
connector-console
--end--# 运行安装命令sh ${SEATUNNEL_HOME}/bin/install-plugin.sh 2.3.1
添加资源jar包
cd ${SEATUNNEL_HOME}/lib
# 添加以下依赖jar,连接mysql需要使用
mysql-connector-j-8.0.31.jar
SEATUNNEL 配置文件
env:环境设置
source:数据源设置
sink:数据去向设置
transform: 数据转换设置
env {execution.parallelism = 1
}
source {Kafka {topic = "seatunnel_topic"format = textfield_delimiter = "#"schema = {fields {name = "string"age = "int"}}consumer.group = "yuanqu-group11"bootstrap.servers = "hdp01:6667"start_mode = "latest"kafka.config = {security.protocol = "SASL_PLAINTEXT"sasl.mechanism = "PLAIN"sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"Dmp@2022\";"}}
}
transform {}
sink {jdbc {url = "jdbc:mysql://172.16.10.143:3306/test_db"driver = "com.mysql.cj.jdbc.Driver"user = "root"password = "Sdunisi_"query = "insert into seatunnel_test(name,age) values(?,?)"}
}
运行方式
seatunnel 包括三种执行引擎
seatunnel 引擎(zeta)
本地模式
# -e local 指定本地执行
${SEATUNNEL_HOME}/bin/seatunnel.sh --config test.conf -e local
集群模式
启动集群
# 启动集群,我这是一台机器伪集群不需要任何配置,如果配置看https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/deployment
nohup ${SEATUNNEL_HOME}/bin/seatunnel-cluster.sh & 1>/dev/null 2>&1
启动集群任务
# -e local 指定本地执行
nohup ${SEATUNNEL_HOME}/bin/seatunnel.sh --config test.conf & 1>/dev/null 2>&1
停止任务
启动任务之后会有一个jobid,如果直接kill 进程是不管用的,seatunnel任务依然会执行,需要使用 -can jobid 结束任务
#我们需要使用 -can, --cancel-job Cancel job by JobId
${SEATUNNEL_HOME}/bin/seatunnel.sh -can 720924469203304449