TiDB 利用binlog 恢复-反解析binlog

embedded/2024/12/23 1:29:41/

我们知道TiDB的binlog记录了所有已经执行成功的dml语句,类似mysql binlog row模式

,TiDB官方也提供了reparo可以进行解析binlog,如下所示:

[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:153] ["Parsed start TSO"] [ts=449217508147200000]
[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:160] ["Parsed stop TSO"] [ts=449222855884800000]
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 1
merchant_id(varchar): 2
coupon_code(varchar): 4
customer_id(varchar): 4
customer_mobile_no(varchar): 5
trade_channel(varchar): 03
trade_store_id(varchar): 1440040
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 6
external_order_id(varchar): <nil>
receive_channel_code(varchar): 105
coupon_card_type(tinyint): 0
coupon_type(tinyint): 1
receive_type(smallint): 6
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 2
merchant_id(varchar): <nil>
coupon_code(varchar): 5
customer_id(varchar): 6
customer_mobile_no(varchar): 7
trade_channel(varchar): 03
trade_store_id(varchar): 1420452
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 8
external_order_id(varchar): <nil>
receive_channel_code(varchar): 03
coupon_card_type(tinyint): 0
coupon_type(tinyint): 0
receive_type(smallint): 4
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1

另外reparo 支持print 和mysql 两种模式,print只做解析打印到标准输出,不执行 SQL,mysql:是直接再下游数据库执行SQL

但是我们有的时候需要进去反解析,比如我们误删除了一些数据,我们要把误删除的数据解析成INSERT语句,这怎么办呢,TIDB目前不提供这种反解析工具,于是自己写了一个工具进行解析,代码如下:

package mainimport ("bufio""encoding/json""flag""fmt""log""os""regexp""strings""time"
)var (binlogFile stringschema     stringtable      stringsqlType    stringwhereSql   stringlogPath    stringfiledRe    = regexp.MustCompile(`\(.*?\)+:.*`)logOutFile *log.Logger
)type filed struct {Name  string `json:"name"`Value string `json:"value"`
}
type Parser struct {lines []filed
}//	type lists struct {
//		filed []filed
//	}
func compressStr(str string) string {str = strings.ReplaceAll(str, " ", "")r := strings.NewReplacer("\r", "", "\n", "")str = r.Replace(str)//匹配一个或多个空白符的正则表达式reg := regexp.MustCompile("\\s+")return reg.ReplaceAllString(str, "")
}func main() {flag.StringVar(&binlogFile, "binlogFile", "", "binlog日志文件路径")flag.StringVar(&schema, "schema", "", "要解析的数据库名称")flag.StringVar(&table, "table", "", "要解析的表名称")flag.StringVar(&sqlType, "sqlType", "", "要解析的dml类型")flag.StringVar(&logPath, "logPath", "", "输出文件路径名称")flag.Parse()if binlogFile == "" {log.Println("请输入binlog日志文件路径...")return}if schema == "" {log.Println("请输入要解析的数据库名称...")return}if table == "" {log.Println("请输入要解析的表名称...")return}if sqlType == "" {log.Println("请输入要解析的dml类型...")return}if logPath == "" {log.Println("请输入输出日志文件路径...")return}outSlowLogFile(logPath)file, err := os.Open(binlogFile)if err != nil {log.Println("读取文件失败...")return}schemaRe := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, sqlType)//schemaRe2 := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, "Insert")defer file.Close()scanner := bufio.NewScanner(file)scanner.Buffer(make([]byte, 1024*1024), 1024*1024*10)inHeader := falsevar f filedvar array []stringvar time2, _ = time.Parse("2006-01-02 15:04:05", "2024-04-01 00:00:00")fields := ""for scanner.Scan() {line := scanner.Text()if compressStr(line) == compressStr(schemaRe) {if fields != "" {fields = fmt.Sprintf("{%v}", fields)array = append(array, fields)}fields = ""inHeader = truecontinue} else if strings.Contains(line, "sync binlog success") || strings.Contains(line, "read file end") {inHeader = falsecontinue} else {if inHeader {if filedRe.MatchString(line) {f.Name = strings.Split(line, "(")[0]tmpValue := strings.Split(line, ": ")if len(tmpValue) == 1 {inHeader = falsecontinue} else {f.Value = strings.Split(line, ": ")[1]}if fields == "" {fields = fmt.Sprintf("\"%s\":\"%s\" ", f.Name, f.Value)} else {fields += fmt.Sprintf(",\"%s\":\"%s\"", f.Name, f.Value)}} else {inHeader = falsecontinue}}}}if fields != "" {fields = fmt.Sprintf("{%v}", fields)array = append(array, fields)}if err := scanner.Err(); err != nil {log.Println(err)}for i := 0; i < len(array); i++ {//decoder := json.NewDecoder(strings.NewReader(array[i]))////for key, value := range JsonToMap(array[i]) {//	fmt.Printf("键:%v,值:%d\n", key, value)//}m := make(map[string]string)err = json.Unmarshal([]byte(array[i]), &m)if err != nil {fmt.Printf("Unmarshal with error: %+v", err)}keys := ""values := ""isExec := false//newKeys := make([]string, 0, len(m))//for k := range m {//	newKeys = append(newKeys, k)//}对切片进行排序//sort.Strings(newKeys)for key, value := range m {if keys == "" {keys = fmt.Sprintf("%v", key)if value == "<nil>" {values = fmt.Sprintf("%v", "NULL")} else {values = fmt.Sprintf("'%v'", value)}} else {keys += fmt.Sprintf(",%v", key)if value == "<nil>" {values += fmt.Sprintf(",%v", "NULL")} else {values += fmt.Sprintf(",'%v'", value)}}if key == "updated_date" {isExec = trueupdateTime, _ := time.Parse("2006-01-02 15:04:05", value)if updateTime.After(time2) {isExec = true} else {isExec = false}}}if isExec {inSql := fmt.Sprintf("insert into coupon_trade_record(%v) values(%v);", keys, values)logOutFile.Println(inSql)}}
}func outSlowLogFile(outFile string) {outFilePath, err := os.OpenFile(outFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)if err != nil {log.Println(fmt.Sprintf("创建输出文件失败:%s", err))return}logOutFile = log.New(outFilePath, "", log.Lmsgprefix)
}

