flink cdc 应用

devtools/2024/11/17 0:51:24/

SQLServer

1. The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.

遇到了一下问题,多次尝试,最终发现是数据库大小写要一致。

Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at io.debezium.schema.HistorizedDatabaseSchema.recover(HistorizedDatabaseSchema.java:38) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext.validateAndLoadDatabaseHistory(SqlServerSourceFetchTaskContext.java:187) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext.configure(SqlServerSourceFetchTaskContext.java:130) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-sql-connector-sqlserver-cdc-3.2.0.jar:3.2.0]at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.20.0.jar:1.20.0]at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.20.0.jar:1.20.0]at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-files-1.20.0.jar:1.20.0]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
  CREATE TABLE Member_Extend (ID INT, MemberID INT,  PRIMARY KEY (ID) NOT ENFORCED) WITH ('connector' = 'sqlserver-cdc','hostname' = '192.168.1.3','port' = '1433','username' = 'test','password' = 'test','database-name' = 'CrmExtend','table-name' = 'dbo.Member_Extend');

作业安全启停


show jobs
Flink SQL> show jobs;
+----------------------------------+------------------------------------------------------------------+----------+-------------------------+
|                           job id |                                                         job name |   status |              start time |
+----------------------------------+------------------------------------------------------------------+----------+-------------------------+
| ce5a0e938563cf52317c5b9055ad102f| testjob |  RUNNING | 2024-11-15T03:38:47.919 |+----------------------------------+------------------------------------------------------------------+----------+-------------------------+
4 rows in setSET state.checkpoints.dir='s3://flink/cdc-1.20/savepoints';
stop job 'ce5a0e938563cf52317c5b9055ad102f' with savepoint;Flink SQL> stop job 'ce5a0e938563cf52317c5b9055ad102f' with savepoint;
+--------------------------------------------------------------+
|                                               savepoint path |
+--------------------------------------------------------------+
| s3://flink/cdc-1.20/savepoints/savepoint-ce5a0e-2935055bb307 |
+--------------------------------------------------------------+
1 row in setSET execution.savepoint.path='s3://flink/cdc-1.20/savepoints/savepoiznt-ce5a0e-2935055bb307';  
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; 重新执行原有sqlinsert into flink_user select * from user ;

http://www.ppmy.cn/devtools/134578.html

相关文章

24/11/14 算法笔记 EM算法期望最大化算法

EM算法用于含有隐变量的概率参数模型的最大似然估计或极大后验概率估计。它在机器学习和统计学中有着广泛的应用,比如在高斯混合模型(GMM)、隐马尔可夫模型(HMM)以及各种聚类和分类问题中。 EM算法的基本思想是&#…

如何利用静态住宅IP提升TikTok营销效果:应对平台限制与账号安全的新利器

随着TikTok的全球化和日益严苛的运营政策,越来越多的品牌和商家开始面临着平台地域限制、账户管理及内容推广的挑战。特别是在多个账户管理和跨境营销中,如何避免账号封禁、提高内容曝光,成为了亟待解决的问题。在这种背景下,代理…

STM32 ADC --- DMA乒乓缓存

STM32 ADC — DMA乒乓缓存 文章目录 STM32 ADC --- DMA乒乓缓存软件切换实现乒乓利用DMA双缓冲实现乒乓 通过cubeMX配置生成HAL工程这里使用的是上篇文章(STM32 ADC — DMA采样)中生成的工程配置 软件切换实现乒乓 cubeMX默认生成的工程中是打开DMA中断…

【python系列】python数据类型之数字类型

1.定义 数字类型是编程中最常用的数据类型。什么是数字类型,下面是数字类型官方文档的解释:https://docs.python.org/zh-cn/3.10/library/stdtypes.html?highlightstr%20join#numeric-types-int-float-complex 以上可以知道: 数字类型包…

回调函数的概念、意义和应用场景

概念 回调函数,就是使用者自己定义一个函数,并实现函数的内容,然后把这个函数作为参数传入其它函数中,由其它函数在运行时来调用。 换句话说,函数是你实现的,但由别人的函数在运行时通过参数传递的方式调用…

flutter下拉刷新上拉加载的简单实现方式三

使用 CustomScrollView 结合 SliverList 实现了一个支持下拉刷新和上拉加载更多功能的滚动列表,对下面代码进行解析学习。 import dart:math;import package:flutter/material.dart;import custom_pull/gsy_refresh_sliver.dart; import package:flutter/cupertino…

hhdb数据库介绍(9-14)

SQL语法支持 DML语句 在关系集群数据库中,DML语句的逻辑将变的更为复杂。计算节点将DML语句分为两大类:单库DML语句与跨库DML语句。 单库DML语句,指SQL语句只需在一个节点上运行,即可计算出正确结果。假设分片表customer分片字…

杨中科 .Net Core 笔记 DI 依赖注入2

ServiceCollection services new ServiceCollection();//定义一个承放服务的集合 services.AddScoped<iGetRole, GetRole>();using (ServiceProvider serviceProvider services.BuildServiceProvider()) {var list serviceProvider.GetServices(typeof(iGetRole));//获…