Dinky FlinkSQL Doris读取写入

server/2024/12/22 2:09:45/

Dinky运行前开启全局变量,以支持使用:  'sink.sink.label-prefix' = '${idUtil.simpleUUID()}'

Mysql同步Doris - testMysqlCdcDoris:

EXECUTE CDCSOURCE demo_doris WITH ('connector' = 'mysql-cdc','hostname' = '172.xxx','port' = '3306','username' = 'xxx','password' = 'xxx','checkpoint' = '10000','scan.startup.mode' = 'initial','parallelism' = '1','database-name' = 'test','table-name' = 'test\.student,','sink.connector' = 'doris','sink.fenodes' = '172.xxx:8130','sink.username' = 'xxx','sink.password' = 'xxx','sink.doris.batch.size' = '1000','sink.sink.max-retries' = '1','sink.sink.db' = 'test','sink.sink.enable-delete' = 'true','sink.sink.properties.format' ='json','sink.sink.properties.read_json_by_line' ='true','sink.table.prefix' = 'test_','sink.table.identifier' = '#{schemaName}.#{tableName}','sink.sink.label-prefix' = '${idUtil.simpleUUID()}'
);

读取Doris - testDorisRead:

CREATE TABLE flink_doris_source (aggregate_id int,replace_data string,max_data string,agg_item int,max_item int,min_item int
) 
WITH ('connector' = 'doris','fenodes' = '172.xxx:8130','table.identifier' = 'test.aggregate_table','username' = 'xxx','password' = 'xxx'
);select * from flink_doris_source

Doris同步Doris - testDorisCdcDoris:

-- doris source
CREATE TABLE flink_doris_source (aggregate_id int,replace_data string,max_data string,agg_item int,max_item int,min_item int
) 
WITH ('connector' = 'doris','fenodes' = '172.xxx:8130','table.identifier' = 'test.aggregate_table','username' = 'xxx','password' = 'xxx'
);-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';-- doris sink
CREATE TABLE flink_doris_sink (aggregate_id int,replace_data string,max_data string,agg_item int,max_item int,min_item int) WITH ('connector' = 'doris','fenodes' = '172.xxx:8030','table.identifier' = 'test.test_aggregate_table','username' = 'xxx','password' = 'xxx','sink.label-prefix' = '${idUtil.simpleUUID()}'
);-- submit insert job
INSERT INTO flink_doris_sink select aggregate_id, replace_data, max_data, agg_item, max_item, min_item from flink_doris_source

参考

Flink Doris Connector - Apache Doris

Doris + Flink + DolphinScheduler + Dinky 构建开源数据平台_dinky dolphinscheduler flink-CSDN博客

整库同步概述 | Dinky


http://www.ppmy.cn/server/47447.html

相关文章

Java Keyword

文章目录 Java Keyword一、基本数据类型相关关键字(8个)(1)byte:单字节类型(2)short:短整型(3)int:整型(4)long:长整型(5&…

MAX7219(模拟SPI)驱动灯环的简单应用

文章目录 一、MAX7219是什么?二、使用步骤1.硬件1.1 引脚说明1.2 应用电路1.2.1 驱动数码管1.2.2 驱动点阵 2.软件2.1 时序2.2 寄存器2.2.1 掉电寄存器2.2.2 译码模式寄存器2.2.3 亮度寄存器2.2.4 扫描寄存器2.2.5 显示测试寄存器 2.3 初始化2.4 控制左侧灯环特定位…

Go 语言中基础数据类型、运算符、类型转换与类型别名

在编程语言中,数据类型是程序设计的基石,它们决定了变量的存储方式、允许的操作以及运算结果。在 Go 语言中,数据类型丰富而灵活,提供了强大的工具来处理各种数据和运算需求。无论是简单的布尔值、整型和浮点型,还是复…

【JAVA架构】开发在线开具电子发票系统

【JAVA架构VUE】开发在线开具电子发票系统 对接税务厂家接口 实现销售发票开具 进项发票在线拉取 红冲发票在线开具 详细内容可以关注本人专栏等 销售发票开具 开具发票 进项发票在线拉取 红冲发票在线开具

Flutter Android 热修复方案(3.22.0)

本文基于Flutter 3.22.0 实现 Flutter 在 Android 的Release编译产物为 libapp.so,我们只需要把它换成我们要修复的so即可。 实现方案有两种: 1. 重写以下代码 FlutterApplication FlutterInjector FlutterLoader FlutterApplicationInfo Application…

Spring Boot详解:深入了解与实践

文章目录 1. Spring Boot简介1.1 什么是Spring Boot?1.2 Spring Boot的历史背景1.3 Spring Boot的核心特点 2. Spring Boot的核心概念2.1 自动配置2.1.1 自动配置原理2.1.2 自定义配置 2.2 Spring Boot Starter2.3 Spring Boot CLI 3. Spring Boot的主要功能模块3.1…

小抄 20240602

1 想要赚到大钱,一定要利用好杠杆。 以前是资本杠杆、资源杠杆,普通人基本接触不到,碰了也是个死。 现在是互联网杠杆、自媒体杠杆,人人都可以免费使用。 不管你做什么,都要尽量用上互联网这个杠杆,撬动…

拥抱ASPICE标准——让软件开发更高效、更安全

随着科技的飞速发展,软件已经渗透到我们生活的方方面面,从智能手机到智能家居,从自动驾驶到云计算,软件已经成为了现代社会不可或缺的一部分。然而,随着软件复杂性的不断提升,如何确保软件的质量、可靠性和…