数据源为postgres的多表关联flink开发需求,开发思路

news/2025/3/31 8:13:08/

作为大数据实时开发工程师,开发基于 PostgreSQL 多表关联的 Flink 程序需遵循以下系统性思路,结合实时处理特性与 PostgreSQL 数据源特点进行设计:


一、数据同步与预处理

1. 配置 PostgreSQL CDC 连接

  • 使用 Flink CDC 连接器
    通过 flink-connector-postgres-cdc 捕获多表变更日志(需 PostgreSQL 开启逻辑复制并配置 wal_level=logical)。
    CREATE TABLE orders_source (order_id BIGINT PRIMARY KEY,user_id BIGINT,-- ...其他字段update_time TIMESTAMP(3)
    ) WITH ('connector' = 'postgres-cdc','hostname' = 'pg-host','port' = '5432','database-name' = 'sales_db','schema-name' = 'public','table-name' = 'orders','username' = 'flink_user','password' = 'flink_pwd','decoding.plugin.name' = 'pgoutput'  -- 使用Debezium解析逻辑日志
    );
    
    注意:需为每个关联表单独定义 CDC 源表。

2. 处理乱序与数据延迟

  • 定义 Watermark 与事件时间
    在 DDL 中指定事件时间字段并设置 Watermark,解决乱序问题:
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
    
  • 启用 Changelog 模式
    利用 'scan.startup.mode'='latest-offset' 避免全量同步压力,并配置 'debezium.snapshot.mode'='never' 跳过历史数据(若只需增量)。

二、关联策略选择

1. 双流 Join(适用于动态表关联)

  • 场景:两表均为高频更新的事实表(如订单表与支付表)。
  • 实现:通过 Flink SQL 或 DataStream API 实现窗口关联,需注意状态管理
    SELECT o.order_id, p.payment_id
    FROM orders_source o
    JOIN payment_source p ON o.order_id = p.order_id AND p.pay_time BETWEEN o.create_time AND o.create_time + INTERVAL '1' HOUR;  -- 时间区间约束
    
    优化
    • 设置状态 TTL(table.exec.state.ttl=24h)避免状态膨胀。
    • 使用 RocksDBStateBackend 并启用增量 Checkpoint。

2. 维表 Join(静态表与动态表关联)

  • 场景:关联低频更新的维度表(如用户信息表)。
  • 方案选择
    • 预加载+定时刷新
      open() 方法中全量加载维表至内存,并通过 Timer 或异步线程定期刷新。
      // 示例:RichAsyncFunction 中加载维表
      public void open(Configuration parameters) {loadDimensionTable();  // 初始化加载setupRefreshTimer();  // 定时刷新逻辑
      }
      
    • 异步查询+缓存
      使用 AsyncIO 结合 Guava Cache 或 Caffeine,减少对 PostgreSQL 的实时查询压力。
      AsyncDataStream.unorderedWait(stream, new AsyncPostgresLookupFunction(), 10, TimeUnit.SECONDS, 100  // 最大并发请求数
      );
      
    优化
    • 缓存设置淘汰策略(如 LRU)和过期时间。
    • 使用连接池(如 HikariCP)管理 PostgreSQL 连接。

三、资源与性能调优

1. 并行度与资源分配

  • 计算密集型任务:提升 parallelism 并分配更多 CPU 资源。
  • IO 密集型任务(如维表查询):增加 TaskManager 堆外内存,避免 Full GC 影响吞吐。

2. 状态与 Checkpoint 优化

  • 启用增量 Checkpoint
    state.backend: rocksdb
    state.backend.incremental: true
    
  • 调整 Checkpoint 间隔
    根据业务容忍度设置间隔(如 1 分钟),避免频繁触发导致吞吐下降。

3. PostgreSQL 端优化

  • 索引优化:为关联字段(如 order_id)添加 B-Tree 索引。
  • 逻辑复制槽监控:定期清理堆积的 WAL 日志,防止 CDC 延迟。

四、容错与数据一致性