通过代码可以将DELETE sql直接转成insert 语句:

 至此完成将数据重新插入到业务库里面,即可完成恢复


http://www.ppmy.cn/embedded/25477.html

相关文章

13.Blender 界面介绍(下) 雕刻、纹理绘制及属性

界面介绍 1. 布局 物体的移动旋转和缩放等操作 2. 建模 里面就是有一些建模常用的功能 里面的功能对于做MMD来说不是必备的操作 3. 雕刻 使用里面的工具可以对物体本身进行修改 4. UV编辑 如果想要编辑UV贴图 将编辑模式改为纹理绘制 再点击右边的工具 如果进行编…

python学习笔记----数据容器(六)

一、数据容器的入门 python中的数据容器&#xff1a;一种可以容纳多份数据的数据类型&#xff0c;容纳的每一份数据称之为1个元素。每一个元素&#xff0c;可以是任意类型的数据&#xff0c;如字符串、数字、布尔等。 数据容器根据特点的不同&#xff0c;如&#xff1a; 是否…

wireshark RTP分析参数

主要看丢弃和Delta&#xff0c; 丢弃就是丢掉的udp包&#xff0c;所占的比率 Delta是当前udp包接收到的时间减去上一个udp包接收到的时间 根据载荷可以知道正确的delta应该是多少&#xff0c;比如G711A&#xff0c;ptime20&#xff0c;那么delta理论上应该趋近于20. 这里的de…

ansible-playbook离线升级centos内核

目录 概述实践ansible目录结构关键代码执行效果 结束 概述 内核离线包官网下载地址如下&#xff1a; 地址 实践 ansible目录结构 如对 ansible 不熟悉&#xff0c;离线包下载有问题&#xff0c;请至此地址下载&#xff0c;按本文操作可直接使用。 相关文章链接如下 文章地…

Java基础:设计模式之建造者模式

建造者模式是一种创建型设计模式&#xff0c;其主要目的是将一个复杂对象的构建过程与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。这种模式适用于创建那些包含多个组成部分、各部分之间有复杂的装配关系、且构造过程可能需要逐步进行或允许用户选择不同组…

LinkedList与链表

文章目录 ArrayList的缺陷链表链表的概念及结构链表的实现 LinkedList的使用什么是LinkedListLinkedList具体使用 ArrayList和LinkedList的区别 ArrayList的缺陷 通过源码知道&#xff0c;ArrayList底层使用数组来存储元素 由于其底层是一段连续空间&#xff0c;当在ArrayList任…

Three.js杂记(十四)———— 汽车展览·上

在学习了一些理论知识后&#xff0c;要做一下实战演练了&#xff0c;做一个简单的车辆展览来看看吧。 通过调整相机的位置&#xff0c;将导入的车辆模型分成三个视角展示。 车辆外部&#xff1a;可以观察车辆的整体外观以及轮廓结构车辆内部&#xff1a;相机在汽车内部&#…

DataV的轮播表后端返回的数据处理

由于官网上DataV的轮播表的接收data格式为: export default {data: [[行1列1, 行1列2, 行1列3],[行2列1, 行2列2, 行2列3],[行3列1, 行3列2, 行3列3],[行4列1, 行4列2, 行4列3],[行5列1, 行5列2, 行5列3],[行6列1, 行6列2, 行6列3],[行7列1, 行7列2, 行7列3],[行8列1, 行8列2,…