Debezium-EmbeddedEngine

embedded/2024/11/19 11:09:41/

提示:一个嵌入式的Kafka Connect源连接器的工作机制

文章目录

  • 前言
  • 一、控制流图
  • 二、代码分析
    • 1.构造函数
    • 2.完成回调
    • 3.连接器回调
    • 4.RUN
  • 总结


前言

工作机制:

* 独立运行:嵌入式连接器在应用程序进程中独立运行,不需要Kafka、Kafka Connect或  Zookeeper进程

* 数据传递:应用程序设置连接器并提供一个消费者函数,连接器将所有包含数据库变更事件的SourceRecord传递给该函数。

* 责任转移:应用程序负责故障恢复、可扩展性和持久性。

* 默认存储:连接器的数据库模式历史和偏移量默认存储在内存中,重启后会丢失。

* 执行与停止:连接器设计为提交给Executor或ExecutorService单线程执行,可以通过调用stop()方法或中断线程来停止。


提示:以下是本篇文章正文内容

一、控制流图

控制流图

二、代码分析

1.构造函数

    private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, Consumer<SourceRecord> consumer,CompletionCallback completionCallback, ConnectorCallback connectorCallback) {this.config = config;this.consumer = consumer;this.classLoader = classLoader;this.clock = clock;this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {if (!success) logger.error(msg, error);};this.connectorCallback = connectorCallback;this.completionResult = new CompletionResult();assert this.config != null;assert this.consumer != null;assert this.classLoader != null;assert this.clock != null;keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);Configuration valueConverterConfig = config;if (valueConverter instanceof JsonConverter) {// Make sure that the JSON converter is configured to NOT enable schemas ...valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();}valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);// Create the worker config, adding extra fields that are required for validation of a worker config// but that are not used within the embedded engine (since the source records are never serialized) ...Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());workerConfig = new EmbeddedConfig(embeddedConfig);}

构造函数

2.完成回调

/*** A callback function to be notified when the connector completes.*/
public interface CompletionCallback {/*** Handle the completion of the embedded connector engine.* * @param success {@code true} if the connector completed normally, or {@code false} if the connector produced an error*            that prevented startup or premature termination.* @param message the completion message; never null* @param error the error, or null if there was no exception*/void handle(boolean success, String message, Throwable error);
}

这段代码定义了一个接口 CompletionCallback,其中包含一个方法 handle。该方法用于处理嵌入式连接器引擎的完成状态。
参数:
success: 布尔值,表示连接器是否正常完成。如果为 true,表示连接器正常完成;如果为 false,表示连接器启动失败或提前终止。
message: 完成消息,不能为空。
error: 异常对象,如果没有异常则为 null。 

完成回调

3.连接器回调

    /*** Callback function which informs users about the various stages a connector goes through during startup*/public interface ConnectorCallback {/*** Called after a connector has been successfully started by the engine; i.e. {@link SourceConnector#start(Map)} has* completed successfully*/default void connectorStarted() {// nothing by default}/*** Called after a connector has been successfully stopped by the engine; i.e. {@link SourceConnector#stop()} has* completed successfully*/default void connectorStopped() {// nothing by default}/*** Called after a connector task has been successfully started by the engine; i.e. {@link SourceTask#start(Map)} has* completed successfully*/default void taskStarted() {// nothing by default}/*** Called after a connector task has been successfully stopped by the engine; i.e. {@link SourceTask#stop()} has* completed successfully*/default void taskStopped() {// nothing by default}}

这段代码定义了一个接口 ConnectorCallback,其中包含了四个默认方法:connectorStarted、connectorStopped、taskStarted 和 taskStopped。这些方法用于在连接器和任务的不同生命周期阶段通知用户。
connectorStarted:当连接器成功启动时调用。
connectorStopped:当连接器成功停止时调用。
taskStarted:当连接器任务成功启动时调用。
taskStopped:当连接器任务成功停止时调用。
这些方法的默认实现为空,子类可以根据需要重写这些方法来添加自定义的回调逻辑。


连接回调

4 RUN方法核心流程

run()

核心流程参考上图

注:debezium-0.6版本 

总结

EmbeddedEngine方法和成员变量


http://www.ppmy.cn/embedded/138749.html

相关文章

Docker:技术架构的演进之路

前言 技术架构是指在软件开发和系统构建中&#xff0c;为了满足业务需求和技术要求&#xff0c;对系统的整体结构、组件、接口、数据流以及技术选型等方面进行的详细设计和规划。它是软件开发过程中的重要组成部分&#xff0c;为开发团队提供了明确的指导和规范&#xff0c;确…

阿里云SSL证书每三个月过期续期方法 —— 使用httpsok工具轻松自动续期

阿里云作为全球领先的云计算服务提供商&#xff0c;提供了SSL证书服务&#xff0c;可以为网站启用HTTPS加密&#xff0c;确保数据传输的安全。然而&#xff0c;许多人在使用阿里云SSL证书时可能会遇到一个问题&#xff1a;阿里云免费SSL证书每三个月就会过期&#xff0c;需要及…

开源项目低代码表单设计器FcDesigner扩展自定义组件

开源项目低代码表单设计器FcDesigner中的通过将自定义组件集成到设计器中&#xff0c;您可以添加额外的界面元素和功能&#xff0c;从而增强设计器的适用性和灵活性。以下是详细步骤&#xff0c;以帮助您创建、导入、注册和配置自定义组件。 源码地址: Github | Gitee | 文档 …

用DMA来自动控制PWM的输出(音频输出,交直流转换)

一、前提分析 举例&#xff1a;一首歌所包含的音阶有高有低&#xff0c;而按照某种编曲的顺序排列也就对应了不同的频率&#xff08;五线谱&#xff1a;1234567 对应的音阶各不相同&#xff09;所以频率可以理解为它的源头。频率的来源又可由PWM来控制故而一首歌所包含的频率序…

来LeetCode练下思维吧

3239.最少翻转使二进制矩阵回文| 给你一个 m x n 的二进制矩阵 grid 。 如果矩阵中一行或者一列从前往后与从后往前读是一样的&#xff0c;那么我们称这一行或者这一列是 回文 的。 你可以将 grid 中任意格子的值 翻转 &#xff0c;也就是将格子里的值从 0 变成 1 &#xff0…

Tomcat(17) 如何在Tomcat中配置访问日志?

在Apache Tomcat中配置访问日志是一个重要的步骤&#xff0c;它可以帮助你跟踪和分析服务器的HTTP请求。访问日志通常记录了每个请求的详细信息&#xff0c;如客户端IP地址、请求时间、请求的URL、HTTP状态码等。以下是如何在Tomcat中配置访问日志的详细步骤和代码示例。 步骤…

易考八股文之Elasticsearch合集

1、为什么要使用 Elasticsearch? 系统中的数据&#xff0c; 随着业务的发展&#xff0c; 时间的推移&#xff0c; 将会非常多&#xff0c;而业务中往往采用模糊查询进行数据的 搜索&#xff0c;而模糊查询会导致查询引擎放弃索引&#xff0c; 导致系统查询数据时都是全表扫描&…

element-plus如何修改内部样式而不影响vue其他组件的样式

使用scoped样式 可以在组件的样式中使用scoped修饰符&#xff0c;以限制样式仅作用于当前组件中的元素。这样就可以在不影响全局样式的情况下&#xff0c;修改element-plus组件的样式。 <template><div class"my-component"><el-button>按钮</e…