Flink Source 详解

ops/2024/11/19 2:08:32/

Flink Source 详解

原文

flip-27
FLIP-27 介绍了新版本Source 接口定义及架构

相比于SourceFunction,新版本的Source更具灵活性,原因是将“splits数据获取”与真“正数据获取”逻辑进行了分离
在这里插入图片描述

重要部件

Source 作为工厂类,会创建以下两个重要部件

  1. SplitEnumerator

    • 通过createEnumerator创建

    • SplitEnumerator 响应request split请求

      • handleSplitRequest
    • 工作在SourceCoordinator (官方描述如下),可以理解为在JobMaster上运行一个单线程的逻辑,所以需要跟在worker上的reader通过rpc通信

      Where to run the enumerator
      There was a long discussion about where to run the enumerator which we documented in the appendix. The final approach we took was very similar to option 3 with a few differences. The approach is following.Each SplitEnumerator will be encapsulated in one SourceCoordinator. If there are multiple sources, multiple SourceCoordinator will there be. The SourceCoordinators will run in the JobMaster, but not as part of the ExecutionGraph. In this FLIP, we propose to failover the entire execution graph when the SplitEnumerator fails. A finer grained enumerator failover will be proposed in a later FLIP.
      
  2. SourceReader

    • 通过createReader创建

    • 工作在worker

    • 由于单独实现SourceReader过于复杂,官方抽象了3种比较通用的模型供开发者使用,MySqlSourceReader就是继承了SingleThreadMultiplexSourceReaderBase

      1. Sequential Single Split (File, database query, most bounded splits)
      2. Multi-split multiplexed (Kafka, Pulsar, Pravega, …)
      3. Multi-split multi-threaded (Kinesis, …)
        在这里插入图片描述

      在这里插入图片描述

      在这里插入图片描述

    • 使用了抽象后的类,开发者的关注点集中在实现一个SplitReader

      public interface SplitReader<E, SplitT extends SourceSplit> {RecordsWithSplitIds<E> fetch() throws InterruptedException;void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);void wakeUp();
      }
      
      1. fetch 获取数据,这里是包含了split信息的record
      2. 响应split改变
      3. 唤醒
  3. RecordEmitter

    1. The RecordsWithSplitIds returned by the SplitReader will be passed to an RecordEmitter one by one.
    2. The RecordEmitter is responsible for the following:
      • Convert the raw record type into the eventual record type
      • Provide an event time timestamp for the record that it processes.
    3. 在 emitRecord 方法中实现

由于通信使用mail风格的rpc(单线程串行),所以响应函数需要保证非阻塞,所以后面可以看到无论enumerator还是reader的最终响应都是在异步线程池中

Non-blocking progress methods, to it supports running in an actor/mailbox/dispatcher style operator

MysqlSource 举例

flink cdc中的MysqlSource来举例分析

  1. MysqlSource
    • 通过 createEnumerator 创建 MySqlSourceEnumerator

      • 初始化调用start
        • 调用splitAssigner.open()
          • splitAssigner 是获取/分配split动作的真正实现
            • 创建异步线程,填充remainingSplits
      • handleSplitRequest 响应空闲worker的请求
        • assignSplits
          • splitAssigner.getNext()
            • 从 remainingSplits 拿一个可用的split
      • 调用 context.assignSplit 发送 AddSplitEvent
      • MySqlSourceEnumerator 中 splitAssigner 的实现说明
        • splitAssigner 默认实现是 MySqlHybridSplitAssigner
          • hybrid的含义,启动分为两个步骤 1. 读取全量数据 2. 全量数据读取完毕后读取增量数据。将两种模式混合在一起被称为hybird。所以MySqlSnapshotSplitAssigner可以创建两种split
            1. 通过MySqlSnapshotSplitAssigner创建存量数据的split
              • 在读取存量数据时通过chunkSplitter切分为多个split,之后分发给多个reader并行读取
                • chunkSplitter 通过 chunkKey 的范围将存量数据切分
                • 用户可以手动设置chunkKey,否则使用主key作为chunkKey,切分split
            2. 通过 createBinlogSplit 创建增量数据的split
              • 只assign一次binlog的split
              • 只能分发给一个reader,所以在进入增量模式后flink实际所有并行度上只有一个source有数据
                在这里插入图片描述
    • 通过 createReader 创建 MySqlSourceReader

      • 创建 SingleThreadFetcherManager 传入 elementQueue splitReaderSupplier
        • elementQueue: io线程和主线程公用队列,io线程写,主线程读
        • splitReaderSupplier: split reader的工厂
        • SingleThreadFetcherManager 启动后创建线程池
      • sourceOperator 收到 AddSplitEvent 调用 sourceReader.addSplits 这里 sourceReader 是 MySqlSourceReader
        • readerBase 中会调用 splitFetcherManager.addSplits(splits);
          • 由于使用的是 SingleThreadFetcherManager,所以addSplits中永远看到只同时存在一个fetcher
            • fetcher 初始化时加入默认任务 FetchTask 构造的时候传入 elementQueue 传入构造好的 splitReader
            • fetcher addSplits时加入任务 AddSplitsTask
            • fetcher 启动时调用 startFetcher
              • 调用 executors.submit(fetcher); 提交到线程池
              • 线程池中运行 runOnce
                • FetchTask 调用 splitReader.fetch() 获取records 写入 elementQueue
      • 主线程 SourceReaderBase 中的 pollNext 会被框架调用
        • 调用 getNextFetch
          • elementsQueue.poll() 取得 records
            在这里插入图片描述

