2.阿里云flinkselectdb-jar作业

news/2024/12/28 14:05:03/

1.概述

本文继续介绍使用阿里云实时计算flink把数据从自建mysql同步到阿里云selectdb的过程。上一节使用sql作业,不够强大,有如下问题:

  • 不支持自动创建结果表(selectdb表)。同步前需要手动在selectdb创建结果表;
  • 不支持源表(mysql表)的ddl语句。源表增加/修改字段,需要先手动在结果表(selectdb表)执行,然后重启sql作业;
  • 不支持添加新的源表。添加新表源表需要重新从全量同步阶段开始运行(flink cdc作业分为全量同步和增量同步两个阶段);
  • 不支持连接复用。sql作业里面的每个insert语句都需要一个源表(mysql表)的连接,当同步的源表比较多时,会占用大量的数据库连接;

本节使用jar作业,通过写代码的方式,解决sql作业存在的问题;

2.目标

把自建mysql的约100张表准实时同步到云服务selectdb。数据量不大,约5个G左右;

源表flink结果表
自建mysql实时计算flink云服务selectdb

3.步骤(重点)

对问题和过程没兴趣的同学,可以直接看这里。本章节记录了阿里云flink与selectdb集成时,使用jar作业的实现方式;

3.1.创建作业

  • JAR作业开发需要使用JDK 1.8版本;
  • JAR作业需要线下完成开发,然后打成jar包,上传到在Flink全托管控制台上部署并运行;
  • JAR作业不支持在Main函数中读取本地配置,读取配置文件需要可通过以下方式;
    • 部署作业所添加附加依赖文件将会加载到作业所运行Pod的/flink/usrlib目录下。配置文件以作业附加依赖文件上传,然后通过代码读取;
    • 上传到其它可访问地址,通过网络读取(注意Flink版默认不能访问公网,需要额外操作开通);
  • JAR作业依赖的其它jar包,可通过直接打进JAR作业的方式 ,也可以通过部署作业时添加附加依赖文件的方式;
@Slf4j
public class CdcMysqlToDorisStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String database = "test";Map<String, String> mysqlConfig = new HashMap<>();mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), "bpms");mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), "127.0.0.1");mysqlConfig.put(MySqlSourceOptions.PORT.key(), "3306");mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "test");mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "test");mysqlConfig.put("jdbc.properties.use_ssl", "false");mysqlConfig.put("sink.properties.format", "json");//**支持在作业运行到增量同步阶段后,动态添加新的源表mysqlConfig.put(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");Configuration config = Configuration.fromMap(mysqlConfig);Map<String, String> sinkConfig = new HashMap<>();sinkConfig.put(DorisConfigOptions.FENODES.key(), "test.selectdbfe.rds.aliyuncs.com:8080");sinkConfig.put(DorisConfigOptions.USERNAME.key(), "test");sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "test");sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://test.selectdbfe.rds.aliyuncs.com:9030");sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());sinkConfig.put("sink.enable-delete", "false");Configuration sinkConf = Configuration.fromMap(sinkConfig);Map<String, String> tableConfig = new HashMap<>();tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, ".*:1");String includingTables = getTables();String excludingTables = "";boolean ignoreDefaultValue = false;boolean useNewSchemaChange = true;String schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();boolean singleSink = false;boolean ignoreIncompatible = false;DatabaseSync databaseSync = new MysqlDatabaseSync();databaseSync.setEnv(env).setDatabase(database).setConfig(config).setIncludingTables(includingTables).setExcludingTables(excludingTables).setIgnoreDefaultValue(ignoreDefaultValue).setSinkConfig(sinkConf).setTableConfig(new DorisTableConfig(tableConfig)).setCreateTableOnly(false).setNewSchemaChange(useNewSchemaChange).setSchemaChangeMode(schemaChangeMode).setSingleSink(singleSink).setIgnoreIncompatible(ignoreIncompatible).create();databaseSync.build();env.execute(String.format("mysql-doris数据库同步,database=%s", database));}//**读取配置文件里面,获取需要同步的表@SneakyThrowsprivate static String getTables() {String rst;//**Flink JAR作业不支持在Main函数中读取本地配置//**在作业运行时,部署作业所添加附加依赖文件将会加载到作业所运行Pod的/flink/usrlib目录下try (Stream<String> stream = Files.lines(Paths.get("/flink/usrlib/mysql-to-doris-tables"))) {rst = stream.map(String::trim).filter(StringUtils::isNotBlank).collect(joining("|"));}log.info("读取同步的表成功,tables={}", rst);Assert.notBlank(rst, "同步的表不能为空");return rst;}
}

