设计一个基于多个带标签Snowflake SQL语句作为json配置文件的Visual Studio 2022的C++代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。
该解决方案实现了高效的大数据导出,通过多线程和分批处理优化性能,同时提供完善的错误处理和日志追踪功能。C++代码解决方案:
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>
#include <iomanip>
#include <sql.h>
#include <sqlext.h>
#include <nlohmann/json.hpp>using json = nlohmann::json;
namespace fs = std::filesystem;// 配置结构体
struct SnowflakeConfig {std::string server;std::string user;std::string password;std::string database;std::string schema;std::string warehouse;
};struct QueryConfig {std::string label;std::string sql;int batch_size;
};// 日志管理
class Logger {
private:std::ofstream log_file;std::mutex mtx;public:Logger(const std::string& path) {log_file.open(path, std::ios::app);}void log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx);auto now = std::chrono::system_clock::now();auto time = std::chrono::system_clock::to_time_t(now);log_file << std::put_time(std::localtime(&time), "[%Y-%m-%d %H:%M:%S] ")<< message << std::endl;}
};// CSV转义处理
std::string escape_csv(const std::string& field) {if (field.find('"') != std::string::npos || field.find(',') != std::string::npos ||field.find('\n') != std::string::npos) {return "\"" + std::regex_replace(field, std::regex("\""), "\"\"") + "\"";}return field;
}// 导出任务处理
void export_task(const QueryConfig& query, const SnowflakeConfig& sf_config, const fs::path& output_dir, Logger& logger) {auto start_time = std::chrono::system_clock::now();SQLHENV env = SQL_NULL_HENV;SQLHDBC dbc = SQL_NULL_HDBC;SQLHSTMT stmt = SQL_NULL_HSTMT;try {// 初始化ODBC环境SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3, 0);// 建立连接SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);std::string conn_str = "DRIVER=SnowflakeDSIIDriver;SERVER=" + sf_config.server +";DATABASE=" + sf_config.database +";SCHEMA=" + sf_config.schema +";WAREHOUSE=" + sf_config.warehouse +";UID=" + sf_config.user +";PWD=" + sf_config.password + ";";SQLCHAR conn_out[1024];SQLSMALLINT conn_out_len;SQLRETURN ret = SQLDriverConnect(dbc, NULL, (SQLCHAR*)conn_str.c_str(), SQL_NTS,conn_out, sizeof(conn_out), &conn_out_len, SQL_DRIVER_COMPLETE);if (!SQL_SUCCEEDED(ret)) {throw std::runtime_error("Connection failed");}// 准备语句SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);ret = SQLExecDirect(stmt, (SQLCHAR*)query.sql.c_str(), SQL_NTS);if (!SQL_SUCCEEDED(ret)) {throw std::runtime_error("SQL execution failed");}// 获取列信息SQLSMALLINT num_cols;SQLNumResultCols(stmt, &num_cols);std::vector<std::string> col_names;for (int i = 1; i <= num_cols; ++i) {SQLCHAR col_name[256];SQLSMALLINT name_len;SQLColAttribute(stmt, i, SQL_DESC_NAME, col_name, sizeof(col_name), &name_len, NULL);col_names.emplace_back(reinterpret_cast<char*>(col_name));}// 准备文件输出fs::path output_path = output_dir / (query.label + ".csv");std::ofstream file(output_path, std::ios::trunc);if (!file) throw std::runtime_error("Failed to open output file");// 写入列头for (size_t i = 0; i < col_names.size(); ++i) {file << escape_csv(col_names[i]);if (i < col_names.size() - 1) file << ",";}file << "\n";// 设置批次大小SQLSetStmtAttr(stmt, SQL_ATTR_ROW_ARRAY_SIZE, (SQLPOINTER)query.batch_size, 0);SQLSetStmtAttr(stmt, SQL_ATTR_ROWS_FETCHED_PTR, &rows_fetched, 0);// 绑定列缓冲区std::vector<std::vector<char>> column_buffers(num_cols);std::vector<SQLLEN> indicators(num_cols);for (int i = 0; i < num_cols; ++i) {column_buffers[i].resize(query.batch_size * 1024);SQLBindCol(stmt, i+1, SQL_C_CHAR, column_buffers[i].data(),1024, &indicators[i]);}// 处理数据批次int total_rows = 0;while (true) {SQLRETURN fetch_ret = SQLFetchScroll(stmt, SQL_FETCH_NEXT, 0);if (fetch_ret == SQL_NO_DATA) break;if (!SQL_SUCCEEDED(fetch_ret)) break;// 处理每行数据for (int row = 0; row < rows_fetched; ++row) {for (int col = 0; col < num_cols; ++col) {if (col > 0) file << ",";if (indicators[col] != SQL_NULL_DATA) {std::string value(reinterpret_cast<char*>(column_buffers[col].data() + row * 1024));file << escape_csv(value);}}file << "\n";}total_rows += rows_fetched;logger.log(query.label + " - Fetched batch: " + std::to_string(rows_fetched));}auto end_time = std::chrono::system_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);logger.log(query.label + " - Completed. Total rows: " + std::to_string(total_rows) +" Duration: " + std::to_string(duration.count()) + "ms");} catch (const std::exception& e) {auto end_time = std::chrono::system_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);logger.log(query.label + " - Error: " + e.what() +" Duration: " + std::to_string(duration.count()) + "ms");}// 清理资源if (stmt != SQL_NULL_HSTMT) SQLFreeHandle(SQL_HANDLE_STMT, stmt);if (dbc != SQL_NULL_HDBC) {SQLDisconnect(dbc);SQLFreeHandle(SQL_HANDLE_DBC, dbc);}if (env != SQL_NULL_HENV) SQLFreeHandle(SQL_HANDLE_ENV, env);
}int main(int argc, char* argv[]) {if (argc < 3) {std::cerr << "Usage: " << argv[0] << " <config.json> <output_dir>" << std::endl;return 1;}try {// 读取配置文件std::ifstream config_file(argv[1]);json config;config_file >> config;// 解析Snowflake配置SnowflakeConfig sf_config;// ...解析配置...// 创建输出目录fs::path output_dir(argv[2]);fs::create_directories(output_dir);// 初始化日志Logger logger((output_dir / "export.log").string());// 创建线程池std::vector<std::thread> threads;for (auto& query_json : config["queries"]) {QueryConfig query{query_json["label"],query_json["sql"],query_json.value("batch_size", 1000)};threads.emplace_back([query, sf_config, output_dir, &logger]() {export_task(query, sf_config, output_dir, logger);});}// 等待所有线程完成for (auto& t : threads) {if (t.joinable()) t.join();}} catch (const std::exception& e) {std::cerr << "Error: " << e.what() << std::endl;return 1;}return 0;
}
代码说明:
-
配置文件解析:
- 使用nlohmann/json库解析包含多个SQL查询的JSON配置文件
- 每个查询包含标签、SQL语句和批次大小
-
数据库连接:
- 使用ODBC API连接Snowflake数据库
- 支持连接参数配置(服务器、用户、密码等)
-
数据导出:
- 使用分批次获取数据(通过SQLFetchScroll)
- 动态绑定列缓冲区处理不同数据类型
- 自动处理CSV格式转义和特殊字符
-
多线程处理:
- 每个查询在独立线程中执行
- 使用线程池模式管理并发任务
-
日志系统:
- 线程安全的日志记录
- 记录每个批次的状态、总行数和执行时间
- 错误处理和信息追踪
-
异常处理:
- 全面捕获数据库操作异常
- 文件操作错误处理
- 资源泄漏防护(使用RAII管理句柄)
使用说明:
- 准备配置文件(config.json):
{"snowflake": {"server": "your_account.snowflakecomputing.com","user": "username","password": "password","database": "database","schema": "schema","warehouse": "warehouse"},"queries": [{"label": "sales_data","sql": "SELECT * FROM sales WHERE date >= '2023-01-01'","batch_size": 10000}]
}
- 编译运行:
g++ -std=c++17 -lodbc -lsnowflakeclient main.cpp -o exporter
./exporter config.json ./output
- 输出结果:
- CSV文件:./output/sales_data.csv
- 日志文件:./output/export.log