不管是用 Flink CDC 还是 Kafka Connect (Debezium Connector),在实时获取数据库的 CDC 数据并以 Json 格式写入 Kafak 中时,都会遇到 DataTime / Timpstamp 类型的转换问题,即:原始数据库中的 DataTime / Timpstamp 的字面量是 2021-12-14 00:00:00
这种形式,但是,转换为 Json 后就变成了 1639411200000
(带毫秒位的 epoch 时间),这带来的问题是:下游表基于 Json 数据建表时,对应的字段/列不能直接声明为 DataTime / Timpstamp 类型,而是必须先声明 bigint / long 类型后再进行格式转换,这带来了很大的不变,更坏的影响是,在 Flink SQL 中就不能使用 like 子句来建表了,导致手写大量的 SQL。
这个问题的解法不 Flink CDC / Kafka Connect 上,而在 Debezium 自身,对于 Flink CDC 和 Kafka Connect 是通用的,就是:开发 Debezium 的 Custom Converter,自动对时间类型的数据进行格式转换。需要注意的是:Debezium 的 Custom Converter 机制做过重大升级,编写自定义的 Converter 时要注意你所使用的 Debezium 的版本!
首先,我们看一下 Debzium 官方文档对于自定义时间格式转化给出的方案:https://debezium.io/documentatio