Debezium-EmbeddedEngine

news/2024/11/20 20:43:01/

提示:一个嵌入式的Kafka Connect源连接器的工作机制

文章目录

  • 前言
  • 一、控制流图
  • 二、代码分析
    • 1.构造函数
    • 2.完成回调
    • 3.连接器回调
    • 4.RUN
  • 总结


前言

工作机制:

* 独立运行:嵌入式连接器在应用程序进程中独立运行,不需要Kafka、Kafka Connect或  Zookeeper进程

* 数据传递:应用程序设置连接器并提供一个消费者函数,连接器将所有包含数据库变更事件的SourceRecord传递给该函数。

* 责任转移:应用程序负责故障恢复、可扩展性和持久性。

* 默认存储:连接器的数据库模式历史和偏移量默认存储在内存中,重启后会丢失。

* 执行与停止:连接器设计为提交给Executor或ExecutorService单线程执行,可以通过调用stop()方法或中断线程来停止。


提示:以下是本篇文章正文内容

一、控制流图

控制流图

二、代码分析

1.构造函数

    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,CompletionCallback completionCallback, ConnectorCallback connectorCallback) {this.config = config;this.consumer = consumer;this.classLoader = classLoader;this.clock = clock;this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {if (!success) logger.error(msg, error);};this.connectorCallback = connectorCallback;this.completionResult = new CompletionResult();assert this.config != null;assert this.consumer != null;assert this.classLoader != null;assert this.clock != null;keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);Configuration valueConverterConfig = config;if (valueConverter instanceof JsonConverter) {// Make sure that the JSON converter is configured to NOT enable schemas ...valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();}valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);// Create the worker config, adding extra fields that are required for validation of a worker config// but that are not used within the embedded engine (since the source records are never serialized) ...Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());workerConfig = new EmbeddedConfig(embeddedConfig);}

构造函数

2.完成回调

/*** A callback function to be notified when the connector completes.*/
public interface CompletionCallback {/*** Handle the completion of the embedded connector engine.* * @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error*            that prevented startup or premature termination.* @param message the completion message; never null* @param error the error, or null if there was no exception*/void handle(boolean success, String message, Throwable error);
}

这段代码定义了一个接口 CompletionCallback,其中包含一个方法 handle。该方法用于处理嵌入式连接器引擎的完成状态。
参数:
success: 布尔值,表示连接器是否正常完成。如果为 true,表示连接器正常完成;如果为 false,表示连接器启动失败或提前终止。
message: 完成消息,不能为空。
error: 异常对象,如果没有异常则为 null。 

完成回调

3.连接器回调

    /*** Callback function which informs users about the various stages a connector goes through during startup*/public interface ConnectorCallback {/*** Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has* completed successfully*/default void connectorStarted() {// nothing by default}/*** Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has* completed successfully*/default void connectorStopped() {// nothing by default}/*** Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has* completed successfully*/default void taskStarted() {// nothing by default}/*** Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has* completed successfully*/default void taskStopped() {// nothing by default}}

这段代码定义了一个接口 ConnectorCallback,其中包含了四个默认方法:connectorStarted、connectorStopped、taskStarted 和 taskStopped。这些方法用于在连接器和任务的不同生命周期阶段通知用户。
connectorStarted:当连接器成功启动时调用。
connectorStopped:当连接器成功停止时调用。
taskStarted:当连接器任务成功启动时调用。
taskStopped:当连接器任务成功停止时调用。
这些方法的默认实现为空,子类可以根据需要重写这些方法来添加自定义的回调逻辑。


连接回调

4 RUN方法核心流程

run()

核心流程参考上图

注:debezium-0.6版本 

总结

EmbeddedEngine方法和成员变量


http://www.ppmy.cn/news/1548569.html

相关文章

STM32 + 移远EC800 4G通信模块数传

一、4G模块简述 EC800M-CN 是移远通信&#xff08;Quectel&#xff09;推出的一款高性能、超小尺寸的 LTE Cat 1 无线通信模块&#xff0c;专为 M2M&#xff08;机器对机器&#xff09;和 IoT&#xff08;物联网&#xff09;应用设计。它具有以下主要特点&#xff1a; 通信速率…

SpringCloud处理Websocket消息过长自动断开连接

SpringCloud处理Websocket消息过长自动断开连接 问题描述 近期实现了客户端订阅Websocket后&#xff0c;服务端定期向客户端推送相关设备消息的功能&#xff0c;在本地测试没有问题&#xff0c;上线后却发现订阅设备数量超过一定数量后Websocket就会自动断开连接 报错日志 ja…

基于深度学习的文本信息提取方法研究(pytorch python textcnn框架)

&#x1f497;博主介绍&#x1f497;&#xff1a;✌在职Java研发工程师、专注于程序设计、源码分享、技术交流、专注于Java技术领域和毕业设计✌ 温馨提示&#xff1a;文末有 CSDN 平台官方提供的老师 Wechat / QQ 名片 :) Java精品实战案例《700套》 2025最新毕业设计选题推荐…

人工智能之机器学习5-回归算法1【培训机构学习笔记】

培训内容&#xff1a; 模型评估 培训班上课的PPT里很多错误&#xff0c;即使讲了很多年也从没改正过来。 而且很多字母没有给出具体的解释&#xff0c;比如RSS和TSS&#xff0c;对初学者非常不友善。 个人学习&#xff1a; 分类和回归的区别 回归和分类是机器学习和统计学…

AWS EC2 ubuntu 使用密码登陆

1。使用页面登录ec2 2.切换root用户 sudo -i 3.为root用户或者其它用户配置密码 passwd user passwd root 4.修改下面文件的配置vi /etc/ssh/sshd_config PermitRootLogin和PasswordAuthentication 修改为yes 第六步&#xff1a;重启ssh服务 systemctl restart ssh 第七步…

Linux进阶:用户、用户组、权限

root用户&#xff1a;超级管理员 root用户拥有最大的系统操作权限&#xff0c;而普通用户在许多地方的权限是受限的 普通用户的权限&#xff0c;一般在其HOME目录内是不受限的。一旦出了HOME目录&#xff0c;在大多数地方&#xff0c;普通用户仅有只读和执行权限&#xff0c;无…

基于OpenCV的图片人脸检测研究

目录 摘要 第一章 引言 第二章 基于 OpenCV 的图片人脸检测 2.1 实现原理 2.2 代码实现与分析 2.3 代码详细分析 第三章 实验结果与分析 第四章 OpenCV 人脸检测的优势与局限性 4.1 优势 4.2 局限性 第五章 结论 第六章 未来展望 参考文献 摘要 人脸检测是计算机视…

Leetcode 每日一题 392.判断子序列

问题描述 给定两个字符串 s 和 t&#xff0c;我们需要判断 s 是否为 t 的子序列。子序列是指在不改变剩余字符相对位置的情况下&#xff0c;通过删除 t 中的一些&#xff08;或不删除&#xff09;字符形成的新字符串。 示例 输入&#xff1a;s "abc", t "ah…