Visual Studio 2022和C++实现带多组标签的Snowflake SQL查询批量数据导出程序

news/2025/3/15 7:15:18/

设计一个基于多个带标签Snowflake SQL语句作为json配置文件的Visual Studio 2022的C++代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。

该解决方案实现了高效的大数据导出,通过多线程和分批处理优化性能,同时提供完善的错误处理和日志追踪功能。C++代码解决方案:

#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>
#include <iomanip>
#include <sql.h>
#include <sqlext.h>
#include <nlohmann/json.hpp>using json = nlohmann::json;
namespace fs = std::filesystem;// 配置结构体
struct SnowflakeConfig {std::string server;std::string user;std::string password;std::string database;std::string schema;std::string warehouse;
};struct QueryConfig {std::string label;std::string sql;int batch_size;
};// 日志管理
class Logger {
private:std::ofstream log_file;std::mutex mtx;public:Logger(const std::string& path) {log_file.open(path, std::ios::app);}void log(const std::string& message) {std::lock_guard<std::mutex> lock(mtx);auto now = std::chrono::system_clock::now();auto time = std::chrono::system_clock::to_time_t(now);log_file << std::put_time(std::localtime(&time), "[%Y-%m-%d %H:%M:%S] ")<< message << std::endl;}
};// CSV转义处理
std::string escape_csv(const std::string& field) {if (field.find('"') != std::string::npos || field.find(',') != std::string::npos ||field.find('\n') != std::string::npos) {return "\"" + std::regex_replace(field, std::regex("\""), "\"\"") + "\"";}return field;
}// 导出任务处理
void export_task(const QueryConfig& query, const SnowflakeConfig& sf_config, const fs::path& output_dir, Logger& logger) {auto start_time = std::chrono::system_clock::now();SQLHENV env = SQL_NULL_HENV;SQLHDBC dbc = SQL_NULL_HDBC;SQLHSTMT stmt = SQL_NULL_HSTMT;try {// 初始化ODBC环境SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3, 0);// 建立连接SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);std::string conn_str = "DRIVER=SnowflakeDSIIDriver;SERVER=" + sf_config.server +";DATABASE=" + sf_config.database +";SCHEMA=" + sf_config.schema +";WAREHOUSE=" + sf_config.warehouse +";UID=" + sf_config.user +";PWD=" + sf_config.password + ";";SQLCHAR conn_out[1024];SQLSMALLINT conn_out_len;SQLRETURN ret = SQLDriverConnect(dbc, NULL, (SQLCHAR*)conn_str.c_str(), SQL_NTS,conn_out, sizeof(conn_out), &conn_out_len, SQL_DRIVER_COMPLETE);if (!SQL_SUCCEEDED(ret)) {throw std::runtime_error("Connection failed");}// 准备语句SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);ret = SQLExecDirect(stmt, (SQLCHAR*)query.sql.c_str(), SQL_NTS);if (!SQL_SUCCEEDED(ret)) {throw std::runtime_error("SQL execution failed");}// 获取列信息SQLSMALLINT num_cols;SQLNumResultCols(stmt, &num_cols);std::vector<std::string> col_names;for (int i = 1; i <= num_cols; ++i) {SQLCHAR col_name[256];SQLSMALLINT name_len;SQLColAttribute(stmt, i, SQL_DESC_NAME, col_name, sizeof(col_name), &name_len, NULL);col_names.emplace_back(reinterpret_cast<char*>(col_name));}// 准备文件输出fs::path output_path = output_dir / (query.label + ".csv");std::ofstream file(output_path, std::ios::trunc);if (!file) throw std::runtime_error("Failed to open output file");// 写入列头for (size_t i = 0; i < col_names.size(); ++i) {file << escape_csv(col_names[i]);if (i < col_names.size() - 1) file << ",";}file << "\n";// 设置批次大小SQLSetStmtAttr(stmt, SQL_ATTR_ROW_ARRAY_SIZE, (SQLPOINTER)query.batch_size, 0);SQLSetStmtAttr(stmt, SQL_ATTR_ROWS_FETCHED_PTR, &rows_fetched, 0);// 绑定列缓冲区std::vector<std::vector<char>> column_buffers(num_cols);std::vector<SQLLEN> indicators(num_cols);for (int i = 0; i < num_cols; ++i) {column_buffers[i].resize(query.batch_size * 1024);SQLBindCol(stmt, i+1, SQL_C_CHAR, column_buffers[i].data(),1024, &indicators[i]);}// 处理数据批次int total_rows = 0;while (true) {SQLRETURN fetch_ret = SQLFetchScroll(stmt, SQL_FETCH_NEXT, 0);if (fetch_ret == SQL_NO_DATA) break;if (!SQL_SUCCEEDED(fetch_ret)) break;// 处理每行数据for (int row = 0; row < rows_fetched; ++row) {for (int col = 0; col < num_cols; ++col) {if (col > 0) file << ",";if (indicators[col] != SQL_NULL_DATA) {std::string value(reinterpret_cast<char*>(column_buffers[col].data() + row * 1024));file << escape_csv(value);}}file << "\n";}total_rows += rows_fetched;logger.log(query.label + " - Fetched batch: " + std::to_string(rows_fetched));}auto end_time = std::chrono::system_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);logger.log(query.label + " - Completed. Total rows: " + std::to_string(total_rows) +" Duration: " + std::to_string(duration.count()) + "ms");} catch (const std::exception& e) {auto end_time = std::chrono::system_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);logger.log(query.label + " - Error: " + e.what() +" Duration: " + std::to_string(duration.count()) + "ms");}// 清理资源if (stmt != SQL_NULL_HSTMT) SQLFreeHandle(SQL_HANDLE_STMT, stmt);if (dbc != SQL_NULL_HDBC) {SQLDisconnect(dbc);SQLFreeHandle(SQL_HANDLE_DBC, dbc);}if (env != SQL_NULL_HENV) SQLFreeHandle(SQL_HANDLE_ENV, env);
}int main(int argc, char* argv[]) {if (argc < 3) {std::cerr << "Usage: " << argv[0] << " <config.json> <output_dir>" << std::endl;return 1;}try {// 读取配置文件std::ifstream config_file(argv[1]);json config;config_file >> config;// 解析Snowflake配置SnowflakeConfig sf_config;// ...解析配置...// 创建输出目录fs::path output_dir(argv[2]);fs::create_directories(output_dir);// 初始化日志Logger logger((output_dir / "export.log").string());// 创建线程池std::vector<std::thread> threads;for (auto& query_json : config["queries"]) {QueryConfig query{query_json["label"],query_json["sql"],query_json.value("batch_size", 1000)};threads.emplace_back([query, sf_config, output_dir, &logger]() {export_task(query, sf_config, output_dir, logger);});}// 等待所有线程完成for (auto& t : threads) {if (t.joinable()) t.join();}} catch (const std::exception& e) {std::cerr << "Error: " << e.what() << std::endl;return 1;}return 0;
}

代码说明:

  1. 配置文件解析

    • 使用nlohmann/json库解析包含多个SQL查询的JSON配置文件
    • 每个查询包含标签、SQL语句和批次大小
  2. 数据库连接

    • 使用ODBC API连接Snowflake数据库
    • 支持连接参数配置(服务器、用户、密码等)
  3. 数据导出

    • 使用分批次获取数据(通过SQLFetchScroll)
    • 动态绑定列缓冲区处理不同数据类型
    • 自动处理CSV格式转义和特殊字符
  4. 多线程处理

    • 每个查询在独立线程中执行
    • 使用线程池模式管理并发任务
  5. 日志系统

    • 线程安全的日志记录
    • 记录每个批次的状态、总行数和执行时间
    • 错误处理和信息追踪
  6. 异常处理

    • 全面捕获数据库操作异常
    • 文件操作错误处理
    • 资源泄漏防护(使用RAII管理句柄)

使用说明:

  1. 准备配置文件(config.json):
{"snowflake": {"server": "your_account.snowflakecomputing.com","user": "username","password": "password","database": "database","schema": "schema","warehouse": "warehouse"},"queries": [{"label": "sales_data","sql": "SELECT * FROM sales WHERE date >= '2023-01-01'","batch_size": 10000}]
}
  1. 编译运行:
g++ -std=c++17 -lodbc -lsnowflakeclient main.cpp -o exporter
./exporter config.json ./output
  1. 输出结果:
    • CSV文件:./output/sales_data.csv
    • 日志文件:./output/export.log

http://www.ppmy.cn/news/1579255.html

相关文章

QT | 信号与槽(超详解)

前言 对qt信号和槽的详细解释 &#x1f493; 个人主页&#xff1a;普通young man-CSDN博客 ⏩ 文章专栏&#xff1a;C_普通young man的博客-CSDN博客 ⏩ 本人giee: 普通小青年 (pu-tong-young-man) - Gitee.com 若有问题 评论区见&#x1f4dd; &#x1f389;欢迎大家点赞&am…

C++11 编译使用 aws-cpp-sdk

一、对sdk的编译前准备 1、软件需求 此文档针对于在Linux系统上使用源码进行编译开发操作系统使用原生的contos7Linux。机器配置建议 内存8G以上,CPU 4个 以上GCC 4.9.0 及以上版本Cmake 3.12以上 3.21以下apt install libcurl-devel openssl-devel libuuid-devel pulseaudio-…

内部类 ,匿名对象,编译器优化和静态成员

一.静态成员 1.1 了解静态成员 ⽤static修饰的成员变量&#xff0c;称之为静态成员变量&#xff0c;静态成员变量⼀定要在类外进⾏初始化。 报错了&#xff0c;是因为 不能直接给它值&#xff0c;这里只是声明&#xff0c;这里的值是缺省值&#xff0c;静态成员是属于静态区的…

现代密码学 | 具有保密和认证功能的安全方案

1.案例背景 1.1 2023年6月&#xff0c;微软云电子邮件泄露 事件描述&#xff1a; 2023年6月&#xff0c;属于多家美国政府机构的微软云电子邮件账户遭到非法入侵&#xff0c;其中包括了多位高级政府官员的电子邮件。据报道&#xff0c;美国国务院的10个邮件账户中共有6万封电…

基于Python+Django的网上招聘管理系统

项目介绍 PythonDjango网上招聘系统的设计与实现(Pycharm Django Vue Mysql) 平台采用B/S结构&#xff0c;后端采用主流的Python语言进行开发&#xff0c;前端采用主流的Vue.js进行开发。整个平台包括前台和后台两个部分。 - 前台功能包括&#xff1a;首页、岗位详情页、简历中…

【动态规划篇】746.使用最小花费爬楼梯

746.使用最小花费爬楼梯 题目链接&#xff1a; 746.使用最小花费爬楼梯 题目叙述&#xff1a; 给你一个整数数组 cost &#xff0c;其中 cost[i] 是从楼梯第i个台阶向上爬需要支付的费用。一旦你支付此费用&#xff0c;即可选择向上爬一个或者两个台阶。 你可以选择从下标为 …

【病毒分析】熊猫烧香病毒分析及其查杀修复

目录 前言 一、样本概况 1.1 样本信息 1.2 测试环境及工具 1.3 分析目标 二、具体行为分析 2.1 主要行为 2.1.1 恶意程序对用户造成的危害 2.2 恶意代码分析 2.2.1 加固后的恶意代码树结构图(是否有加固) 2.2.2 恶意程序的代码分析片段 三、解决方案(或总结) 3.1 …

基于PySide6与PyCatia的CATIA几何体智能重命名工具开发实践

一、工具概述 本工具基于CATIA V5/V6的二次开发接口&#xff0c;结合PySide6图形界面框架与PyCatia自动化库&#xff0c;实现了三大核心功能模块&#xff1a;几何体前缀批量添加、后缀动态追加、智能文本替换。该工具显著提升了工程师在大型零件体设计中的几何体命名管理效率&…