Linux上用C++和GCC开发程序实现不同PostgreSQL实例下单个数据库的多个Schema之间的稳定高效的数据迁移

server/2025/2/28 16:33:45/

设计一个在Linux上运行的GCC C++程序,同时连接两个不同的PostgreSQL实例,两个实例中分别有一个数据库的多个Schema的表结构完全相同,复制一个实例中一个数据库的多个Schema里的所有表的数据到另一个实例中一个数据库的多个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。

实现该程序的步骤和代码示例:

  1. 项目结构
.
├── CMakeLists.txt
├── config.json
├── include
│   ├── config.h
│   ├── database.h
│   ├── logger.h
│   └── worker.h
├── src
│   ├── main.cpp
│   ├── config.cpp
│   ├── database.cpp
│   ├── logger.cpp
│   └── worker.cpp
└── third_party└── json
  1. 配置文件示例 (config.json)
{"source": {"host": "source-db.example.com","port": 5432,"dbname": "source_db","user": "user","password": "pass"},"target": {"host": "target-db.example.com","port": 5432,"dbname": "target_db","user": "user","password": "pass"},"schemas": ["public", "sales"],"retry": {"max_attempts": 3,"interval_seconds": 60},"batch_size": 1000,"disable_indexes": true
}
  1. CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(pg_replicator)set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)find_package(libpqxx REQUIRED)
find_package(Threads REQUIRED)include_directories(includethird_party/json/include
)add_executable(pg_replicatorsrc/main.cppsrc/config.cppsrc/database.cppsrc/logger.cppsrc/worker.cpp
)target_link_libraries(pg_replicatorPRIVATElibpqxxpthread${CMAKE_THREAD_LIBS_INIT}
)
  1. 核心实现代码 (部分关键片段)

database.h:

#pragma once
#include <pqxx/pqxx>
#include <string>struct DBConfig {std::string host;int port;std::string dbname;std::string user;std::string password;
};class DatabaseConnector {
public:DatabaseConnector(const DBConfig& config);pqxx::connection& get_connection();private:std::unique_ptr<pqxx::connection> conn_;
};

worker.h:

#pragma once
#include <pqxx/pqxx>
#include "logger.h"struct TableStats {size_t total_records = 0;std::chrono::milliseconds duration{0};
};class DataCopier {
public:DataCopier(DatabaseConnector& src, DatabaseConnector& tgt, Logger& logger);TableStats copy_table(const std::string& schema, const std::string& table);void copy_schema(const std::string& schema);private:void disable_indexes(pqxx::transaction_base& txn, const std::string& schema, const std::string& table);void enable_indexes(pqxx::transaction_base& txn, const std::string& schema, const std::string& table);DatabaseConnector& src_;DatabaseConnector& tgt_;Logger& logger_;
};

logger.cpp:

#include "logger.h"
#include <chrono>
#include <filesystem>
#include <iomanip>
#include <mutex>namespace fs = std::filesystem;class Logger::Impl {
public:Impl() {fs::create_directories("logs");update_file();}void log(const LogEntry& entry) {std::lock_guard<std::mutex> lock(mutex_);check_rollover();file_ << entry.to_string() << std::endl;}private:void update_file() {auto now = std::chrono::system_clock::now();auto today = std::chrono::floor<std::chrono::days>(now);filename_ = "logs/log_" + std::format("{:%Y%m%d}", today) + ".log";file_.close();file_.open(filename_, std::ios::app);}void check_rollover() {auto now = std::chrono::system_clock::now();if (now >= next_check_) {update_file();next_check_ = now + std::chrono::hours(1);}}std::ofstream file_;std::string filename_;std::mutex mutex_;std::chrono::system_clock::time_point next_check_;
};Logger::Logger() : impl_(std::make_unique<Impl>()) {}
Logger::~Logger() = default;void Logger::log(const LogEntry& entry) {impl_->log(entry);
}

worker.cpp (关键复制逻辑):

