Paimon Flink本地读取数据报错

server/2024/9/22 23:13:30/

1.idea本地读取paimon

用idea在本地读取paimon的表时需要添加的依赖除了官网提出的和hadoop相关的,paimon-flink之类相关的除外还需要其他额外依赖

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Test {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("CREATE CATALOG my_jdbc WITH (\n" +"'type' = 'paimon',\n" +"'metastore' = 'jdbc',\n" +"'uri' = 'jdbc:mysql://test3:3306/paimon',\n" +"'jdbc.user' = 'root',\n" +"'jdbc.password' = 'N^57F9m2RI#rp8',\n" +"'catalog-key'='jdbc',\n" +"'warehouse' = 'hdfs://test1:8020/user/paimon'\n" +"                  )");tableEnv.executeSql("use catalog my_jdbc");tableEnv.sqlQuery("select * from test").execute().print();}
}

以上是我的代码,用的jdbc catalog读取hadoop上的数据,下面是最开始的依赖:

<dependency><groupId>org.apache.paimon</groupId><artifactId>paimon-flink-1.18</artifactId><version>0.8.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.18.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.18.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version></dependency>

2. 报错1

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/hybrid/HybridSource$SourceFactory

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/hybrid/HybridSource$SourceFactoryat org.apache.paimon.flink.source.DataTableSource.getScanRuntimeProvider(DataTableSource.java:192)at org.apache.paimon.flink.source.table.BaseTableSource.getScanRuntimeProvider(BaseTableSource.java:42)at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3997)at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2867)at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2427)at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2341)at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2286)at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:723)at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:709)at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3843)at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:617)at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)at kafka.Test.main(Test.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.hybrid.HybridSource$SourceFactoryat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)... 25 more

从报错大概可以看出需要这个 org/apache/flink/connector/base路径底下的source/hybrid/HybridSource$SourceFactory类,查了一下这个类在flink-connector-base类里面,所以添加上这个依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.18.0</version>
</dependency>

3.报错2

加上了上面的依赖继续跑然后又报错了:NoClassDefFoundError: org/apache/flink/connector/file/src/reader/BulkFormat$RecordIterator

Exception in thread "main" java.lang.RuntimeException: Failed to fetch next resultat org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)at kafka.Test.main(Test.java:25)
Caused by: java.io.IOException: Failed to fetch job execution resultat org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:183)... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategyat org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/connector/file/src/reader/BulkFormat$RecordIteratorat org.apache.paimon.flink.source.FileStoreSourceReader.<init>(FileStoreSourceReader.java:55)at org.apache.paimon.flink.source.FlinkSource.createReader(FlinkSource.java:59)at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:314)at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:718)at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/file/src/reader/BulkFormat$RecordIterator... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.file.src.reader.BulkFormat$RecordIteratorat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)... 11 more

从上面大概可以看出应该是缺flink-connector-files依赖,添加上去之后成功了

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.18-SNAPSHOT</version>
</dependency>


http://www.ppmy.cn/server/105058.html

相关文章

【设计模式】模块模式和桥接模式

模块模式 模块化模式最初被定义为在传统软件工程中为类提供私有和公共封装的一种方法。 能够使一个单独的对象拥有公共/私有的方法和变量&#xff0c;从而屏蔽来自全局作用域的特殊部分。这可以减少我们的函数名与在页面中其他脚本区域内定义的函数名冲突的可能性。 闭包&am…

Spring系列之Spring Cache缓存注解的使用

目录 一、概述 二、缓存注解 1、Cacheable 缓存结果 2、CachePut 更新缓存 3、CacheEvict 清除缓存 4、Caching 组合缓存&#xff08;不常用&#xff09; 5、CacheConfig 类级别缓存配置&#xff08;不常用&#xff09; 6、CacheResult 设置缓存超时&#xff08;不常用…

数据采集-->kafka-->hdfs

数据采集到kafka flume: a1.sources r1 a1.channels c1a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /opt/installs/flume1.9/job/a.log a1.sources.r1.positionFile /opt/installs/flume1.9/job/taildir-kafka.jsona1.channe…

【日常总结】阿里云:windows server 过一段时间登录不进去,或提示:出现身份验证错误。 无法连接到本地安全机构

场景 阿里云 : ESC系统&#xff1a;windows server 2022 问题 无法登录&#xff0c;或者登录浸提提示密码已过期 原因 密码设置了过期时间 解决方案 修改密码策略&#xff1a;密码设置永不过期 打开“本地安全策略”编辑器&#xff1a;运行 secpol.msc。 导航至“账户…

【leetcode】 27. 移除元素

题目&#xff1a; 题目链接 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素。元素的顺序可能发生改变。然后返回 nums 中与 val 不同的元素的数量。 假设 nums 中不等于 val 的元素数量为 k&#xff0c;要通过此题&#xff0c;您需要执行…

基于单片机车载酒精浓度的检测系统

摘 要&#xff1a; 为了有效地防止驾驶员酒后驾车的行为&#xff0c;设计了一种基于单片机车载酒精浓度的检测系统 。 该系统由酒精传感器、 A/D 转换器 、 AT89S52 单片机控制器 、 语音报警 、 LCD 液晶显示 、 LED 指示灯 、 车门锁传感器 、 压力传感器和继电器等构成。 当…

单线程环境下,用时间做为单一文件名称全局唯一处理

package org.example.file;import java.util.concurrent.ConcurrentHashMap;public class FileHelper {private FileService fileService;//全局唯一&#xff0c;用于控制文件时间全局唯一//value&#xff1a;0&#xff1a;没被消费&#xff0c;1&#xff1a;被消费public stat…

EasyCVR视频汇聚平台革新播放体验:WebRTC协议赋能H.265视频流畅传输

随着科技的飞速发展和网络技术的不断革新&#xff0c;视频监控已经广泛应用于社会各个领域&#xff0c;成为现代安全管理的重要组成部分。在视频监控领域&#xff0c;视频编码技术的选择尤为重要&#xff0c;它不仅关系到视频的质量&#xff0c;还直接影响到视频的传输效率和兼…