【Web系列三十】MYSQL库表比对升级脚本

news/2025/1/17 10:06:51/

写在前面

        随着软件的迭代开发,数据库表有变动是常有的事,如果没有在开发时记录变更情况的话。对于线上生产环境下的MYSQL库表升级就会比较麻烦。

        因此本文主要提供了一个脚本,方便比对新旧数据库的sql文件,从而自动生成用户升级的sql语句。

代码

# 两个数据库对比表结构并升级
# 数据库版本更新后,有新增的表,新增的字段
# 通过对比两个库的差异,然后生成语句补充差异"""
使用说明:使用前请务必阅读理解清楚!!!
1、导出每个待更新新库的结构与数据sql文件,注意如果不需要替换数据的话,可以只导出结构sql文件
2、导出每个待更新旧库的结构与数据sql文件,注意如果不需要替换数据的话,可以只导出结构sql文件
3、给新旧库导出的sql文件内容中的数据库名加上“_new”和“_old”后缀,用于区分,注意是更改文件内容,而不是sql文件名
4、随意找一个能运行python和mysql的电脑,将新旧库的sql文件导入mysql中
5、将python中main函数的sql_info内容修改成上一步使用的mysql的连接信息,并在db_list中填充待更新库的信息,注意table_list是用于数据替换,慎用,会直接清空旧表数据
6、运行python,生成更新用的sql文件
7、检查每个sql文件是否正常
8、在mysql中选择待更新的旧库导入更新用的sql文件测试是否能正常更新
9、将更新用的sql文件拷贝至待更新mysql的服务器上,并依次导入
"""import pymysql
import datetime# 获取主键
def getPrimaryKey(cur, database, table):sql = f"""
SELECTCONSTRAINT_NAME
FROMINFORMATION_SCHEMA.TABLE_CONSTRAINTS
WHERECONSTRAINT_SCHEMA = '{database}' AND TABLE_NAME = '{table}';"""cur.execute(sql)sql_res = []while 1:res = cur.fetchone()if res is None:break  # 表示已经取完结果集ssql_res.append(res)return sql_res# 两个数据库相同的表要更新字段的数据类型(db1与db2相同的表和字段,字段的数据类型不同)
def compareColumnTypeInSameDatabaseAndSameTableAndSameColumn(cur, new, old):sql = f"""
SELECT DISTINCT*
FROM (SELECT DISTINCTtable_name, column_name, column_key, column_comment, column_type, is_nullable, column_default, extraFROMinformation_schema.columnsWHEREtable_schema = '{new}') t1
LEFT JOIN (SELECT DISTINCTtable_name, column_name, column_key, column_comment, column_type, is_nullable, column_default, extraFROMinformation_schema.columnsWHEREtable_schema = '{old}') t2
ONt1.table_name = t2.table_name AND t1.column_name = t2.column_name
WHEREt1.column_type != t2.column_type OR t1.is_nullable != t2.is_nullable OR t1.column_default != t2.column_default OR t1.extra != t2.extra
"""cur.execute(sql)sql_res = []while 1:res = cur.fetchone()if res is None:break  # 表示已经取完结果集ssql_res.append(res)# print('sql_res:', sql_res)# 对更新数据类型的sql语句进行拼接text_ = ""if len(sql_res) == 0:return text_else:sql = f"""
-- 先清除主键约束,避免冲突
SET SQL_REQUIRE_PRIMARY_KEY = OFF;"""text_ += sql + "\n"for i in sql_res:table_name = i[0]  # 表名TABLE_NAMEcolumn_name = i[1]  # 字段名COLUMN_NAMEcolumn_key = i[2]  # 是否为主键COLUMN_KEYcolumn_comment = i[3]  # 备注COLUMN_COMMENTcolumn_type = i[4]  # 字段类型COLUMN_TYPEis_nullable = i[5]  # 字段是否为空IS_NULLABLEcolumn_default = i[6]  # 默认值COLUMN_DEFAULTextra = i[7]  # 自动递增,更新时间等EXTRA# 默认值default_ = 'DEFAULT NULL' if column_default is None else f'DEFAULT {column_default}'# 字段是否为空if is_nullable == 'YES':is_nullable_ = 'NULL'else:is_nullable_ = 'NOT NULL'# 当该字段为非空时,判断默认值是否为空,如果是则清除默认值default_ = '' if column_default is None else default_# 备注column_comment_ = f"COMMENT '{column_comment}'" if column_comment else ''# 只有主键允许自增if extra == 'auto_increment' and column_key == 'PRI':column_key_ = 'PRIMARY KEY'sql_res = getPrimaryKey(cur, old, table_name)if len(sql_res) > 0:sql = f"""
ALTER TABLE`{table_name}`
DROP CONSTRAINT`{sql_res[0][0]}`;
"""text_ += sql + "\n"else:column_key_ = ''# 额外属性if extra == 'auto_increment':extra_ = 'AUTO_INCREMENT'elif extra == 'DEFAULT_GENERATED':extra_ = ''elif extra == 'DEFAULT_GENERATED on update CURRENT_TIMESTAMP':extra_ = 'ON UPDATE CURRENT_TIMESTAMP'else:extra_ = extrasql = f"""
ALTER TABLE`{table_name}`
MODIFY COLUMN`{column_name}` {column_type} {column_key_} {is_nullable_} {default_} {extra_} {column_comment_};
"""# print(sql)text_ = text_ + sql + "\n"sql = f"""
-- 恢复主键约束
SET SQL_REQUIRE_PRIMARY_KEY = ON;
"""text_ += sql + "\n"return text_# 两个数据库相同表,需要更新的数据库要添加的表字段(db1中的表,db2中存在但是缺少字段)
def compareColumnInSameDatabaseAndSameTable(cur, new, old):sql = f"""
SELECT DISTINCT*
FROM (SELECT DISTINCTt1.table_name, t1.column_name, t1.column_type, t1.column_comment, t1.is_nullable, t1.column_key,t1.column_default, t1.extra, t1.character_set_name, t1.collation_nameFROM (SELECT DISTINCTtable_name, column_name, column_comment, column_type, is_nullable, column_key,column_default, extra, character_set_name, collation_nameFROMinformation_schema.columnsWHEREtable_schema = '{new}') t1
LEFT JOIN (SELECT DISTINCTtable_name, column_name, column_comment, column_typeFROMinformation_schema.columnsWHEREtable_schema = '{old}') t2
ONt1.table_name = t2.table_name AND t1.column_name = t2.column_name
WHEREt2.table_name IS NULL
) t3
LEFT JOIN (SELECT DISTINCTt1.table_nameFROM (SELECT DISTINCTtable_name, column_name, column_comment, column_typeFROMinformation_schema.columnsWHEREtable_schema = '{new}') t1
LEFT JOIN (SELECT DISTINCTtable_name, column_name, column_comment, column_typeFROMinformation_schema.columnsWHEREtable_schema = '{old}') t2
ONt1.table_name = t2.table_name
WHEREt2.table_name IS NULL
) t4
ONt3.table_name = t4.table_name
WHEREt4.table_name IS NULL
"""cur.execute(sql)sql_res = []while 1:res = cur.fetchone()if res is None:break  # 表示已经取完结果集ssql_res.append(res)# print('sql_res:', sql_res)# 对需要新增字段的sql语句进行拼接.text_ = ""if len(sql_res) == 0:return text_else:sql = f"""
-- 先清除主键约束,避免冲突
SET SQL_REQUIRE_PRIMARY_KEY = OFF;"""text_ += sql + "\n"for i in sql_res:table_name = i[0]  # 表名column_name = i[1]  # 字段名column_type = i[2]  # 字段类型column_comment = i[3]  # 备注is_nullable = i[4]  # 字段是否为空column_key = i[5]  # 字段的主键(PRI为主键)column_default = i[6]  # 默认值extra = i[7]  # 自动递增,更新时间等character_set_name = i[8]  # 字符集collation_name = i[9]  # 排序规则# 默认值default_ = 'DEFAULT NULL' if column_default is None else f'DEFAULT {column_default}'# 字段是否为空if is_nullable == 'YES':is_nullable_ = 'NULL'else:is_nullable_ = 'NOT NULL'# 当该字段为非空时,判断默认值是否为空,如果是则清除默认值default_ = '' if column_default is None else default_# 只有主键允许自增if extra == 'auto_increment' and column_key == 'PRI':column_key_ = 'PRIMARY KEY'sql_res = getPrimaryKey(cur, old, table_name)if len(sql_res) > 0:sql = f"""
ALTER TABLE`{table_name}`
DROP CONSTRAINT`{sql_res[0][0]}`;
"""text_ += sql + "\n"else:column_key_ = ''# 额外属性if extra == 'auto_increment':extra_ = 'AUTO_INCREMENT'elif extra == 'DEFAULT_GENERATED':extra_ = ''elif extra == 'DEFAULT_GENERATED on update CURRENT_TIMESTAMP':extra_ = 'ON UPDATE CURRENT_TIMESTAMP'else:extra_ = extra# varchar类型的编码character_ = f' CHARACTER SET {character_set_name} COLLATE {collation_name}' if character_set_name else ''# 备注column_comment_ = f" COMMENT '{column_comment}'" if column_comment else ''sql = f"""
ALTER TABLE`{table_name}`
ADD COLUMN`{column_name}` {column_type} {column_key_} {character_} {is_nullable_} {default_} {extra_} {column_comment_};
"""# print(sql)text_ += sql + "\n"sql = f"""
-- 恢复主键约束
SET SQL_REQUIRE_PRIMARY_KEY = ON;
"""text_ += sql + "\n"return text_# 需要更新的数据库中需要新建的表(db1中有的表,db2中没有的)
def compareTableInSameDatabase(cur, new, old):sql = f"""
SELECTtable_name
FROMinformation_schema.tables
WHEREtable_schema = '{new}' AND table_name NOT IN (SELECTtable_nameFROMinformation_schema.tablesWHEREtable_schema = '{old}')
"""cur.execute(sql)sql_res_1 = []while 1:res = cur.fetchone()if res is None:break  # 表示已经取完结果集ssql_res_1.append(res)# print('sql_res_1:', sql_res_1)# 获取新建表的sql建表语句text_ = ""for i in sql_res_1:table_name = i[0]cur.execute(f"""SHOW CREATE TABLE {table_name}""")sql_res_2 = []while 1:res = cur.fetchone()if res is None:break  # 表示已经取完结果集ssql_res_2.append(res)# print(len(sql_res_2))# print(sql_res_2[0][1] + ";")text_ += f"""
-- ----------------------------
-- Table structure for {sql_res_2[0][0]}
-- ----------------------------
"""text_ += sql_res_2[0][1] + ";\n"return text_# 更新数据
def updateData(cur, new, old, table_list):if len(table_list) == 0:return ""else:sql = f"""
SELECT*
FROM (SELECT DISTINCTtable_name, column_name, referenced_table_name, referenced_column_name, position_in_unique_constraintFROMinformation_schema.key_column_usageWHEREtable_schema = '{new}' AND position_in_unique_constraint = 1) t1
LEFT JOIN (SELECT DISTINCTtable_name, column_name, referenced_table_name, referenced_column_name, position_in_unique_constraintFROMinformation_schema.key_column_usageWHEREtable_schema = '{old}' AND position_in_unique_constraint = 1) t2
ONt1.table_name = t2.table_name AND t1.column_name = t2.column_name
WHEREt2.column_name IS NULL AND t1.table_name NOT IN (SELECTtable_nameFROMinformation_schema.tablesWHEREtable_schema = '{new}' AND table_name NOT IN (SELECTtable_nameFROMinformation_schema.tablesWHEREtable_schema = '{old}'))
"""cur.execute(sql)sql_res = []while 1:res = cur.fetchone()if res is None:break  # 表示已经取完结果集sql_res.append(res)# print('sql_res:', sql_res)# 需要更新的数据库中需要补充的外键text_= ""text_ += """
-- 旧数据库中需要补充的外键
-- 先清除外键约束,等数据更新完成之后再添加外键约束
SET FOREIGN_KEY_CHECKS = 0;\n
"""for i in sql_res:table_name = i[0]  # 表名column_name = i[1]  # 字段名referenced_table_name = i[2]  # 关联的表名referenced_column_name = i[3]  # 关联的字段名position_in_unique_constraint = i[4]  # 关联的字段名if position_in_unique_constraint == 1:text_ += f"""
ALTER TABLE`{table_name}`
ADD FOREIGN KEY (`{column_name}`)
REFERENCES`{referenced_table_name}` (`{referenced_column_name}`)
ONDELETE CASCADE
ONUPDATE CASCADE;\n
"""# 添加外键约束text_ += """
-- 更新完成,添加外键约束
SET FOREIGN_KEY_CHECKS = 1;\n
"""# 更新规则,解析器等固定数据表的数据text_ += """
-- 更新规则,解析器等固定数据表的数据
-- 覆盖插入数据,先清除外键约束,等数据更新完成之后再添加外键约束
SET FOREIGN_KEY_CHECKS = 0;\n
"""# 删除外键约束for table_name in table_list:text_ += f"""
-- ----------------------------
-- Records of {table_name}
-- ----------------------------
TRUNCATE TABLE {table_name};
"""sql = f"""
SELECT*
FROM{table_name}
"""cur.execute(sql)field_name_list = '('for i, field_name in enumerate(cur.description):if i == len(cur.description) - 1:field_name_list += f'{table_name}.{field_name[0]}'else:field_name_list += f'{table_name}.{field_name[0]}, 'field_name_list += ')'while 1:res = cur.fetchone()if res is None:# 表示已经取完结果集breaknew_res = []for i in res:if type(i) == datetime.datetime:new_res.append(i.strftime("%Y-%m-%d %H:%M:%S"))else:new_res.append(i)text_ += f"""
INSERT INTO`{table_name}` {field_name_list}
VALUES{tuple(new_res)};\n
""".replace(', None', ', Null')# 添加外键约束text_ += """
-- 更新完成,添加外键约束
SET FOREIGN_KEY_CHECKS = 1;\n
"""return text_def run(db_list, sql_info):pymysql.install_as_MySQLdb()for db_info in db_list:# 打开数据库连接conn = pymysql.connect(host=sql_info['sql_host'],port=sql_info['sql_port'],user=sql_info['sql_user'],password=sql_info['sql_pwd'],database=db_info['new'],charset=sql_info['sql_charset'])# 获取游标cur = conn.cursor()# 生成sql语句text = ""text += compareColumnTypeInSameDatabaseAndSameTableAndSameColumn(cur, db_info['new'], db_info['old'])text += compareColumnInSameDatabaseAndSameTable(cur, db_info['new'], db_info['old'])text += compareTableInSameDatabase(cur, db_info['new'], db_info['old'])text += updateData(cur, db_info['new'], db_info['old'], db_info['table_list']) # 慎用,会清空原来的数据# 调用方法写入同目录文件中with open("update#" + db_info['name'] + ".sql", "w", encoding="utf-8") as fp:fp.write(text)cur.close()conn.commit()conn.close()print(db_info['name'] + '的更新sql生成成功')# # 执行更新的sql语句# conn2 = pymysql.connect(#     host=sql_info['sql_host'],#     port=sql_info['sql_port'],#     user=sql_info['sql_user'],#     password=sql_info['sql_pwd'],#     database=db_info['old'],#     charset=sql_info['sql_charset']# )# cur2 = conn2.cursor()# cur2.execute(text)# cur2.close()# conn2.commit()# conn2.close()# print(db_info['name'] + '的更新sql执行成功')if __name__== "__main__" :db_list = [{'name': "table1",# 进行对比的数据库(新库)'new': "table1_new",# 要更新的数据库(旧库)'old': "table1_old",# 数据需要覆盖的表,慎用,会清空原来的数据'table_list': []}]sql_info = {'sql_host': '127.0.0.1','sql_port': 3306,'sql_user': 'root','sql_pwd': 'xxxx', # 根据实际情况修改'sql_charset': 'utf8'}run(db_list, sql_info)


http://www.ppmy.cn/news/1563864.html

相关文章

Cyberchef开发operation操作之-node开发环境搭建

本文介绍一下Cyberchef开发operation操作环境的搭建工作,为后续的Cyberchef开发operation操作提供开发环境基础,这里。该篇作为我的专栏《Cyberchef 从入门到精通教程》中的一篇,详见这里。 Linux环境 由于cyberchef只支持Linux和MAC的开发…

Python 替换excel 单元格内容

要在Python中替换Excel单元格的内容,你可以使用openpyxl库。openpyxl是一个用于读写Excel 2010 xlsx/xlsm/xltx/xltm文件的库。 安装openpyxl 首先,你需要安装openpyxl库。如果还没有安装,可以使用pip进行安装: pip install ope…

RV1126+FFMPEG推流项目(3)VI模块视频编码流程

视频编码的流程: 本章节讲的是RV1126视频编码的流程,在整个项目之中视频编码功能是核心之一。视频编码流程主要分三步:VI的初始化、VENC的初始化(硬件编码)、绑定VI和VENC节点、开启VENC线程进行视频编码的采集,注意一下这里的…

react中,使用antd的Upload组件上传zip压缩包文件

需求 使用antd的Upload上传.zip压缩包文件 代码 const [uploadLoaing, setUploadLoaing] useState(false);// 辅助函数:检查文件是否为zip格式function isZipFile(file: File): boolean {const fileType file.type;return fileType application/zip || file.n…

Checkbox 多选框的使用

基础用法 在el-checkbox元素中定义v-model绑定变量,单一的checkbox中,默认绑定变量的值会是Boolean,选中为true 禁用状态 设置disabled属性即可。 多选框组 checkbox-group元素能把多个 checkbox 管理为一组,只需要在 Group …

前端下载图片

可直接看第3条 1、a链接形式(不推荐) 这种方式可以下载文件,对于图片类,浏览器会直接打开 export function downloadFile(src: string, fileName: string) {const ele document.createElement(a);ele.setAttribute(href, src)…

Keil使用STLink下载烧录闪退解决(使用STLink烧录)

一:问题出现 在使用新版本keil开发STM32的的时候在选择STLink下载,在下载的的时候会进行闪退,点击load会直接退出keil,经过检查发现是STLink驱动出了问题,只需要重新安装驱动即可,具体解决方案如下。 二&am…

Android BitmapShader更简易的实现刮刮乐功能,Kotlin

Android BitmapShader更简易的实现刮刮乐功能,Kotlin 比这种方式 Android使用PorterDuffXfermode模式PorterDuff.Mode.SRC_OUT橡皮擦实现“刮刮乐”效果,Kotlin(2)-CSDN博客 更简单实现刮刮乐效果。 import android.content.Cont…