Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例

server/2025/3/5 12:15:56/

设计一个在Linux上运行的GCC C++程序,同时连接三个不同的PostgreSQL实例,其中两个实例中分别有两个数据库中多个Schema的表结构分别与第三实例中两个数据库中多个Schema个结构完全相同,同时复制两个实例中两个数据库中多个Schema里的所有表的数据到第三个实例中两个数据库中多个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。

根据需求,我设计了以下解决方案。由于代码较长,这里分为多个部分说明:

  1. 配置文件(config.json):
{"instances": {"source1": {"host": "192.168.1.100","port": 5432,"user": "user1","password": "pass1"},"source2": {"host": "192.168.1.101","port": 5432,"user": "user2","password": "pass2"},"target": {"host": "192.168.1.102","port": 5432,"user": "target_user","password": "target_pass"}},"database_mappings": [{"source_instance": "source1","source_db": "db1","schemas": ["schema1", "schema2"],"target_db": "target_db1"},{"source_instance": "source2","source_db": "db2","schemas": ["schema3", "schema4"],"target_db": "target_db2"}],"retry": {"max_attempts": 3,"interval_seconds": 60},"log_directory": "logs"
}
  1. 数据库操作类(database.h):
#include <pqxx/pqxx>
#include <string>
#include <memory>class DatabaseConnector {
public:DatabaseConnector(const std::string& host, int port, const std::string& dbname, const std::string& user,const std::string& password){std::string conn_str = fmt::format("host={} port={} dbname={} user={} password={}",host, port, dbname, user, password);conn_ = std::make_unique<pqxx::connection>(conn_str);}pqxx::result query(const std::string& sql) {pqxx::work txn(*conn_);auto result = txn.exec(sql);txn.commit();return result;}// COPY命令高效传输数据void copy_table(const std::string& source_table, const std::string& target_table,std::shared_ptr<DatabaseConnector> target_conn) {pqxx::work source_txn(*conn_);std::stringstream buffer;source_txn.conn().copy_to(source_table, buffer);source_txn.commit();pqxx::work target_txn(*target_conn->conn_);target_txn.conn().copy_from(buffer, target_table);target_txn.commit();}private:std::unique_ptr<pqxx::connection> conn_;
};
  1. 日志管理类(logger.h):
#include <fstream>
#include <chrono>
#include <iomanip>
#include <filesystem>class Logger {
public:Logger(const std::string& log_dir) {namespace fs = std::filesystem;fs::create_directories(log_dir);auto now = std::chrono::system_clock::now();auto today = std::chrono::floor<std::chrono::days>(now);log_file_ = log_dir + "/" + format_time(today, "%Y-%m-%d") + ".log";}void log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx_);std::ofstream file(log_file_, std::ios::app);file << "[" << format_time(std::chrono::system_clock::now()) << "] " << message << "\n";}private:std::string format_time(auto time_point, const char* fmt = "%T") {auto in_time_t = std::chrono::system_clock::to_time_t(time_point);std::stringstream ss;ss << std::put_time(std::localtime(&in_time_t), fmt);return ss.str();}std::string log_file_;std::mutex mtx_;
};
  1. 主程序逻辑(main.cpp):
#include <nlohmann/json.hpp>
#include <vector>
#include <thread>struct Config {// 解析配置的代码
};void replicate_data(std::shared_ptr<DatabaseConnector> source,std::shared_ptr<DatabaseConnector> target,const std::string& schema,Logger& logger) {// 获取所有表结构auto tables = source->query(fmt::format("SELECT table_name FROM information_schema.tables ""WHERE table_schema = '{}' AND table_type = 'BASE TABLE'", schema));for (const auto& row : tables) {std::string table_name = row[0].as<std::string>();std::string qualified_name = schema + "." + table_name;// 记录开始信息logger.log(fmt::format("START Table: {}", qualified_name));try {// 使用移动数据的高效方法source->copy_table(qualified_name, qualified_name, target);// 记录成功信息logger.log(fmt::format("SUCCESS Table: {}", qualified_name));} catch (const std::exception& e) {// 异常处理和重试逻辑logger.log(fmt::format("ERROR Table: {} - {}", qualified_name, e.what()));// 重试逻辑实现}}
}int main() {// 加载配置Config config = load_config("config.json");// 初始化连接池auto source1 = create_connection(config.instances["source1"]);// 类似初始化其他连接// 创建日志器Logger logger(config.log_directory);// 处理每个数据库映射for (const auto& mapping : config.database_mappings) {// 获取源和目标连接auto source = get_connection(mapping.source_instance);auto target = get_connection("target");// 处理每个schemafor (const auto& schema : mapping.schemas) {replicate_data(source, target, schema, logger);}}return 0;
}

