Python 助力 DBA:高效批量管理数据库服务器的多线程解决方案-多库查询汇总工具实现

news/2024/12/21 16:34:51/

批量数据库服务器连接测试与数据汇总:Python实现方案

作为数据库服务器运维人员,我们经常需要面对大量服务器的连接测试和数据汇总工作。本文将介绍一个使用Python实现的高效解决方案,可以帮助我们快速完成这些任务。

需求概述

  1. 从配置文件中读取要测试的数据库服务器IP地址列表。
  2. 批量测试数据库服务器的连接情况。
  3. 在所有可连接的服务器上执行相同的SQL查询。
  4. 将查询结果汇总到一个单独的数据库中,并包含对应服务器的IP地址。
  5. 自动创建结果表,表名按日期随机生成。
  6. 提供详细的日志输出,包括实时的处理进度。

实现方案

我们使用Python来实现这个方案,主要利用了以下库和技术:

  • pyodbc: 用于数据库连接和操作
  • configparser: 读取配置文件
  • concurrent.futures: 实现并发处理
  • logging: 日志记录
  • 多线程技术:提高处理效率

代码实现

以下是完整的Python代码实现:

python">import pyodbc
import logging
import configparser
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
import random
import string# 配置日志
def setup_logger():"""设置日志记录器,同时输出到文件和控制台"""logger = logging.getLogger()logger.setLevel(logging.INFO)# 文件处理器file_handler = logging.FileHandler('db_query_aggregation.log')file_handler.setLevel(logging.INFO)file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')file_handler.setFormatter(file_formatter)# 控制台处理器console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)console_formatter = logging.Formatter('%(message)s')console_handler.setFormatter(console_formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return loggerlogger = setup_logger()def read_config(config_file):"""读取配置文件:param config_file: 配置文件路径:return: 包含配置信息的字典"""try:config = configparser.ConfigParser()config.read(config_file)return {'ip_list_file': config['Files']['ip_list_file'],'source_db_username': config['SourceDB']['username'],'source_db_password': config['SourceDB']['password'],'target_db_info': dict(config['TargetDB']),'max_workers': int(config['Settings']['max_workers']),'query': config['Query']['sql']}except Exception as e:logger.error(f"读取配置文件时出错: {e}")raisedef read_ip_list(file_path):"""从文件中读取IP地址列表:param file_path: IP地址文件路径:return: IP地址列表"""try:with open(file_path, 'r') as file:return [line.strip() for line in file if line.strip()]except IOError as e:logger.error(f"无法读取IP地址文件: {e}")return []def create_connection(server, database, username, password):"""创建数据库连接:param server: 服务器地址:param database: 数据库名称:param username: 用户名:param password: 密码:return: 数据库连接对象,如果连接失败则返回None"""try:conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'return pyodbc.connect(conn_str, timeout=5)except pyodbc.Error as e:logger.error(f"连接到服务器 {server} 失败: {e}")return Nonedef execute_query(connection, query):"""执行SQL查询:param connection: 数据库连接对象:param query: SQL查询语句:return: 查询结果列表"""try:cursor = connection.cursor()cursor.execute(query)return cursor.fetchall()except pyodbc.Error as e:logger.error(f"执行查询时出错: {e}")return []def process_server(ip, username, password, query):"""处理单个服务器的查询:param ip: 服务器IP地址:param username: 数据库用户名:param password: 数据库密码:param query: SQL查询语句:return: 元组 (IP地址, 查询结果)"""start_time = time.time()logger.info(f"开始处理服务器 {ip}")try:conn = create_connection(ip, 'master', username, password)if conn:results = execute_query(conn, query)conn.close()end_time = time.time()processing_time = end_time - start_timelogger.info(f"服务器 {ip} 处理完成. 获取 {len(results)} 行数据. 耗时 {processing_time:.2f} 秒")return ip, resultsexcept Exception as e:logger.error(f"处理服务器 {ip} 时发生错误: {e}")logger.info(f"服务器 {ip} 处理失败")return ip, []def create_target_table(connection, table_name, columns):"""在目标数据库中创建表:param connection: 目标数据库连接对象:param table_name: 要创建的表名:param columns: 列定义列表"""try:cursor = connection.cursor()create_table_query = f"CREATE TABLE {table_name} (ServerIP VARCHAR(15), {', '.join(columns)})"cursor.execute(create_table_query)connection.commit()logger.info(f"成功创建表 {table_name}")except pyodbc.Error as e:logger.error(f"创建目标表时出错: {e}")raisedef insert_data(connection, table_name, data):"""将数据插入目标数据库:param connection: 目标数据库连接对象:param table_name: 目标表名:param data: 要插入的数据列表:return: 插入的行数"""try:cursor = connection.cursor()placeholders = ', '.join(['?' for _ in range(len(data[0]))])insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"cursor.fast_executemany = Truecursor.executemany(insert_query, data)connection.commit()return cursor.rowcountexcept pyodbc.Error as e:logger.error(f"插入数据时出错: {e}")connection.rollback()return 0def generate_table_name():"""生成随机表名:return: 生成的表名"""date_str = datetime.now().strftime("%Y%m%d")random_str = ''.join(random.choices(string.ascii_lowercase, k=5))return f"QueryResults_{date_str}_{random_str}"def main():"""主函数,协调整个数据查询和汇总过程"""try:# 读取配置config = read_config('config.ini')ip_list = read_ip_list(config['ip_list_file'])if not ip_list:logger.error("IP地址列表为空,程序终止")returnlogger.info(f"开始处理 {len(ip_list)} 个服务器")# 并发查询所有服务器results = []with ThreadPoolExecutor(max_workers=config['max_workers']) as executor:future_to_ip = {executor.submit(process_server, ip, config['source_db_username'], config['source_db_password'], config['query']): ip for ip in ip_list}for future in as_completed(future_to_ip):ip, result = future.result()if result:results.extend([(ip,) + tuple(row) for row in result])if not results:logger.info("没有查询到数据,程序终止")return# 连接目标数据库target_conn = create_connection(**config['target_db_info'])if not target_conn:logger.error("无法连接到目标数据库,程序终止")return# 创建目标表并插入数据table_name = generate_table_name()columns = [f"Column{i} VARCHAR(100)" for i in range(len(results[0]) - 1)]create_target_table(target_conn, table_name, columns)rows_inserted = insert_data(target_conn, table_name, results)target_conn.close()logger.info(f"数据汇总完成。插入 {rows_inserted} 行到表 {table_name}")print(f"查询结果已插入表: {table_name}")except Exception as e:logger.critical(f"程序执行过程中发生严重错误: {e}")print(f"程序执行过程中发生错误,请查看日志文件获取详细信息。")if __name__ == "__main__":main()

