24. Flink Table API & SQL Catalogs (Java API operations on databases and tables) - 2


Flink Series Articles

1. Flink deployment, concepts, source/transformation/sink usage examples, the four cornerstones with examples, and the other articles in this comprehensive series (links)

13. Flink Table API & SQL: basic concepts, general API introduction, and getting-started examples
14. Flink Table API & SQL data types: the built-in data types and their properties
15. Flink Table API & SQL streaming concepts: a detailed look at dynamic tables, time attribute configuration (how updating results are handled), temporal tables, joins on streams, determinism on streams, and query configuration
16. Flink Table API & SQL, connecting to external systems: connectors and formats for reading and writing external systems, with a FileSystem example (1)
16. Flink Table API & SQL, connecting to external systems: connectors and formats for reading and writing external systems, with an Elasticsearch example (2)
16. Flink Table API & SQL, connecting to external systems: connectors and formats for reading and writing external systems, with an Apache Kafka example (3)
16. Flink Table API & SQL, connecting to external systems: connectors and formats for reading and writing external systems, with a JDBC example (4)

16. Flink Table API & SQL, connecting to external systems: connectors and formats for reading and writing external systems, with an Apache Hive example (6)

20. Flink SQL: the SQL Client — try Flink SQL without writing any code and submit SQL jobs directly to a cluster

22. Flink Table API & SQL: DDL for creating tables
24. Flink Table API & SQL Catalogs (introduction, types, DDL via Java API and SQL, catalog operations via Java API and SQL) - 1
24. Flink Table API & SQL Catalogs (Java API operations on databases and tables) - 2

26. Flink SQL: overview and getting-started examples
27. Flink SQL SELECT (select, where, distinct, order by, limit, set operations, and deduplication): introduction and detailed examples (1)
27. Flink SQL SELECT (SQL Hints and Joins): introduction and detailed examples (2)
27. Flink SQL SELECT (window functions): introduction and detailed examples (3)
27. Flink SQL SELECT (window aggregation): introduction and detailed examples (4)
27. Flink SQL SELECT (Group Aggregation, Over Aggregation, and Window Join): introduction and detailed examples (5)
27. Flink SQL SELECT (Top-N, Window Top-N, and Window Deduplication): introduction and detailed examples (6)
27. Flink SQL SELECT (Pattern Recognition): introduction and detailed examples (7)

29. Flink SQL DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB Statements, UPDATE, DELETE (1)
29. Flink SQL DESCRIBE, EXPLAIN, USE, SHOW, LOAD, UNLOAD, SET, RESET, JAR, JOB Statements, UPDATE, DELETE (2)
30. Flink SQL Client (configuration file usage — tables, views, etc. — introduced through Kafka and filesystem examples)
32. Flink Table API & SQL user-defined Sources & Sinks: implementation and detailed examples
41. Flink Hive dialect: introduction and detailed examples
42. Flink Table API & SQL: Hive Catalog
43. Flink reading from and writing to Hive, with detailed verification examples
44. Flink modules: introduction and usage examples, plus detailed examples of using Hive built-in functions and user-defined functions in Flink SQL — some claims found online appear to be wrong


Table of Contents

  • Flink Series Articles
  • 5. Catalog API
    • 1. Database operations
      • 1) JdbcCatalog example
      • 2) HiveCatalog example: listing the tables in a given database
      • 3) HiveCatalog example: creating a database
    • 2. Table operations


This article briefly introduces operating on databases and tables through the Java API, with a concrete, runnable example for each.
It assumes working Flink, Hive, and Hadoop clusters.
The article has two parts: database operations and table operations.
The Java API examples were implemented against Flink 1.13.5; unless otherwise noted, the SQL examples target Flink 1.17.

5. Catalog API

1. Database operations

The following lists the common database operations; the concrete example that follows uses the JdbcCatalog, with Flink 1.17.0.


// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get database
catalog.getDatabase("mydb");

// check if a database exists
catalog.databaseExists("mydb");

// list databases in the catalog (the method takes no arguments)
catalog.listDatabases();
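
The CatalogDatabaseImpl(...) placeholders above take a properties map plus an optional comment (the constructor is quoted in example 3 below). A minimal sketch of filling one in; the property key is made up purely for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;

// properties are free-form key/value pairs stored with the database
Map<String, String> props = new HashMap<>();
props.put("owner", "alanchan"); // hypothetical property, for illustration only

CatalogDatabase db = new CatalogDatabaseImpl(props, "a database created through the Catalog API");
catalog.createDatabase("mydb", db, false); // ignoreIfExists = false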

1) JdbcCatalog example

  • pom.xml
<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<dependency><groupId>jdk.tools</groupId><artifactId>jdk.tools</artifactId><version>1.8</version><scope>system</scope><systemPath>${JAVA_HOME}/lib/tools.jar</systemPath></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>test</scope></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version><scope>provided</scope></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
</dependencies>
  • Java
import java.util.List;

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

/**
 * @author alanchan
 */
public class TestJdbcCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// public JdbcCatalog(
		//     String catalogName,
		//     String defaultDatabase,
		//     String username,
		//     String pwd,
		//     String baseUrl)
		// Equivalent SQL:
		// CREATE CATALOG alan_catalog WITH(
		//     'type' = 'jdbc',
		//     'default-database' = 'test?useSSL=false',
		//     'username' = 'root',
		//     'password' = 'root',
		//     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
		// );
		Catalog catalog = new JdbcCatalog("alan_catalog", "test?useSSL=false", "root", "123456", "jdbc:mysql://192.168.10.44:3306");

		// Register the catalog
		tenv.registerCatalog("alan_catalog", catalog);

		List<String> tables = catalog.listTables("test");
		for (String table : tables) {
			System.out.println("Database:test  tables:" + table);
		}
	}
}
  • Run results
Database:test  tables:allowinsert
Database:test  tables:author
Database:test  tables:batch_job_execution
Database:test  tables:batch_job_execution_context
Database:test  tables:batch_job_execution_params
Database:test  tables:batch_job_execution_seq
Database:test  tables:batch_job_instance
Database:test  tables:batch_job_seq
Database:test  tables:batch_step_execution
Database:test  tables:batch_step_execution_context
Database:test  tables:batch_step_execution_seq
Database:test  tables:book
Database:test  tables:customertest
Database:test  tables:datax_user
Database:test  tables:dm_sales
Database:test  tables:dms_attach_t
Database:test  tables:dx_user
Database:test  tables:dx_user_copy
Database:test  tables:employee
Database:test  tables:hibernate_sequence
Database:test  tables:permissions
Database:test  tables:person
Database:test  tables:personinfo
Database:test  tables:role
Database:test  tables:studenttotalscore
Database:test  tables:t_consume
Database:test  tables:t_czmx_n
Database:test  tables:t_kafka_flink_user
Database:test  tables:t_merchants
Database:test  tables:t_recharge
Database:test  tables:t_user
Database:test  tables:t_withdrawal
Database:test  tables:updateonly
Database:test  tables:user
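
Instead of calling listTables on the catalog object, the same session can also browse the registered catalog with SQL. A minimal sketch, assuming the tenv and the registered alan_catalog from the code above (whether useDatabase("test") resolves depends on how the catalog lists its databases):

// make the registered catalog and one of its databases the session default
tenv.useCatalog("alan_catalog");
tenv.useDatabase("test");

// prints the same table list via SQL
tenv.executeSql("SHOW TABLES").print();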

2) HiveCatalog example: listing the tables in a given database

This example requires a Hadoop and Hive environment; it is packaged as a jar file and executed on the cluster.
For Flink–Hive integration, see: 42. Flink Table API & SQL: Hive Catalog

  • pom.xml
<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.13.6</flink.version>
</properties>

<dependencies>
	<dependency><groupId>jdk.tools</groupId><artifactId>jdk.tools</artifactId><version>1.8</version><scope>system</scope><systemPath>${JAVA_HOME}/lib/tools.jar</systemPath></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency>
	<!-- the old Flink planner, used before 1.9 -->
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency>
	<!-- the Blink planner, the default since 1.11 -->
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
	<!-- Flink connectors -->
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>${flink.version}</version></dependency>
	<dependency><groupId>org.apache.hive</groupId><artifactId>hive-metastore</artifactId><version>2.1.0</version></dependency>
	<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.2</version></dependency>
	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency>
	<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version><!--<version>8.0.20</version> --></dependency>
	<!-- high-performance async components: Vert.x -->
	<dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>3.9.0</version></dependency>
	<dependency><groupId>io.vertx</groupId><artifactId>vertx-jdbc-client</artifactId><version>3.9.0</version></dependency>
	<dependency><groupId>io.vertx</groupId><artifactId>vertx-redis-client</artifactId><version>3.9.0</version></dependency>
	<!-- logging -->
	<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency>
	<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency>
	<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.44</version></dependency>
	<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><scope>provided</scope></dependency>
</dependencies>

<build>
	<sourceDirectory>src/main/java</sourceDirectory>
	<plugins>
		<!-- compiler plugin -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.5.1</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
				<!--<encoding>${project.build.sourceEncoding}</encoding> -->
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-surefire-plugin</artifactId>
			<version>2.18.1</version>
			<configuration>
				<useFile>false</useFile>
				<disableXmlReport>true</disableXmlReport>
				<includes>
					<include>**/*Test.*</include>
					<include>**/*Suite.*</include>
				</includes>
			</configuration>
		</plugin>
		<!-- packaging plugin (bundles all dependencies) -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.3</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
						<transformers>
							<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<!-- set the jar's entry class (optional) -->
								<mainClass>org.table_sql.TestHiveCatalogDemo</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>
  • Java
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @author alanchan
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// testhive is the Hive database name
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// use the registered catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}
	}
}
  • Run results
################ hive query results ##################
0: jdbc:hive2://server4:10000> use testhive;
No rows affected (0.021 seconds)
0: jdbc:hive2://server4:10000> show tables;
+-----------------------+
|       tab_name        |
+-----------------------+
| apachelog             |
| col2row1              |
| col2row2              |
| cookie_info           |
| dual                  |
| dw_zipper             |
| emp                   |
| employee              |
| employee_address      |
| employee_connection   |
| ods_zipper_update     |
| row2col1              |
| row2col2              |
| singer                |
| singer2               |
| student               |
| student_dept          |
| student_from_insert   |
| student_hdfs          |
| student_hdfs_p        |
| student_info          |
| student_local         |
| student_partition     |
| t_all_hero_part_msck  |
| t_usa_covid19         |
| t_usa_covid19_p       |
| tab1                  |
| tb_dept01             |
| tb_dept_bucket        |
| tb_emp                |
| tb_emp01              |
| tb_emp_bucket         |
| tb_json_test1         |
| tb_json_test2         |
| tb_login              |
| tb_login_tmp          |
| tb_money              |
| tb_money_mtn          |
| tb_url                |
| the_nba_championship  |
| tmp_1                 |
| tmp_zipper            |
| user_dept             |
| user_dept_sex         |
| users                 |
| users_bucket_sort     |
| website_pv_info       |
| website_url_info      |
+-----------------------+
48 rows selected (0.027 seconds)

################ flink query results ##################
[alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
Database:testhive  tables:student
Database:testhive  tables:user_dept
Database:testhive  tables:user_dept_sex
Database:testhive  tables:t_all_hero_part_msck
Database:testhive  tables:student_local
Database:testhive  tables:student_hdfs
Database:testhive  tables:student_hdfs_p
Database:testhive  tables:tab1
Database:testhive  tables:student_from_insert
Database:testhive  tables:student_info
Database:testhive  tables:student_dept
Database:testhive  tables:student_partition
Database:testhive  tables:emp
Database:testhive  tables:t_usa_covid19
Database:testhive  tables:t_usa_covid19_p
Database:testhive  tables:employee
Database:testhive  tables:employee_address
Database:testhive  tables:employee_connection
Database:testhive  tables:dual
Database:testhive  tables:the_nba_championship
Database:testhive  tables:tmp_1
Database:testhive  tables:cookie_info
Database:testhive  tables:website_pv_info
Database:testhive  tables:website_url_info
Database:testhive  tables:users
Database:testhive  tables:users_bucket_sort
Database:testhive  tables:singer
Database:testhive  tables:apachelog
Database:testhive  tables:singer2
Database:testhive  tables:tb_url
Database:testhive  tables:row2col1
Database:testhive  tables:row2col2
Database:testhive  tables:col2row1
Database:testhive  tables:col2row2
Database:testhive  tables:tb_json_test1
Database:testhive  tables:tb_json_test2
Database:testhive  tables:tb_login
Database:testhive  tables:tb_login_tmp
Database:testhive  tables:tb_money
Database:testhive  tables:tb_money_mtn
Database:testhive  tables:tb_emp
Database:testhive  tables:dw_zipper
Database:testhive  tables:ods_zipper_update
Database:testhive  tables:tmp_zipper
Database:testhive  tables:tb_emp01
Database:testhive  tables:tb_emp_bucket
Database:testhive  tables:tb_dept01
Database:testhive  tables:tb_dept_bucket

3) HiveCatalog example: creating a database

This example focuses on how to create a database, i.e., which constructor is used to build the CatalogDatabase that gets passed to createDatabase.

  • pom.xml
    See the pom.xml of example 2 above.
  • Java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @author alanchan
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 * @throws DatabaseAlreadyExistException
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException, DatabaseAlreadyExistException {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// testhive is the Hive database name
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// use the registered catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}

		// public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment) {
		//     this.properties = checkNotNull(properties, "properties cannot be null");
		//     this.comment = comment;
		// }
		Map<String, String> properties = new HashMap<>();
		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";
		hiveCatalog.createDatabase(newDatabaseName, cd, true);

		List<String> newTables = hiveCatalog.listTables(newDatabaseName);
		for (String table : newTables) {
			System.out.println("Database:alan_hivecatalog_hivedb  tables:" + table);
		}
	}
}
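
To verify the result from code rather than from Beeline, the new database can be read back through the same Catalog API; a small sketch reusing hiveCatalog from the example above:

// read back the freshly created database and print its comment
CatalogDatabase created = hiveCatalog.getDatabase("alan_hivecatalog_hivedb");
System.out.println("comment: " + created.getComment());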
  • Run results
##################  hive query results  ############################
##### query result before the Flink job created the database
0: jdbc:hive2://server4:10000> show databases;
+----------------+
| database_name  |
+----------------+
| default        |
| test           |
| testhive       |
+----------------+
3 rows selected (0.03 seconds)
##### query result after the Flink job created the database
0: jdbc:hive2://server4:10000> show databases;
+--------------------------+
|      database_name       |
+--------------------------+
| alan_hivecatalog_hivedb  |
| default                  |
| test                     |
| testhive                 |
+--------------------------+
4 rows selected (0.023 seconds)

##################  flink query results  ############################
#### Since only the database was created and it contains no tables, the job prints nothing for it. For the listing of the tables under testhive, see example 2 above.
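
Because the example passes ignoreIfExists = true to createDatabase, re-running the job is harmless. To remove the database again afterwards, a sketch reusing hiveCatalog (the Catalog interface also has a dropDatabase overload with a cascade flag for non-empty databases):

// idempotent cleanup: ignoreIfNotExists = true
hiveCatalog.dropDatabase("alan_hivecatalog_hivedb", true);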

2. Table operations

Table operations here means operations through the HiveCatalog: the JdbcCatalog cannot create or modify databases and tables (query-type operations do work), so the following all assumes the HiveCatalog. This part closely mirrors the third section of 24. Flink Table API & SQL Catalogs (introduction, types, DDL via Java API and SQL, catalog operations via Java API and SQL) - 1; refer to the examples there for details, which are not repeated here. A sketch of filling in the CatalogTableImpl placeholder follows the snippet below.

// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table (the last argument is ignoreIfNotExists)
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

// get table (the method takes an ObjectPath, not a bare table name)
catalog.getTable(new ObjectPath("mydb", "mytable"));

// check whether a table exists
catalog.tableExists(new ObjectPath("mydb", "mytable"));

// list tables in a database
catalog.listTables("mydb");
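
The CatalogTableImpl(...) placeholder takes a schema, a properties map, and a comment. A minimal sketch against the Flink 1.13 API used in the HiveCatalog examples above, reusing the hiveCatalog object; the table name and columns are made up for illustration:

import java.util.HashMap;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;

// a two-column schema: id INT, name STRING
TableSchema schema = TableSchema.builder()
		.field("id", DataTypes.INT())
		.field("name", DataTypes.STRING())
		.build();

// the properties map would normally carry connector options; left empty here
CatalogTable table = new CatalogTableImpl(schema, new HashMap<>(), "a demo table");

// create testhive.alan_demo_table; ignoreIfExists = false fails if it already exists
hiveCatalog.createTable(new ObjectPath("testhive", "alan_demo_table"), table, false);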

This article briefly introduced operating on databases and tables through the Java API, with concrete, runnable examples for each.