1. 精确一次语义保障

  • 启用两阶段提交
    若 Sink 端为支持事务的存储(如 Kafka/PG),配置 sink.semantic=exactly_once
  • 幂等写入
    对目标表设计唯一键约束,结合 UPSERT 语法处理重复数据。

2. 异常处理机制

  • 维表查询降级:缓存失效时返回默认值或记录异常日志,避免任务崩溃。
  • Dead Letter Queue
    将关联失败的数据写入侧输出流,供后续人工修复或重试。

五、测试与监控

1. 分层验证

  • 单元测试:Mock 维表数据验证关联逻辑正确性。
  • 端到端测试:使用 Testcontainers 模拟 PostgreSQL 环境,验证全链路一致性。

2. 监控指标

  • Flink Dashboard:关注 numRecordsInnumRecordsOutlatency
  • 自定义 Metrics
    统计维表缓存命中率、关联失败率等,通过 Prometheus + Grafana 可视化。
  • PostgreSQL 监控
    跟踪逻辑复制延迟(pg_stat_replication)与查询 QPS。

六、典型问题解决方案

问题场景解决策略参考方案
维表数据量大导致内存溢出分区加载 + 本地缓存淘汰策略网页5
双流 Join 状态膨胀设置 TTL + 增量 Checkpoint网页1][网页4
关联结果数据延迟高优化 Watermark 策略 + 增大并行度网页1][网页3
维表更新无法实时生效异步查询 + 定时刷新缓存网页5

flinktableapiTableException_Failed_to_execute_sql_139">报错一:org.apache.flink.table.api.TableException: Failed to execute sql

final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
  • 把下面3个参数从flink-conf.yaml中注释
#execution.checkpointing.interval: 10s
#execution.target: yarn-session
#execution.checkpointing.timeout: 60m

报错二:java.sql.SQLException: No suitable driver found for jdbc:postgresql:

https://mvnrepository.com/artifact/org.postgresql/postgresql
  • 找不到pg的驱动包,重新下载flink包解决

报错三:Caused by: java.lang.IllegalStateException

final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
  • 把下面3个参数从flink-conf.yaml中注释
#execution.checkpointing.interval: 10s
#execution.target: yarn-session
#execution.checkpointing.timeout: 60m

flinktableapiTableException_Sort_on_a_nontimeattribute_field_is_not_supported_166">报错四:org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.

	SELECTc1,c2,COUNT(n1)
FROMpublic.new_complaint_common_infos_2024
WHEREcreate_date >= '2025-03-25 09:00:00' 
GROUP BYc1,c2
ORDER BYCOUNT(n1) DESC;
  • 此错误表示 ​尝试对流式数据(无界流)中的非时间属性字段进行全局排序,而 Flink 流处理引擎的排序操作需依赖时间属性字段(如事件时间或处理时间)来管理状态。

报错五:org.postgresql.util.PSQLException: ERROR: logical decoding cannot be used while in recovery

select * from pg_tbl01 limit 10;

此错误表明 ​PostgreSQL 数据库当前处于恢复模式(如备用节点或崩溃恢复中)​,而逻辑解码(Logical Decoding)功能(用于 CDC 数据捕获)在此模式下不可用。常见于以下场景:

  1. 从备用节点(Standby)读取变更数据:逻辑复制槽(Replication Slot)只能在主节点(Primary)创建和使用(换成pg的主节点和读写端口)。
  2. ​ 主节点处于崩溃恢复状态:数据库在恢复未提交事务或回滚时暂时禁止逻辑解码。
  3. wal_level 配置错误:未正确开启逻辑解码所需配置。

flinktableapiValidationException_Incremental_snapshot_for_tables_requires_primary_key_but_table_publicnew_complaint_common_infos_2024_doesnt_have_primary_key_195">报错六:org.apache.flink.table.api.ValidationException: Incremental snapshot for tables requires primary key, but table public.new_complaint_common_infos_2024 doesn’t have primary key.

  1. 为源表添加主键
  2. 使用替代唯一字段
    若表中没有显式主键,但存在唯一索引或联合唯一字段,可以在 Flink 中通过 PRIMARY KEY 语法声明逻辑主键。例如:
CREATE TABLE complaint_table (...cmplnt_nbr STRING,create_date TIMESTAMP,...PRIMARY KEY (cmplnt_nbr) NOT ENFORCED  -- 逻辑主键声明
) WITH (...);
  1. 调整 Flink CDC 配置(临时方案)​
    若无法修改源表结构,可尝试:

​全量同步模式:禁用增量快照,但会导致性能下降。
​自定义主键逻辑:在 Flink 中通过 scan.incremental.snapshot.chunk.key-column 指定逻辑主键字段(需 Flink CDC 版本支持)。

  'scan.incremental.snapshot.enabled' = 'false','scan.incremental.snapshot.chunk.key-column' = 'show_no'

报错七:java.lang.IllegalStateException: The Postgres CDC connector does not support ‘latest-offset’ startup mode when ‘scan.incremental.snapshot.enabled’ is disabled, you can enable ‘scan.incremental.snapshot.enabled’ to use this startup mode.

  • Postgres CDC连接器在scan.incremental.snapshot.enable为false时不支持‘ latest-offset ’启动模式,可以启用scan.incremental.snapshot.enable以使用此启动模式。

通过以上思路,可实现高可靠、低延迟的 PostgreSQL 多表关联 Flink 程序,需根据具体业务需求(如实时性要求、数据规模)灵活调整方案。


http://www.ppmy.cn/news/1583741.html

相关文章

ngx_http_index_set_index

定义在 src\http\modules\ngx_http_index_module.c static char * ngx_http_index_set_index(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {ngx_http_index_loc_conf_t *ilcf conf;ngx_str_t *value;ngx_uint_t i, n;ngx_http_inde…

测谎仪策略思路

来源:【东吴金工 金工专题】“高频价量相关性拥抱CTA”系列研究(四):CPV因子期货版3.0—CPV测谎机 原创 高子剑 量化邻距离 2024年09月20日 14:37 该报告主要介绍了“高频价量相关性拥抱CTA”系列研究中CPV因子期货版的相关内容,…

新能源动力电池测试设备深度解析:充放电设备与电池模拟器的差异及技术趋势

一、技术原理对比与核心技术创新 充放电设备 核心原理与硬件架构 充放电设备的核心功能是通过电力电子技术精确控制电池的充放电过程,其硬件架构包括高精度电源模块、双向DC/DC变换器、数据采集系统和温控单元。例如,在放电阶段,设备通过双向…

MySQL - 数据库基础操作

SQL语句 结构化查询语言(Structured Query Language),在关系型数据库上执行数据操作、数据检索以及数据维护的标准语言。 分类 DDL 数据定义语言(Data Definition Language),定义对数据库对象(库、表、列、索引)的操作。 DML 数据操作语言(Data Manip…

python每日十题(12)

根据字典的索引方式可知,d.get( egg ,no this food)索引的是字典第一层,但是第一层只有键food,没有键egg,故索引不出值,输出的是“no this food ”。 外层for循环是将a[0][1,2,3],a[1][4,5,6],a[2][7,8,9]依次赋给变量…

多版本PHP开发环境配置教程:WAMPServer下MySQL/Apache/MariaDB版本安装与切换

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、版本切换指南总结 前言 由于有几个项目分别使用到PHP7.0 和7.4以及8.0版本,设置mysql也会根据PHP版本使用不同的版本,于是开始研究…

【Charles的重定向】

重定向接口请求 注意:不用的时候记得关掉! 在测试和开发过程中,有时候需要修改接口的返回状态,或是返回值。在Charles中可以通过远程映射,将B接口的响应返回给A接口,从而达到修改接口响应的目的。这个功能…

在 Linux(Ubuntu / CentOS 7)上快速搭建我的世界 MineCraft 服务器,并实现远程联机,详细教程

Linux 部署 MineCraft 服务器 详细教程(丐版,无需云服务器) 一、虚拟机 Ubuntu 部署二、下载 Minecraft 服务端三、安装 JRE 21四、安装 MCS manager 面板五、搭建服务器六、本地测试连接七、下载樱花,实现内网穿透,邀…