代码说明

  1. 配置文件读取:使用 configparser 模块读取配置文件,包括数据库连接信息、查询语句等。

  2. 多线程处理:使用 ThreadPoolExecutor 并发执行查询,提高效率。

  3. 异常处理:每个关键操作都包含了异常处理,确保程序的稳定性。

  4. 模块化设计:将不同功能分解为独立的函数,提高代码的可读性和可维护性。

  5. 日志记录:使用 logging 模块记录详细的操作日志,同时输出到文件和控制台。

  6. 动态表创建:在目标数据库中动态创建表,表名包含日期和随机字符串。

  7. 数据汇总:将所有服务器的查询结果汇总到一个列表中,包括服务器IP地址。

  8. 批量数据插入:使用 executemany 批量插入数据到目标表。

使用说明

  1. 创建 config.ini 配置文件,包含以下内容:
[Files]
ip_list_file = server_ip_list.txt[SourceDB]
username = your_source_username
password = your_source_password[TargetDB]
server = your_target_server
database = your_target_database
username = your_target_username
password = your_target_password[Settings]
max_workers = 50[Query]
sql = SELECT column1, column2 FROM your_table
  1. 准备一个包含要查询的服务器IP地址的文本文件(如 server_ip_list.txt)。

  2. 运行脚本,它将并发查询所有服务器,汇总结果(包括服务器IP),并插入到目标数据库的新表中。

  3. 脚本执行完成后,会输出生成的表名。