TableStats DataCopier::copy_table(const std::string& schema, const std::string& table) {TableStats stats;auto start = std::chrono::steady_clock::now();pqxx::work src_txn(src_.get_connection());pqxx::work tgt_txn(tgt_.get_connection());try {// Disable indexes if configuredif (config_.disable_indexes) {disable_indexes(tgt_txn, schema, table);}// Use COPY for high-performance data transferstd::string copy_query = "COPY " + src_txn.quote_name(schema) + "." + src_txn.quote_name(table) + " TO STDOUT";auto stream = src_txn.copy_to(copy_query);std::string insert_query = "COPY " + tgt_txn.quote_name(schema) + "." + tgt_txn.quote_name(table) + " FROM STDIN";tgt_txn.copy_to(insert_query, stream);// Get record countauto count = tgt_txn.query_value<size_t>("SELECT COUNT(*) FROM " + tgt_txn.quote_name(schema) + "." + tgt_txn.quote_name(table));if (config_.disable_indexes) {enable_indexes(tgt_txn, schema, table);}tgt_txn.commit();src_txn.commit();auto end = std::chrono::steady_clock::now();stats.duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);stats.total_records = count;logger_.log({.timestamp = std::chrono::system_clock::now(),.schema = schema,.table = table,.status = "SUCCESS",.records_copied = count,.duration_ms = stats.duration.count()});} catch (const std::exception& e) {tgt_txn.abort();src_txn.abort();logger_.log({.timestamp = std::chrono::system_clock::now(),.schema = schema,.table = table,.status = "FAILED",.error_message = e.what()});throw;}return stats;
}
  1. 编译和运行
mkdir build
cd build
cmake ..
make -j4
./pg_replicator --config ../config.json

关键优化措施:

  1. 使用PostgreSQL COPY命令进行批量数据传输
  2. 事务处理确保数据一致性
  3. 索引管理优化(复制前禁用/复制后重建)
  4. 多线程并行处理不同表
  5. 连接池管理
  6. 基于libpqxx的高效C++实现
  7. 详细的日志记录和监控

异常处理机制:

  1. 网络错误自动重试
  2. 连接失效自动重建
  3. 事务失败自动回滚
  4. 数据库锁超时处理
  5. 内存溢出保护
  6. 日志文件轮换和大小限制

日志示例:

2023-10-20T14:23:45Z INFO [public.users] STARTED
2023-10-20T14:23:47Z INFO [public.users] COPIED 15382 records (2014ms)
2023-10-20T14:23:48Z ERROR [sales.orders] Connection lost, retrying (1/3)
2023-10-20T14:24:48Z INFO [sales.orders] RETRY SUCCESS
2023-10-20T14:24:50Z INFO [sales.orders] COPIED 238952 records (2156ms)

这个实现方案提供了:

  • 高性能的数据复制(使用COPY命令)
  • 完善的错误处理和重试机制
  • 详细的日志记录
  • 配置驱动的灵活性
  • 线程安全的架构
  • 自动化的索引管理
  • 每日日志轮换
  • 多Schema支持

实际部署时可以根据具体需求调整批量大小、线程数量、重试策略等参数,并通过监控日志文件来观察复制状态和性能指标。


http://www.ppmy.cn/server/171329.html

相关文章

BUG日志:使用热点或免费加速器时git链接github出现端口22拒绝访问的解决方法

问题描述 现象&#xff1a;git 连接 GitHub 时&#xff0c;报错 Connection refused on port 22。触发条件&#xff1a; 使用移动热点或免费加速器时必现。使用正常 Wi-Fi 时不出现。 解决步骤 临时切换为 HTTPS 协议&#xff08;绕过 22 端口&#xff09;&#xff1a; git r…

波导阵列天线 学习笔记11双极化全金属垂直公共馈电平板波导槽阵列天线

摘要&#xff1a; 本communicaition提出了一种双极化全金属垂直公共馈电平板波导槽阵列天线。最初提出了一种公共馈电的单层槽平板波导来实现双极化阵列。此设计消除了传统背腔公共馈电的复杂腔体边缘的必要性&#xff0c;提供了一种更简单的天线结构。在2x2子阵列种发展了宽十…

推荐一款uniapp的日历插件魔改版可显示阳历阴历农历公历

上图&#xff1a; 1.下载 https://download.csdn.net/download/ktucms/90436225 2.调用,解压后&#xff0c;放到根目录就行了。 js js methds: et_rlbox_clear:function () {var thatthis;that.pdata.datestr_start"";that.pdata.datestr_end"";},et_ch…

C#与AI的交互(以DeepSeek为例)

C#与ai的交互 与AI的交互使用的Http请求的方式&#xff0c;通过发送请求&#xff0c;服务器响应ai生成的文本 下面是完整的代码&#xff0c;我这里使用的是Ollama本地部署的deepseek&#xff0c;在联网调用api时&#xff0c;则url会有不同 public class OllamaRequester {[Se…

PyTorch-基础(CUDA、Dataset、transforms、卷积神经网络、VGG16)

PyTorch-基础 环境准备 CUDA Toolkit安装&#xff08;核显跳过此步骤&#xff09; CUDA Toolkit是NVIDIA的开发工具&#xff0c;里面提供了各种工具、如编译器、调试器和库 首先通过NVIDIA控制面板查看本机显卡驱动对应的CUDA版本&#xff0c;如何去下载对应版本的Toolkit工…

Vue03

Vue03 注&#xff1a;为Vue实例添加属性&#xff0c;写法如下 methods:{ addSex(){ Vue.set(this.student,"sex",男) } }Vue监视数据原理&#xff1a; vue会监视data中所有层次的数据 如何监测对象中的数据 通过setter实现监视&#xff0c;且要在ne…

Jmeter聚合报告导出log文档,Jmeter聚合报告导出到CSV

Jmeter聚合报告导出log文档 在Filename中输入 EKS_perf_log\\${type}_log\\${__P(UNIQUEID,${__time(YMDHMS)})}\all-graph-results-log.csv 可以得到执行的log&#xff0c;文件夹包含时间戳 Jmeter聚合报告导出到CSV 点击Save Table Data&#xff0c;保存到CSV文件中

机器学习数学基础:37.偏相关分析

偏相关分析教程 一、偏相关分析是什么 在很多复杂的系统中&#xff0c;比如地理系统&#xff0c;会有多个要素相互影响。偏相关分析就是在这样多要素构成的系统里&#xff0c;不考虑其他要素的干扰&#xff0c;专门去研究两个要素之间关系紧密程度的一种方法。用来衡量这种紧…