实现要点说明:

  1. 性能优化:
  • 使用PostgreSQL的COPY命令进行批量数据传输
  • 采用连接池管理数据库连接
  • 多线程处理不同schema的复制任务(需要添加线程池)
  1. 错误处理:
  • 三级重试机制(配置可调)
  • 事务回滚保证数据一致性
  • 详细的错误日志记录
  1. 日志管理:
  • 每天自动滚动日志文件
  • 记录毫秒级时间戳
  • 包含完整的状态信息
  • 线程安全的日志写入
  1. 扩展性:
  • 模块化设计方便扩展新的数据库实例
  • 配置驱动,无需重新编译即可修改参数
  • 支持Schema级别的并行复制

编译和运行:

# 编译命令
g++ -std=c++17 -o pg_replicator main.cpp database.cpp logger.cpp -lpqxx -lpq -lfmt# 运行命令
./pg_replicator --config config.json

建议的扩展改进方向:

  1. 增加以下监控功能:
  • 实时进度显示
  • Prometheus指标导出
  • 电子邮件/短信报警
  1. 性能优化:
  • 并行复制不同表
  • 流水线传输
  • 压缩传输
  1. 数据一致性保障:
  • 校验和验证
  • 断点续传
  • 双向同步检测
  1. 安全性增强:
  • 加密连接支持
  • 凭据安全存储
  • 审计日志

这个设计在保证功能完整性的同时,充分考虑了大规模数据复制的性能需求,并通过模块化设计保证了可维护性。异常处理和日志系统为生产环境运行提供了必要保障。


http://www.ppmy.cn/server/171892.html

相关文章

NO.21十六届蓝桥杯备战|一维数组|范围for|memset|memcpy(C++)

数组是⼀组相同类型元素的集合 数组中存放的是1个或者多个数据&#xff0c;但是数组元素个数不能为0数组中存放的多个数据&#xff0c;类型是相同的 数组分为⼀维数组和多维数组&#xff0c;多维数组⼀般⽐较多⻅的是⼆维数组 一维数组 ⼀维数组是最常⻅的&#xff0c;通常⽤…

Template Method 设计模式

模板方法模式&#xff08;Template Method Pattern&#xff09; 是一种 行为型设计模式&#xff0c;用于定义 算法的骨架&#xff0c;并允许子类在不改变算法结构的情况下 重新定义 其中的某些步骤。 核心思想&#xff1a; 在 基类 中定义 算法的整体流程&#xff08;骨架&am…

Lua的table(表)

Lua表的基本概念 Lua中的表&#xff08;table&#xff09;是一种多功能数据结构&#xff0c;可以用作数组、字典、集合等。表是Lua中唯一的数据结构机制&#xff0c;其他数据结构如数组、列表、队列等都可以通过表来实现。 表的实现 Lua的表由两部分组成&#xff1a; 数组部分…

塔能科技:工厂智慧照明,从底层科技实现照明系统的智能化控制

在全球节能减碳和智慧生活需求激增的背景下&#xff0c;基于“用软件定义硬件&#xff0c;让物联运维更简捷更节能”的产品理念&#xff0c;塔能科技的智慧照明一体化方案如新星般崛起&#xff0c;引领照明行业新方向。现在&#xff0c;我们来深入探究其背后的创新技术。该方案…

应对现代生活的健康养生指南

在科技飞速发展的现代社会&#xff0c;人们的生活方式发生了巨大改变&#xff0c;随之而来的是一系列健康问题。快节奏的生活、高强度的工作以及电子产品的过度使用&#xff0c;让我们的身体承受着前所未有的压力。因此&#xff0c;掌握正确的健康养生方法迫在眉睫。 针对久坐不…

服务器间免密登录

1. 生成SSH密钥对 在本地客户端生成SSH密钥对。生成密钥对命令为&#xff1a; ssh-keygen -t rsa 按回车键确认默认设置&#xff0c;生成的密钥对将保存在 ~/.ssh/id_rsa&#xff08;私钥&#xff09;和 ~/.ssh/id_rsa.pub&#xff08;公钥&#xff09;。 2. 上传密钥对至需…

从入门到精通:Linux 权限管理(rwx/chmod/chown)

目录 1. 引言&#xff1a;为什么需要文件权限&#xff1f; 2. 基础概念&#xff1a;理解 rwx 权限 &#xff08;1&#xff09;权限的三种角色 &#xff08;2&#xff09;权限的三种类型 &#xff08;3&#xff09;权限的两种表示法 &#xff08;4&#xff09; 目录的 rwx…

多个pdf合并成一个pdf的方法

将多个PDF文件合并优点&#xff1a; 能更容易地对其进行归档和备份.打印时可以选择双面打印&#xff0c;减少纸张的浪费。比如把住宿发票以及滴滴发票、行程单等生成一个pdf&#xff0c;双面打印或者无纸化办公情况下直接发送给财务进行存档。 方法: 利用PDF24 Tools网站 …