seatunnel 2.3.1全流程部署使用

news/2024/10/17 3:59:59/

Seatunnel 2.3.1 部署使用

  • 1 部署
    • 1.1 下载解压
    • 1.2 下载对应的connector
    • 1.3 安装seatunnel
    • ⭐1.4 补充一些jar包
  • 2 测试样例
    • 2.1 官方demo fake to console
    • 2.2 mysql to console
    • 2.3 hive to console
    • 2.4 mysql to hive
  • 3 欢迎讨论

1 部署

1.1 下载解压

https://dlcdn.apache.org/incubator/seatunnel/2.3.1/apache-seatunnel-incubating-2.3.1-bin.tar.gz

下载完毕之后上传到服务器上面并解压

# 解压到了/opt/module目录下
tar -zxvf apache-seatunnel-incubating-2.3.1-bin.tar.gz -C /opt/module

1.2 下载对应的connector

在apache的仓库下载相应的connector,下载时每个jar包在不同的路径下面,放到/seatunnel-2.3.1/connectors/seatunnel目录下

https://repo.maven.apache.org/maven2/org/apache/seatunnel/

connector-assert-2.3.1.jar
connector-cdc-mysql-2.3.1.jar
connector-console-2.3.1.jar # 自带的
connector-doris-2.3.1.jar
connector-elasticsearch-2.3.1.jar
connector-fake-2.3.1.jar # 自带的
connector-file-hadoop-2.3.1.jar
connector-file-local-2.3.1.jar
connector-hive-2.3.1.jar
connector-iceberg-2.3.1.jar
connector-jdbc-2.3.1.jar
connector-kafka-2.3.1.jar
connector-redis-2.3.1.jar

配置安装seatunnel的插件

vim  seatunnel-2.3.1/config/plugin_config

调用安装脚本的时候会在maven的中央仓库下载对应的jar包,尽量少放,下载太慢了,我放了这些

--connectors-v2--
connector-assert
connector-cdc-mysql
connector-jdbc
connector-fake
connector-console
--end--

1.3 安装seatunnel

sh bin/install-plugin.sh 2.3.1

整个过程非常慢…应该是从maven中央仓库下载东西

⭐1.4 补充一些jar包

  • 使用hive的话需要将这两个jar放入到seatunnel-2.3.1/lib目录下:
hive-exec-2.3.9.jar
# 下载链接
# https://repo.maven.apache.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
# 注意这里是hive-exec-2.3.9.jar,不要从你的hive的lib目录下拷贝最新的jar包,就用这个seatunnel-hadoop3-3.1.4-uber-2.3.1.jar  
# 下载链接
# https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.1/seatunnel-hadoop3-3.1.4-uber-2.3.1.jar  seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar
# 下载链接
# hhttps://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.1/seatunnel-hadoop3-3.1.4-uber-2.3.1-optional.jar

中间由于其他缘故我拷贝了一个hive框架/lib目录下的libfb303-0.9.3.jar放到seatunnellib目录下了。

  • 使用mysql的话需要将mysql的驱动拷贝过来,应该是需要8系列的mysql驱动,我这里使用的是mysql-connector-java-8.0.21.jar

2 测试样例

2.1 官方demo fake to console

seatunnel-2.3.1/config/v2.batch.config.template

env {execution.parallelism = 2job.mode = "BATCH"checkpoint.interval = 10000
}source {FakeSource {parallelism = 2result_table_name = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}
}sink {Console {}
}

运行命令

cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e lcoal

运行成功的话会可以在console看到打印的测试数据

2.2 mysql to console

我新建了一个用来放运行配置的目录/opt/module/seatunnel-2.3.1/job

vim mysql_2console.conf

mysql_2console.conf

