执行flink sql连接clickhouse库

devtools/2024/11/20 5:04:37/

手把手教学,flink connector打通clickhouse数据库,通过下发flink sql,来使用ck。

组件版本
jdk1.8
flink1.17.2
clickhouse23.12.2.59

1.背景

flink官方不支持clickhouse连接器,工作中难免会用到。

2.方案

利用GitHub大佬提供的源代码,我用的是release-1.16:https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16

3.编译

导入IDEA,maven编译即可,生成flink-connector-clickhouse-1.16.0-SNAPSHOT.jar

4.将此依赖包,导入flink工程

spring boot工程

4.1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion>
<!--	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.13</version><relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;</parent>--><parent><groupId>com.mit.microgrid</groupId><artifactId>mit-microgrid</artifactId><version>${project.build.version}</version></parent><artifactId>mit-microgrid-flink</artifactId><name>mit-microgrid-flink</name><description>flink connector clickhouse</description><properties><java.version>1.8</java.version><flink.version>1.17.2</flink.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><!-- 排除SpringBoot自带的日志依赖 --><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!--flink--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions></dependency><!--flink connector--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!--flink connector clickhouse--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.16.0-SNAPSHOT</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId>
<!--			<artifactId>flink-clients_2.12</artifactId>--><version>${flink.version}</version>
<!--			<scope>provided</scope>--></dependency><!-- flink sql --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><!-- Flink JDBC Connector -->
<!--		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.14.6</version> &lt;!&ndash; 与您的Flink版本匹配 &ndash;&gt;</dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.2-1.17</version></dependency><!-- ClickHouse JDBC Driver --><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.3.2</version> <!-- 请根据实际情况选择最新稳定版本 --></dependency><!-- 添加clickhouse-maven依赖--><dependency><groupId>ru.ivi.opensource</groupId><artifactId>flink-clickhouse-sink</artifactId><version>1.2.0</version></dependency><!--module--><dependency><groupId>com.mit.microgrid</groupId><artifactId>mit-microgrid-common-core</artifactId><version>${project.build.version}</version></dependency><dependency><groupId>com.mit.microgrid</groupId><artifactId>mit-microgrid-api-history</artifactId><version>${project.build.version}</version></dependency><!--sql parse--><dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId><version>1.37.0</version></dependency>
<!--		<dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-server</artifactId><version>1.37.0</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-parser</artifactId><version>${flink.version}</version></dependency><!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-text</artifactId><version>1.12.0</version></dependency></dependencies><build><!--<plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin>-->
<!--			<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin>-->
<!--		</plugins>--><finalName>${project.artifactId}</finalName><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.7.3</version><configuration><mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass><fork>true</fork><layout>ZIP</layout><includeSystemScope>true</includeSystemScope></configuration><executions><execution><goals><goal>repackage</goal></goals><configuration><classifier>-with-dependencies</classifier></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.3.0</version><configuration><archive><addMavenDescriptor>false</addMavenDescriptor><manifest><mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass><addClasspath>true</addClasspath><classpathPrefix>lib/</classpathPrefix></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><descriptors><descriptor>src/main/resources/assembly/assembly.xml</descriptor></descriptors><outputDirectory>./../out</outputDirectory></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

4.2)核心方法:

    /*** multiple sql execute** @param sqlList*/public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {log.info("参数sqlList: {}", sqlList);
//        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);EnvironmentSettings setting = EnvironmentSettings.newInstance().inBatchMode().build();TableEnvironment tEnv = TableEnvironment.create(setting);if (CollectionUtil.isNullOrEmpty(sqlList)) {log.warn("sqlList参数为空");return null;}for (String s : sqlList) {TableResult tableResult = tEnv.executeSql(s);Optional<JobClient> jobClientOptional = tableResult.getJobClient();if (jobClientOptional.isPresent()) {JobClient jobClient = jobClientOptional.get();log.info("jobClient: " + jobClient);return jobClient;}}log.error("没有可执行的job");return null;}

5.源码地址

https://github.com/genghongsheng0/mit-microgrid-flink


http://www.ppmy.cn/devtools/135391.html

相关文章

Vue实现消息提示功能

1.首先要先定义消息提示的组件&#xff0c;在这个组件中需要实现自动关闭的功能&#xff08;看自己的爱好呗&#xff09;&#xff0c;并且设置自己喜欢的样式&#xff0c;vue中还有可以自定义进场和退场动画的样式&#xff08;就是那个v-enter-active和v-leave-active&#xff…

python核心语法

目录 核⼼语法第⼀节 变量0.变量名规则1.下⾯这些都是不合法的变量名2.关键字3.变量赋值4.变量的销毁 第⼆节 数据类型0.数值1.字符串2.布尔值(boolean, bool)3.空值 None 核⼼语法 第⼀节 变量 变量的定义变量就是可变的量&#xff0c;对于⼀些有可能会经常变化的数据&#…

【Linux】多线程(中)

目录 一、线程互斥 1.1 互斥概念 1.2 互斥量mutex 1.3 互斥量相关API &#xff08;1&#xff09;初始化互斥量 &#xff08;2&#xff09;销毁互斥量 &#xff08;3&#xff09;互斥量加锁和解锁 1.4 互斥量原理 1.5 重入和线程安全 二、死锁 2.1 概念 2.2 造成死锁…

SQL,力扣题目1126,查询活跃业务

一、力扣链接 LeetCode_1126 二、题目描述 事件表&#xff1a;Events ------------------------ | Column Name | Type | ------------------------ | business_id | int | | event_type | varchar | | occurrences | int | ------------------------…

力扣 简单 70.爬楼梯

文章目录 题目介绍题解 题目介绍 题解 思路分析&#xff1a; 确定dp数组以及下标的含义&#xff1a;dp[i]&#xff1a; 爬到第i层楼梯&#xff0c;有dp[i]种方法确定递推公式&#xff1a;从dp[i]的定义可以看出&#xff0c;dp[i] 可以有两个方向推出来。首先是dp[i - 1]&…

基于YOLOv8深度学习的婴儿情绪状态检测系统(PyQt5界面+数据集+训练代码)

婴儿的情绪状态是其表达健康状况、情感需求以及与外界互动的重要方式&#xff0c;准确识别婴儿的情绪对父母和看护者理解其需求具有关键意义。然而&#xff0c;由于婴儿语言能力的缺乏&#xff0c;他们通常通过面部表情、动作和哭声等非语言行为来表达情绪&#xff0c;因此需要…

学习笔记019——Ubuntu部署tomcat

1、下载Tomcat压缩包。本人下载的版本是&#xff1a;apache-tomcat-8.5.77.tar.gz 2、将压缩包上传到Ubuntu某个目录。 本人存放的目录是 /opt 目录下, 命令解压&#xff1a; ## 解压tomcat压缩包 tar -zxvf apache-tomcat-8.5.77.tar.gz 3、vim打开bin目录的setclasspath…

ARM64环境部署EFK8.15.3收集K8S集群容器日志

环境规划 主机IP系统部署方式ES版本CPU架构用户名密码192.168.1.225Ubuntu 22.04.4 LTSdockerelasticsearch:8.15.3ARM64elasticllodyi4TMmZD ES集群部署 创建持久化目录(所有节点) mkdir -p /data/es/{data,certs,logs,plugins} mkdir -p /data/es/certs/{ca,es01}服务器…