结论

这个Python脚本提供了一个高效、灵活的解决方案,可以批量测试数据库服务器连接、执行查询并汇总结果。它具有以下优点:

  • 并发处理,大幅提高效率
  • 详细的日志记录,便于监控和调试
  • 灵活的配置,易于适应不同环境
  • 异常处理完善,提高程序稳定性
  • 结果包含服务器IP,便于追踪数据来源

对于需要管理大量数据库服务器的运维人员来说,这个脚本可以显著提高工作效率。您可以根据实际需求进一步调整和优化这个脚本,例如添加更多的错误处理、优化查询性能,或者扩展功能以支持更复杂的操作。

通过使用这个脚本,您可以轻松地对多个数据库服务器进行批量操作,并将结果汇总到一个中心位置,大大简化了数据库管理和监控的工作流程。


http://www.ppmy.cn/news/1556968.html

相关文章

阻塞队列与线程池原理

1、阻塞队列 阻塞队列:当队列已满的时候,向队列中添加元素的操作会被阻塞;当队列为空的时候,从队列中取元素的操作会被阻塞。 Java 中用 BlockingQueue 接口表示阻塞队列。BlockingQueue 接口作为 Queue 的子接口,主…

SQLite 命令

SQLite 命令 SQLite 是一种轻量级的数据库管理系统,它是一个C库,提供了不需要独立服务器进程的零配置数据库。SQLite 是非常受欢迎的,因为它简单、快速、可靠,并且适用于各种大小的项目。本文将详细介绍 SQLite 的常用命令和操作…

AI 在游戏领域的革命性技术

2AGI.NET | 探索 AI 无限潜力,2AGI 为您带来最前沿资讯。 2AGI.NET:AI 游戏专题 本文介绍了AI技术在游戏领域的应用及其在其他行业的技术拓展。AI虚拟小镇通过1000多个智能体模拟真实人类行为,准确率高达85%。这种技术不仅改变了游戏行业&…

使用JUnit进行集成测试

在软件开发中,集成测试是一个非常重要的环节。 它可以确保不同模块之间的协作正确性,同时也可以发现系统的潜在问题。 JUnit是一个流行的Java测试框架,它可以帮助我们编写和运行各种类型的测试,包括单元测试和集成测试。 本文将…

c语言----选择结构

基本概念 选择结构是C语言中用于根据条件判断来执行不同代码块的结构。它允许程序在不同的条件下执行不同的操作,使程序具有决策能力。 if语句 单分支if语句 语法格式: if (条件表达式) { 执行语句块; } 功能: 当条件表达式的值为真&#…

OpenAI 与 ChatGPT 的关系解析

OpenAI 与 ChatGPT 的关系解析 基本关系 OpenAI 是公司,ChatGPT 是产品 OpenAI 是一家人工智能研究公司ChatGPT 是 OpenAI 开发的一款 AI 聊天产品ChatGPT 使用的是 OpenAI 开发的 GPT(Generative Pre-trained Transformer)模型 OpenAI 的…

vue create 创建项目 提示 Failed to check for updates 淘宝 NPM 镜像站喊你切换新域名啦

1、使用 vue create demo创建项目的时候发现 提示 “Failed to check for updates”, 执行 npm config list 看了一下 镜像源是:https://registry.npm.taobao.org 然后搜索一下发现这个淘宝这个镜像域名切换了。 公告地址:【公告】淘宝 npm …

MATLAB截取图像的一部分并保存导出,在itksnap中3D展示

**问题描述:**输入nifti图像,截取图像的一部分并输出,比如截取图像的101010这一块,并导出为nii文件 inputFile D:\aa\dcm\input.nii; % 输入文件路径subsetSize [10 10 10]; % 截取的图像块大小 subsetStart [1 1 1]; % 截取的…