3.2.部署作业

<a class=阿里云flink-jar作业部署" />

4.遇到的问题

flink_108">全托管flink怎么获取上传的文件;

我们使用的是阿里云全托管flink,文件都上传到了阿里云管理的oss。查询文档发现地址如下:

oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>/文件名

如果还是不知道是多少,可以先创建一个jar作业,然后在作业的基础信配置->JAR Uri里面查看;

运行报错java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;

  • 原因: 编译的jdk版本和运行的jdk版本不一致。jdk8和jdk11的此方法不兼容;
  • 解决: 查看打包编译的jdk版本,需使用jdk8;

http://www.ppmy.cn/news/1558808.html

相关文章

CI/CD是什么?

CI/CD 定义 CI/CD 代表持续集成和持续部署&#xff08;或持续交付&#xff09;。它是一套实践和工具&#xff0c;旨在通过自动化构建、测试和部署来改进软件开发流程&#xff0c;使您能够更快、更可靠地交付代码更改。 持续集成 (CI)&#xff1a;在共享存储库中自动构建、测试…

面试经典题目:LeetCode134_加油站

leetcode134_加油站 暴力解法贪心算法初始条件第一个不等式第二个不等式考虑任意加油站 z z z推导过程结论 题目链接&#xff1a;leetcode134_加油站 暴力解法 暴力解法会尝试每个加油站作为起点&#xff0c;模拟一圈行驶来验证是否能成功环绕一周。这种方法的时间复杂度是 O…

微信小程序 不同角色进入不同页面、呈现不同底部导航栏

遇到这个需求之前一直使用的小程序默认底部导航栏&#xff0c;且小程序默认入口页面为pages/index/index&#xff0c;要使不同角色呈现不同底部导航栏&#xff0c;必须要在不同页面引用不同的自定义导航栏。本篇将结合分包&#xff08;subPackages&#xff09;展开以下三步叙述…

Android Studio Gradle Sync timeout

使用Android Studio开发Android应用程序时&#xff0c;Gradle sync timeout常用的处理方法有&#xff1a; 增加超时设置&#xff1a;在 gradle.properties 文件中增加超时设置&#xff0c;给 Gradle 更长时间完成同步使用国内镜像&#xff1a;通过配置阿里云、华为或腾讯云的 …

【Python运维】构建基于Python的自动化运维平台:用Flask和Celery

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 在现代IT运维中,自动化运维平台扮演着至关重要的角色,它能够显著提高运维效率,减少人为错误,并且增强系统的可维护性。本文将引导读者如…

阿里云人工智能ACA(五)——深度学习基础

一、深度学习概述 1. 深度学习概念 1-1. 深度学习基本概念 深度学习是机器学习的一个分支基于人工神经网络&#xff08;模仿人脑结构&#xff09;通过多层网络自动学习特征能够处理复杂的模式识别问题 1-2. 深度学习的优点与缺点 优点 强大的特征学习能力可以处理复杂问题…

华为战略解码-162页 八大章节 精读

该文档主要解读了华为战略解码的过程和内容&#xff0c;强调了领导力在战略管理中的重要性&#xff0c;介绍了华为战略管理的七个关键点以及领导力的七个特质。文档详细阐述了华为在战略解码过程中如何利用BLM模型等工具&#xff0c;以及如何从市场洞察、业务设计等方面制定和执…

2024年OpenTiny年度人气贡献者评选正式开始

携手共创&#xff0c;致敬不凡&#xff01; 2024年&#xff0c;OpenTiny持续在前端开源领域扎根&#xff0c;每一位开发者都是推动项目共同前行的宝贵力量。从bug修复&#xff0c;到技术探讨&#xff1b;从参与开源活动&#xff0c;到输出技术文章&#xff1b;从使用项目&…