大数据-266 实时数仓 - Canal 对接 Kafka 客户端测试

ops/2025/1/7 7:29:50/

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

  • MyBatis 更新完毕
  • 目前开始更新 Spring,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 实时数仓(正在更新…)

章节内容

  • Canal 部署安装
  • 启动服务 常见问题解决

在这里插入图片描述

Canal 简介

Canal 是阿里巴巴开源的 MySQL binlog 增量订阅与消费平台。它模拟 MySQL 的主从复制机制,通过解析 MySQL 的二进制日志(binlog),实现数据库变更的数据捕获(CDC, Change Data Capture)。

  • 数据同步:支持将数据库的变更数据同步到其他数据源或消息系统,如 Kafka、RocketMQ、Elasticsearch 等。
  • 实时性:基于 binlog 的解析和订阅,能够实现毫秒级的数据变更捕获。
  • 分布式架构:支持集群部署,满足高可用性和高吞吐量需求。
  • 多种数据支持:除 MySQL 外,还支持 MariaDB 和部分兼容 MySQL 协议的数据库。

Kafka 简介

Kafka 是一个分布式消息系统,用于高吞吐量、低延迟的数据流处理。它支持消息持久化、订阅消费、流处理等功能,常用于日志采集、事件流处理、大数据分析和消息队列场景。

  • Producer(生产者):负责将数据写入 Kafka。
  • Consumer(消费者):从 Kafka 中读取数据。
  • Broker:Kafka 集群中的服务器,负责存储和分发消息。
  • Topic:消息的分类存储单元。
  • Partition:将数据分区存储,以实现并行处理和负载均衡。

Canal 与 Kafka 的集成原理

Canal 和 Kafka 通常配合使用,用于构建高效的数据同步管道,实现数据库变更到消息队列的实时推送。流程如下:

  • 数据源捕获:Canal 监听 MySQL 的 binlog 数据变更事件。
  • 数据解析:Canal 将 binlog 数据解析为 JSON 格式或其他结构化数据。
  • 消息推送:Canal 将解析后的数据发送到 Kafka 的指定 Topic 中。
  • 消息消费与处理:Kafka Consumer 消费数据,并进一步分发给其他服务或存储,如 Hadoop、Elasticsearch、Redis 等。

使用场景

数据同步与分发

  • 实现多种异构系统之间的数据一致性。
  • 比如将 MySQL 数据变更同步到 Elasticsearch,实现实时搜索引擎。

日志分析与监控

将数据库操作事件推送到 Kafka,供日志分析或实时监控系统使用。

实时数据流处理

数据经过 Kafka 进入 Flink、Spark Streaming 等流处理框架,满足复杂的数据处理需求。

缓存刷新

数据库更新后,推送变更消息到 Kafka,再由消费者更新 Redis 缓存,提高一致性和访问性能。

注意事项与优化

数据一致性保障

确保 binlog 与业务日志一致,以避免遗漏或重复消费。

分区与负载均衡

使用 Kafka 的分区机制分配不同的表或业务流量,提升并行消费能力。

消息格式优化

选择扁平化 JSON 格式传输数据,便于消费端解析处理。

容错与恢复机制

在 Canal 和 Kafka 之间配置重试机制,避免临时网络故障导致数据丢失。

安全性

配置 Canal 访问 MySQL 的最小权限账户,只授予 REPLICATION SLAVE 和 REPLICATION CLIENT 权限。

环境要求

  • MySQL
  • Canal 采集 binlog
  • 在 Kafka 做验证

新建主题

# 新建主题
kafka-topics.sh --zookeeper h123.wzk.icu:2181 --create --replication-factor 3 --partitions 1 --topic dwshow

查看主题

# 查看主题
kafka-topics.sh --zookeeper h123.wzk.icu:2181 --list

执行结果如下图所示:
在这里插入图片描述

启动生产者

# 启动生产者
kafka-console-producer.sh --broker-list h123.wzk.icu:9092 --topic dwshow

启动消费者

# 启动消费者
kafka-console-consumer.sh --bootstrap-server h123.wzk.icu:9092 --topic dwshow --from-beginning

操作数据

此时 MySQL 数据表若有变化,会将 row 类型的 log 写进 Kafka,具体格式为 JSON。

INSERT 操作

{"data": [{"id": "6","payMethod": "meituan","payName": "美团支付","description": "美团支付","payOrder": "0","online": "-1"}],"database": "dwshow","es": 1604461572000,"id": 6,"isDdl": false,"mysqlType": {"id": "int(11)","payMethod": "varchar(20)","payName": "varchar(255)","description": "varchar(255)","payOrder": "int(11)","online": "tinyint(4)"},"old": null,"pkNames": null,"sql": "","sqlType": {"id": 4,"payMethod": 12,"payName": 12,"description": 12,"payOrder": 4,"online": -6},"table": "wzk_payments","ts": 1604461572297,"type": "INSERT"
}

UPDATE 操作