env {execution.parallelism = 2job.mode = "BATCH"checkpoint.interval = 10000
}
source{Jdbc {url = "jdbc:mysql://hadoop102/dim_db?useUnicode=true&characterEncoding=utf8&useSSL=false"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xxxxxx"query = "select * from dim_basicdata_date_a_d where date < '2010-12-31'"}
}sink {Console {}
}

查询的是一张日期维表的数据

建表语句:

CREATE DATABASE dim_db DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;drop table if exists  dim_db.dim_basicdata_date_a_d;
create table if not exists dim_db.dim_basicdata_date_a_d
(`date`          varchar(40) comment '日期',`year`          varchar(40) comment '年',`quarter`       varchar(40) comment '季度(1/2/3/4)',`season`        varchar(40) comment '季节(春季/夏季/秋季/冬季)',`month`         varchar(40) comment '月',`day`           varchar(40) comment '日',`week`          varchar(40) comment '年内第几周',`weekday`       varchar(40) comment '周几(1-周一/2-周二/3-周三/4-周四/5-周五/6-周六/7-周日)',`is_workday`    varchar(40) comment '是否是工作日(1-是,0-否)',`date_type`     varchar(40) comment '节假日类型(工作日/法定上班[调休]/周末/节假日)',`update_date`   varchar(40) comment '更新日期'
);

可以自己插入几条数据试试

运行命令

cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./job/mysql_2console.conf  -e local

2.3 hive to console

创建一张hive表

CREATE database db_hive;drop table if exists  db_hive.dim_basicdata_date_a_d;
create table if not exists db_hive.dim_basicdata_date_a_d
(`date`          string comment '日期',`year`          string comment '年',`quarter`       string comment '季度(1/2/3/4)',`season`        string comment '季节(春季/夏季/秋季/冬季)',`month`         string comment '月',`day`           string comment '日',`week`          string comment '年内第几周',`weekday`       string comment '周几(1-周一/2-周二/3-周三/4-周四/5-周五/6-周六/7-周日)',`is_workday`    string comment '是否是工作日(1-是,0-否)',`date_type`     string comment '节假日类型(工作日/法定上班[调休]/周末/节假日)',`update_date`   string comment '更新日期'
);

自行插入几条数据

创建配置文件hive_2console.conf

env {execution.parallelism = 2job.mode = "BATCH"checkpoint.interval = 10000
}
source{Hive {table_name = "db_hive.dim_basicdata_date_a_d"metastore_uri = "thrift://hadoop102:9083"}
}sink {Console {}
}

这里我使用的hive连接方式是jdbc访问元数据,所以metastore_uri = "jdbc:hive2://hadoop102:10000"也可以正常使用。

hive-site.xml修改配置文件,有可能你已经配置好了

    <!-- 为了方便连接,采用直连的方式连接到hive数据库,注释掉下面三条配置信息 --><!-- 指定存储元数据要连接的地址 --><property><name>hive.metastore.uris</name><value>thrift://hadoop102:9083</value></property><!-- 指定 hiveserver2 连接的 host --><property><name>hive.server2.thrift.bind.host</name><value>hadoop102</value></property><!-- 指定 hiveserver2 连接的端口号 --><property><name>hive.server2.thrift.port</name><value>10000</value></property>

运行命令

cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./job/hive_2console.conf -e local

2.4 mysql to hive

创建配置文件

dim_basicdate_mysql_2hive.conf

env {execution.parallelism = 2job.mode = "BATCH"checkpoint.interval = 10000
}
source{Jdbc {url = "jdbc:mysql://hadoop102/dim_db?useUnicode=true&characterEncoding=utf8&useSSL=false"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "111111"query = "select * from dim_basicdata_date_a_d"}
}sink {Hive {table_name = "db_hive.dim_basicdata_date_a_d"metastore_uri = "thrift://hadoop102:9083"}
}

运行命令

cd /opt/module/seatunnel-2.3.1
./bin/seatunnel.sh --config ./job/dim_basicdate_mysql_2hive.conf-e local

3 欢迎讨论

邮箱1104566414@qq.com


http://www.ppmy.cn/news/80339.html

相关文章

【分立元件】MOSFET如何用于同步整流

在电力电子中我们会使用二极管做开关,当二极管导时,相当于开关闭合,当二极管截止时,相当于开关断开。但是二极管在导通时的管压降在低压电源电路中是一个损耗来源,所以一般我们首选使用的是肖特基二极管,因为肖特基二极管的管压降比较低。 如下所示为非同步BUCK电源拓朴…

Java中的反射以及使用方法

一. 简介 在程序运行阶段, 动态获取一个类中的属性或者方法, 把这种机制成为反射机制. 可以说, 没有反射就没有Java的任何框架 二. 应用 产生对象 假设有一个Student对象 public class Student {private String name;public int age;static int nationality;public Studen…

使用MinIO文件存储系统【完成视频断点续传】业务逻辑

目录 视频上传 接口一&#xff1a;检查该视频/媒资文件是否已经上传完成 接口二&#xff1a;检查视频分块是否已经在minio中已经存在 接口三&#xff1a;上传分块文件到minio中&#xff08;已经上传的分块会在接口二进行校验&#xff09; 接口四&#xff1a;合并上传的分块…

天猫订单之数据分析与挖掘——分类分析

天猫订单之数据分析与挖掘——分类分析 文章目录 天猫订单之数据分析与挖掘——分类分析0. 写在前面1. 分类分析1.1 决策树预测1.2 随机森林1.3 朴素贝叶斯算法0. 写在前面 Windows:Windows10Python:Python3.9本次案例项目主要是采用Pandas和Numpy对天猫订单数据集进行处理、…

linux共享内存总结

共享内存函数由shmget、shmat、shmdt、shmctl四个函数组成 头文件&#xff1a; #include <sys/ipc..h> #include<sys/shm.h> // 创建或获取一个共享内存: 成功返回共享内存ID&#xff0c;失败返回-1 int shmget (key_t key, size_t_size, int flag); // 连接共享内…

图片翻译怎么弄?如何把图片翻译成中文?

在使用社交媒体时&#xff0c;可能会遇到来自世界各地的异文化信息&#xff0c;这时我们可以借助图片翻译的方法帮助我们更好地了解这些信息&#xff0c;促进跨文化交流。那么图片翻译怎么弄呢&#xff1f;图片翻译的方法有哪些呢&#xff1f;这篇文章给你推荐三个非常好用的图…

ROS学习(5)——话题消息与服务

节点之间的消息通信分为几种形式&#xff1a; 话题(topic):单向消息发送/接收方式服务(service):双向消息请求/响应方式动作(action):双向消息目标(goal)/结果(result)/反馈(feedback)方式参数服务器(参数共享模式) 种类区别话题异步单向连续单向的发送/接收数据的情况服务同步…