其他

在Flink CDC 3.0 中

Flink Composer 中使用 WatermarkStrategy.noWatermarks()

 return env.fromSource(sourceProvider.getSource(),WatermarkStrategy.noWatermarks(),sourceDef.getName().orElse(generateDefaultSourceName(sourceDef)),new EventTypeInfo()).setParallelism(sourceParallelism);

很合理,因为pipeline的定义中不会出现聚合函数 window函数


http://www.ppmy.cn/ops/134837.html

相关文章

【汇编】c++游戏开发

由一起学编程创作的‘C/C项目实战&#xff1a;2D射击游戏开发&#xff08;简易版&#xff09;&#xff0c; 440 行源码分享来啦~’&#xff1a; C/C项目实战&#xff1a;2D射击游戏开发&#xff08;简易版&#xff09;&#xff0c; 440 行源码分享来啦~_射击c-CSDN博客文章浏览…

配置LVGL的WIN下vscode的仿真demo

首先 我是从炸鸡佬那里推荐的文章进去&#xff0c;按照流程发现实现不了 然后搜了几个首先的错误就是找不到 lvgl/lvgl.h&#xff0c;我简直蒙蔽了&#xff0c;这个文件就在lvgl的目录下&#xff0c;即便我去手动引用也还是会出错。 然后我去别的csdn找解决方案&#xff0c;依…

CVE-2024-2961漏洞的简单学习

简单介绍 PHP利用glibc iconv()中的一个缓冲区溢出漏洞&#xff0c;实现将文件读取提升为任意命令执行漏洞 在php读取文件的时候可以使用 php://filter伪协议利用 iconv 函数, 从而可以利用该漏洞进行 RCE 漏洞的利用场景 PHP的所有标准文件读取操作都受到了影响&#xff1…

决策树基本 CART Python手写实现

参考资料&#xff1a; https://blog.csdn.net/weixin_45666566/article/details/107954454 https://blog.csdn.net/Elenstone/article/details/105328111 代码如下&#xff1a; #-*- coding:utf-8 -*- import numpy as np import pandas as pd import operatordef loadDataSe…

数据结构-布隆过滤器和可逆布隆过滤器

布隆过滤器家族 普通布隆过滤器基本原理代码实现 计数布隆过滤器基本原理代码实现 可逆布隆过滤器基本原理代码实现 参考 在解决缓存穿透问题时&#xff0c;往往会用到一种高效的数据结构-布隆过滤器&#xff0c;其能够快速过滤掉不存在的非法请求&#xff0c;但其也存在一定的…

【网络】HTTP 协议

目录 基本概念基于 HTTP 的系统组成HTTP 的基本性质 HTTP 请求头 & 响应头HTTP 的请求方法HTTP 的返回码HTTP 的 CookieHTTP 缓存 Cache-Control会话HTTP/1.x 的连接管理 基本概念 HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一…

ctfshow DSBCTF web部分wp

ctfshow 单身杯 web部分wp web 签到好玩的PHP 源码&#xff1a; <?php error_reporting(0); highlight_file(__FILE__);class ctfshow {private $d ;private $s ;private $b ;private $ctf ;public function __destruct() {$this->d (string)$this->d;$this…

【Golang】——Gin 框架中的模板渲染详解

Gin 框架支持动态网页开发&#xff0c;能够通过模板渲染结合数据生成动态页面。在这篇文章中&#xff0c;我们将一步步学习如何在 Gin 框架中配置模板、渲染动态数据&#xff0c;并结合静态资源文件创建一个功能完整的动态网站。 文章目录 1. 什么是模板渲染&#xff1f;1.1 概…