Flink 通过 Chunjun Oracle LogMiner 实时读取 Oracle 变更日志并写入 Doris 的方案

devtools/2025/3/24 8:13:28/

文章目录

  • 一、 技术背景
  • 二、 关键技术
    • 1、 Oracle LogMiner
    • 2、 Chunjun 的 LogMiner 关键流程
    • 3、修复 Chunjun Oracle LogMiner 问题

一、 技术背景

大数据实时同步场景中,需要将 Oracle 数据库的变更数据(CDC) 采集并写入 Apache Doris,以支持 数据分析、BI 报表、实时数据仓库 等应用。

本方案基于 Flink + Chunjun,通过 Oracle LogMiner 解析 Redo Log,实现 低延迟 写入Doris。

 

二、 关键技术

1、 Oracle LogMiner

LogMiner 是 Oracle 提供的 redo log 解析工具,用于跟踪 INSERTUPDATEDELETE 操作。

使用LogMiner需要现在Oracle中开启,具体开启操作见:Oracle配置LogMiner

 

2、 Chunjun 的 LogMiner 关键流程

Chunjun(原 FlinkX)是 Flink 生态的数据同步框架,支持多种数据源连接器(如 Oracle、MySQL、PostgreSQL、Doris)。
其中 Chunjun Oracle LogMiner Source 用于解析 Oracle Redo Log 并转换为 Flink 数据流

如下整个流程架构:

在这里插入图片描述

Flink任务启动后

  1. 通过Chunjun的oracle logMiner连接器, 建立 Oracle 连接,启动 LogMiner 解析 Redo Log。
  2. 实时监听 V$LOGMNR_CONTENTS,解析变更数据并转换为 Flink 事件流。具体地会将Oracle不同的操作日志解析为如下数据类型即重放数据操作,
  3. Flink 任务处理数据,完成转换、清洗等操作。
  4. Flink Sink 组件(Chunjun Doris Sink)将数据写入 Doris
操作类型before(旧数据)after(新数据)Flink 处理逻辑
INSERT{新数据}直接插入
UPDATE{旧数据}{新数据}先删除旧数据,再插入新数据
DELETE{旧数据}删除数据

最后如下示例flink sql:


CREATE TABLE source  
(  ID             int,  NAME          string  
) WITH (  'connector' = 'oraclelogminer-x'  ,'url' = 'jdbc:oracle:thin:@//xxx:1521/ORCL'  ,'username' = 'system'  ,'password' = 'xxx'  ,'cat' = 'insert,delete,update'  ,'table' = 'TEST.TEST_USER'  ,'timestamp-format.standard' = 'SQL'  );  CREATE TABLE sink  
(  k4             int,  k3          string  
) WITH (  
'connector' = 'doris-x',  
'schema'='demo',  'password' = 'xxx',  'table-name' = 'mytable',  'url' = 'jdbc:mysql://xxx:9030',  'username' = 'root',  'sink.parallelism' = '1',  'lookup.error-limit' = '100',  'lookup.cache-type' = 'LRU',  'lookup.parallelism' = '1',  'lookup.cache.ttl' = '60000',  'lookup.cache.max-rows' = '10000',  'writeMode'='UPSERT'  );  insert into sink  
select ID as k4, NAME as k3  
from source;  

 

3、修复 Chunjun Oracle LogMiner 问题

在实际使用中,Chunjun Oracle LogMiner 会遇到以下问题:

  1. 关于全量增量读数据的问题
//LogMinerConfig,没有全量同步的外部配置,默认是增量读取数据
private boolean enableFetchAll = true;
  1. 无法获取监听的表
//LogMinerListener 中的LogMinerConfig没有set table的地方,
//即无法获取被监听的表,改成直接获取
logMinerConfig.getListenerTables(); 
  1. PavingData和Split 不能同时开启,默认都开启,将PavingData关闭

 


http://www.ppmy.cn/devtools/169462.html

相关文章

网络编程中客户端与服务器的搭建与协议包应用

1.客户端的搭建 2.服务器搭建 3.TCP中的粘包现象 tcp协议为了提高发送的效率,会将短时间连续发送的小数据,当做一组数据统一发送 原理是: tcp协议本身存在一个1500字节的缓存区,tcp协议每次write发送数据的时候,总是…

快速入手-基于Django的主子表间操作mysql(五)

1、如果该表中存在外键,结合实际业务情况,那可以这么写: 2、针对特殊的字典类型,可以这么定义 3、获取元组中的字典值和子表中的value值方法 4、对应的前端页面写法

llama源码学习·model.py[3]ROPE旋转位置编码(4)ROPE的应用

一、源码注释 def apply_rotary_emb(xq: torch.Tensor, # 查询矩阵xk: torch.Tensor, # 键矩阵freqs_cis: torch.Tensor, # 旋转嵌入 ) -> Tuple[torch.Tensor, torch.Tensor]:# 首先将xq和xk张量转换为浮点数# 然后使用reshape将最后一个维度拆分为两个维度,每…

《南京日报》专题报道 | 耘瞳科技“工业之眼”加码“中国智造”

在江宁开发区,机器人已不再是科幻电影里的遥远想象,他们就像人类的“同事”,在工地上忙着贴砖、刷墙、搬运、检测; 在体育训练场上帮助运动员矫正姿势; 在医院里帮助医生发现帕金森早期征兆,在智慧工厂里…

C# 派生 详解

1.1派生 继承设计的目的:经常需要扩展现有类型来添加功能(行为和数据)。 定义派生类 要在类标识符后添加冒号,接着添加基类名称。 注意:1.通过继承,基类的每个成员都出现在派生类构成的链条中。 2.除非明…

开源模型应用落地-LangGraph101-多智能体协同实践(六)

一、前言 随着人工智能技术的快速发展,如何高效处理复杂任务成了 AI 系统的一大挑战。传统的线性架构在面对多轮对话和动态决策时常常显得无能为力。而 LangGraph 这种多智能体合作框架的出现,为这个问题提供了新的解决方案。 相关文章: 开源模型应用落地-LangGraph101-探索…

react 常用插件

ts项目中如果提示 path不存需要安装 pnpm i types/node --save-dev常用插件 axios ajax请求echarts 图表插件reduxjs/toolkit redux 插件antd ui插件nprogress页面上方或者下方加载loding 常用语路由跳转使用dayjs 日期格式转换react-quill-new 富文本组件 全面兼容react 18r…

【STM32】USART串口协议串口外设-学习笔记

串口协议 通信接口 通信的目的:将一个设备的数据传送到另一个设备,扩展硬件系统。比如STM32芯片内部集成了很多功能模块,像定时器计数、PWM输出、AD采集等等。这些都是芯片内部的电路,这些电路的配置寄存器,数据寄存…