{"data": [{"productId": "115908","productName": "索尼 xxx10","shopId": "100365","price": "300.0","isSale": "1","status": "0","categoryId": "10395","createTime": "2020-07-12 13:22:22","modifyTime": "2020-09-27 02:51:16"}],"database": "dwshow","es": 1601189476000,"id": 456,"isDdl": false,"mysqlType": {"productId": "bigint(11)","productName": "varchar(200)","shopId": "bigint(11)","price": "decimal(11,2)","isSale": "tinyint(4)","status": "tinyint(4)","categoryId": "int(11)","createTime": "varchar(25)","modifyTime": "datetime"},"old": [{"price": "597.80","modifyTime": "2020-07-12 13:22:22"}],"pkNames": null,"sql": "","sqlType": {"productId": -5,"productName": 12,"shopId": -5,"price": 3,"isSale": -6,"status": -6,"categoryId": 4,"createTime": 12,"modifyTime": 93},"table": "wzk_product_info","ts": 1601189477116,"type": "UPDATE"
}

DELETE 操作

{"data": [{"productId": "115908","productName": "索尼 xxx10","shopId": "100365","price": "300.0","isSale": "1","status": "0","categoryId": "10395","createTime": "2020-07-12 13:22:22","modifyTime": "2020-09-27 02:51:16"}],"database": "dwshow","es": 1601189576000,"id": 457,"isDdl": false,"mysqlType": {"productId": "bigint(11)","productName": "varchar(200)","shopId": "bigint(11)","price": "decimal(11,2)","isSale": "tinyint(4)","status": "tinyint(4)","categoryId": "int(11)","createTime": "varchar(25)","modifyTime": "datetime"},"old": null,"pkNames": null,"sql": "","sqlType": {"productId": -5,"productName": 12,"shopId": -5,"price": 3,"isSale": -6,"status": -6,"categoryId": 4,"createTime": 12,"modifyTime": 93},"table": "wzk_product_info","ts": 1601189576594,"type": "DELETE"
}

上面的 JSON 格式解释如下:

  • data:最新的数据,为json数组。
  • 如果是插入则表示最新插入的数据;
  • 如果是更新,则表示更新后的最新数据;
  • 如果是删除,则表示被删除的数据
  • database:数据库名称
  • es:事件时间,13位的时间戳
  • id:事件操作的序列号,1,2,3…
  • isDdl:是否是DDL操作
  • mysqlType:字段类型
  • old:旧数据
  • pkNames:主键名称
  • sql:SQL语句
  • sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal
  • table:表名
  • ts:日志时间
  • type:操作类型,比如DELETE,UPDATE,INSERT

当我们操作数据库的时候,可以看到Kafka 中写入了大量的数据(这里我是消费者在监听):
在这里插入图片描述


http://www.ppmy.cn/ops/147817.html

相关文章

STM32G070CB的USART1_RX引脚

简介 在使用STM32G070CBT6 的 USART1时,发现把 PA10作为 USART1_RX引脚时,接收不到数据。 问题排查 更换pin脚 使用PB6/PB7作为USART1_TX/RX, USART1 工作正常。 使用PA9/PB7作为USART1_TX/RX, USART1 同样工作正常。 示波器…

tensorflow 内存错误

使用tensorflow训练多个模型时,训练过程中容易出现内存错误,在这里记录一下解决办法。希望能帮到各位。 2025-01-02 22:31:03.489713: W tensorflow/core/common_runtime/bfc_allocator.cc:275] Allocator (GPU_0_bfc) ran out of memory trying to all…

无刷直流电机(BLDC)六步换向法

文章目录 1、三相BLDCM 基本结构2、三相BLDCM 数学模型3、有霍尔位置传感器直流无刷电机工作原理4、无位置传感器直流无刷电机工作原理5、速度检测6、六步换向双闭环模型仿真6.1 模型总览6.2 系统及参数设置6.3 六步换向模块6.4 仿真效果 7、六步换向速度闭环PWM控制参考 1、三…

数据库高安全—角色权限:角色创建角色管理

目录 3.1 角色创建 3.2 角色管理 书接上文openGauss安全整体架构&安全认证,从安全整体架构与安全认证两方面,对高斯数据库的高安全性能进行了解读,本篇我们将从角色创建和角色管理两方面对高斯数据库的角色权限进行介绍。 3.1 角色创建…

Golang设计模式目录

go语言实现设计模式 1 文章目录: 1.1 创建型模式 1.Golang设计模式之工厂模式2.Golang设计模式之抽象工厂模式3.Golang设计模式之单例模式4.Golang设计模式之建造者模式5.Golang设计模式之原型模式 1.2 结构型模式 6.Golang设计模式之适配器模式7.Golang设计模式之桥…

浅谈棋牌游戏开发流程二:后端技术选型与基础环境搭建

一、前言:客户端只是台前,后端才是幕后“指挥中心” 在上一篇“客户端技术”中,我们聊到玩家看到的一切动作、动画、界面逻辑,都靠客户端去渲染和交互。但若没有后端的支撑,玩家点了“出牌”可能就像一拳打在空气里—…

【U8+】用友U8软件中,出入库流水输出excel的时候提示报表输出引擎错误。

【问题现象】 通过天联高级版客户端登录拥有U8后, 将出入库流水输出excel的时候,提示报表输出引擎错误。 进行报表输出时出现错误,错误信息:找不到“fd6eea8b-fb40-4ce4-8ab4-cddbd9462981.htm”。 如果您正试图从最近使用的文件列…

ThinkPHP 模板引擎使用技巧:提高开发效率

ThinkPHP 模板引擎使用技巧:提高开发效率 在现代 Web 开发中,模板引擎是实现前后端分离、提高开发效率的重要工具。ThinkPHP 的模板引擎提供了灵活且高效的方式来渲染视图,本文将介绍一些实用的使用技巧,帮助开发者更好地利用 Th…