MySQL Binlog 同步工具go-mysql-transfer Lua模块使用说明

embedded/2025/1/13 19:29:40/

一、go-mysql-transfer

go-mysql-transfer是一款MySQL实时、增量数据同步工具。能够实时解析MySQL二进制日志binlog,并生成指定格式的消息,同步到接收端。

go-mysql-transfer具有如下特点:

1、不依赖其它组件,一键部署

2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再编写客户端,开箱即用

3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑

4、支持监控告警,集成Prometheus客户端

5、高可用集群部署

6、数据同步失败重试

7、全量数据初始化

详情及安装说明 请参见: MySQL Binlog 增量同步工具go-mysql-transfer实现详解

项目开源地址:

gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer

如果此工具对你有帮助,请Star支持下

二、Lua脚本引擎

go-mysql-transfer中使用gopher-lua作为Lua虚拟机,支持Lua5.1规范。Lua作为专业的内置脚本语言,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。开发者只需要花费少量时间就能大致掌握其用法。

基于Lua的高扩展性,可以实现更为复杂的数据解析、消息生成、数据处理逻辑。

三、json模块

提供json数据格式的序列化和反序列化功能,提供encode和decode两个方法。
使用示例如下:

local json = require("json")   -- 加载json模块
local ops = require("mqOps") --加载mq操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、deletelocal id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local password = row["PASSWORD"] --获取USER_NAME列的值
local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result = {}  -- 定义一个table,作为结果
result["id"] = id
result["action"] = actionif action == "delete" -- 删除事件
thenlocal val = json.encode(result) -- 将result转为jsonops.SEND("transfer_test_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
else result["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["source"] = "binlog" -- 数据来源local val = json.encode(result) -- 将result转为jsonops.SEND("transfer_test_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容-- local obj = json.decode(val ) -- json反序列化-- print(obj ["createTime"])

四、db(数据库操作)模块

比如我们有角色表(t_role):

IDCODENAMEREMARK
1r1管理员具有所有操作权限
2r2测试员具有测试功能的操作权限

用户表(t_user):

IDUSER_NAMEPASSWORDROLE_CODECREATE_TIME
1admin123456r12020-10-20 22:00:10

我们需要监听t_user表,并向接收端发送如下格式的消息:

{"id": "1","userName": "admin""password": "123456","createTime": 100001,"roleName": "系统管理员","roleRemark": "管理后台相关信息","source": "binlog",}

基于Binlog的数据同步工具,只能监听到一行数据的变更,进行响应。无法像基于SQL的ETL工具那样具有多表连接的能力。如果要得到向上面那样的聚合数据,需要使用dbOps模块,用法如下:

local json = require("json")   -- 加载json模块
local ops = require("mqOps") --加载mq操作模块
local db = require("dbOps") --加载数据库(db)操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称
-- print(json.encode(row))
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、deletelocal id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local password = row["PASSWORD"] --获取USER_NAME列的值
local roleCode = row["ROLE_CODE"] --角色编码
local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result = {}  -- 定义一个table,作为结果
result["id"] = id
result["action"] = actionif action == "delete" -- 删除事件
thenlocal val = json.encode(result) -- 将result转为jsonops.SEND("user_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
else local sql = string.format("SELECT * FROM ESEAP.T_ROLE WHERE CODE = '%s'",roleCode) -- SQL语句,不能直接使用表名,要使用(数据库名称.表名称),如:ESEAP.T_ROLElocal roleRS = db.selectOne(sql) -- 执行SQL查询,返回一条查询结果,table类型,结构如:{"CODE":"a1","ID":"1","NAME":"系统管理员","REMARK":"管理后台相关信息"}-- print(json.encode(roleRS))local roleName = roleRS["NAME"] --角色名称local roleRemark = roleRS["REMARK"] --角色描述-- local roleListRS = db.select(sql) -- 执行SQL查询,返回多条条查询结果,数组类型,元素为table,结构如:[{"CODE":"a1","ID":"1","NAME":"系统管理员","REMARK":"管理后台相关信息"}]-- print(json.encode(roleListRS))result["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["source"] = "binlog" -- 数据来源result["roleName"] = roleNameresult["roleRemark"] = roleRemarklocal val = json.encode(result) -- 将result转为jsonops.SEND("user_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
end 

dbOps模块的方法说明:
1、selectOne(sql) 查询一条数据,返回table类型的结果;如果查询不到数据,返回空table;如果查询到多个结果,会出错
2、select(sql) 查询多条数据,返回数组类型的结果,数组元素为tablem(格式如:[table1,table2]);查询不到结果,返回空table;

四、http客户端模块

让go-mysql-transfer具体发送任意http请求的能力,httpOps提供的方法说明:

1、get(url,headers) 发送get请求;url为请求地址;headers为请求头参数,table类型
2、delete(url,headers) 发送delete请求;url为请求地址;headers为请求头参数,table类型
3、post(url,headers,formItems) 发送post请求;url为请求地址;headers为请求头参数,table类型;formItems为表单数据,table类型
4、put(url,headers,formItems) 发送put请求;url为请求地址;headers为请求头参数,table类型;formItems为表单数据,table类型

上面4个方法的返回值为一个table类型的结果,元素"status_code"为http响应状态,Number类型(如:200、401、403、500等);元素body为http响应内容,string类型

httpOps模块具体用法如下:

local json = require("json")   -- 加载json模块
local ops = require("redisOps") --加载redis操作模块
local httpcli = require("httpOps") --加载http操作模块local row = ops.rawRow()  --数据库当前变更的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、deletelocal _id = row["ID"] --获取ID列的值
local _userName = row["USER_NAME"] --获取USER_NAME列的值
local _password = row["PASSWORD"] --获取USER_NAME列的值
local _createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
local key = "user_".._id -- 定义keyif action == "insert" -- 插入事件
then-- getlocal url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) local res = httpcli.get(url,{Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="}) -- http get请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为tablelocal status = res.status_code--print(res.status_code)  -- http响应代码,如:200、401、403、500等--print(res.body)-- http响应内容,string类型--local resObj = json.decode(res.body) -- json反序列化响应内容--print(resObj["msg"])-- delete--local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) --local res = httpcli.delete(url,{--  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="--}) -- http delete请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table-- post--local url = "http://localhost:9999/http_tests"--local res = httpcli.post(url,{--  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="--},{--  id=_id,--  userName=_userName,--  password=_password,--  createTime=_createTime--}) -- http post请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table;第三个参数为post内容,类型为table--put--local url = "http://localhost:9999/http_tests"--local res = httpcli.put(url,{--  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="--},{--  id=_id,--  userName=_userName,--  password=_password,--  createTime=_createTime--}) -- http put请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table;第三个参数为post内容,类型为tableif status == 200then ops.SADD("user_set",userName.."|succeed") -- 对应Redis的SADD命令,第一个参数为key(支持string类型),第二个参数为valueelseops.SADD("user_set",userName.."|failed") -- 对应Redis的SADD命令,第一个参数为key(支持string类型),第二个参数为valueendend 
最后编辑于:2024-12-25 21:32:32


喜欢的朋友记得点赞、收藏、关注哦!!!


http://www.ppmy.cn/embedded/153637.html

相关文章

【蓝牙】win11 笔记本电脑连接 hc-06

文章目录 前言步骤 前言 使用电脑通过蓝牙添加串口 步骤 设置 -> 蓝牙和其他设备 点击 显示更多设备 更多蓝牙设置 COM 端口 -> 添加 有可能出现卡顿,等待一会 传出 -> 浏览 点击添加 hc-06,如果没有则点击 再次搜索 确定 添加成…

nginx反向代理和负载均衡的区别

1、反向代理,不需要服务器池,直接代理某台服务器 location / {proxy_pass http://192.168.18.201;proxy_set_header Host $host;proxy_set_header X-Forwarded-For $remote_addr; }proxy_set_header Host $host; …

30天开发操作系统 第 12 天 -- 定时器 v1.0

前言 定时器(Timer)对于操作系统非常重要。它在原理上却很简单,只是每隔一段时间(比如0.01秒)就发送一个中断信号给CPU。幸亏有了定时器,CPU才不用辛苦地去计量时间。……如果没有定时器会怎么样呢?让我们想象一下吧。 假如CPU看不到定时器而仍想计量时…

【Excel笔记_3】execl的单元格是#DIV/0!,判断如果是这个,则该单元格等于空

在 Excel 中,可以使用 IF 函数来判断单元格是否是 #DIV/0! 错误,并将其替换为空值(即空字符串 "")。具体公式如下: IF(ISERROR(A1), "", A1)或者,如果只想判断 #DIV/0! 错误&#xff…

10步打造完美ASP.NET、Web API和控制台应用程序文件夹结构

一、前言 在大型项目中,合理的文件夹结构是项目成功的关键之一。一个好的文件夹结构就像是一座井然有序的图书馆,每一本书(代码文件)都有其固定的位置,让人能迅速找到所需。它可以让团队成员更容易理解和维护代码&…

第 3 章 HTML5 编程基础教案

谢从华,高蕴梅 著.Web前端设计基础入门——HTML5、CSS3、JavaScript(微课视频版),2023, 清华大学出版社. ISBN:9787302641261. 一、教学目标(Objectives) 1. 知识目标: - 深入理解 HTML5 相较于…

使用Postman实现API自动化测试

🍅 点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快 背景介绍 相信大部分开发人员和测试人员对 postman 都十分熟悉,对于开发人员和测试人员而言,使用 postman 来编写和保存测试用例会是一种比…

XML 解析器:深入解析与高效应用

XML 解析器:深入解析与高效应用 引言 XML(可扩展标记语言)作为一种重要的数据交换格式,被广泛应用于各种系统和平台中。为了有效地处理和解析XML数据,XML解析器发挥着至关重要的作用。本文将深入探讨XML解析器的原理…