Python将Box企业网盘里一个目录下所有文件上载到S3并导入Redshift

devtools/2025/2/26 6:27:53/
python">import configparser
import os
import logging
import threading
import time
import boto3
from ftplib import FTP_TLS
from botocore.exceptions import NoCredentialsError
from concurrent.futures import ThreadPoolExecutor# 配置日志
logging.basicConfig(filename='upload_and_import.log', level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')def download_from_box_ftps(host, user, password, remote_dir, filename, local_dir):local_filename = os.path.join(local_dir, filename)try:with FTP_TLS(host) as ftps:ftps.auth()ftps.login(user=user, passwd=password)print(ftps.prot_p())ftps.cwd(remote_dir)with open(local_filename, 'wb') as fobj:ftps.retrbinary(f'RETR {filename}', fobj.write)logging.info(f"Successfully downloaded {filename} from Box.")return local_filenameexcept Exception as e:logging.error(f"Failed to download {filename} from Box: {e}")return Nonedef upload_file_s3(file_name, bucket, object_name=None):s3_client = boto3.client('s3')if object_name is None:object_name = os.path.basename(file_name)try:response = s3_client.upload_file(file_name, bucket, object_name)logging.info(f"Successfully uploaded {file_name} to S3 as {object_name}.")return Trueexcept FileNotFoundError:logging.error(f"The file {file_name} was not found.")return Falseexcept NoCredentialsError:logging.error("Credentials not available for S3 upload.")return Falsedef import_to_redshift(s3_bucket, s3_key, redshift_connection_info):try:# 构建Redshift COPY命令,这里假设表结构已经存在,且文件格式为gzipcopy_command = f"COPY your_table_name FROM's3://{s3_bucket}/{s3_key}' " \f"CREDENTIALS 'aws_access_key_id={redshift_connection_info['aws_access_key_id']};aws_secret_access_key={redshift_connection_info['aws_secret_access_key']}' " \f"GZIP CSV IGNOREHEADER 1"# 这里需要实际建立Redshift连接并执行命令,假设使用psycopg2import psycopg2conn = psycopg2.connect(host=redshift_connection_info['host'],port=redshift_connection_info['port'],database=redshift_connection_info['database'],user=redshift_connection_info['user'],password=redshift_connection_info['password'])cur = conn.cursor()cur.execute(copy_command)conn.commit()cur.close()conn.close()logging.info(f"Successfully imported {s3_key} from S3 to Redshift.")except Exception as e:logging.error(f"Failed to import {s3_key} from S3 to Redshift: {e}")def process_file(file, box_config, s3_config, redshift_config, local_dir):local_file = download_from_box_ftps(box_config['host'], box_config['user'], box_config['password'],box_config['remote_dir'], file, local_dir)if local_file:upload_success = upload_file_s3(local_file, s3_config['bucket'], os.path.join(s3_config['s3_dir'], file))if upload_success:import_to_redshift(s3_config['bucket'], os.path.join(s3_config['s3_dir'], file), redshift_config)os.remove(local_file)def main():config = configparser.ConfigParser()config.read('config.ini')box_config = dict(config.items('BOX'))s3_config = dict(config.items('S3'))redshift_config = dict(config.items('REDSHIFT'))local_dir = 'temp_downloads'if not os.path.exists(local_dir):os.makedirs(local_dir)with ThreadPoolExecutor() as executor:box_files = []try:with FTP_TLS(box_config['host']) as ftps:ftps.auth()ftps.login(user=box_config['user'], passwd=box_config['password'])ftps.cwd(box_config['remote_dir'])box_files = ftps.nlst()except Exception as e:logging.error(f"Failed to list files in Box directory: {e}")for file in box_files:if file.endswith('.gz'):executor.submit(process_file, file, box_config, s3_config, redshift_config, local_dir)if __name__ == "__main__":main()

配置文件读取:通过 configparser 从 config.ini 文件读取Box、S3和Redshift的配置信息。
日志记录:使用 logging 模块记录运行状态日志,包括下载、上传和导入的成功与失败信息。
异步处理:使用 concurrent.futures 模块的 ThreadPoolExecutor 实现异步处理,每个文件的下载、上传和导入操作在独立的线程中执行。
异常处理:在下载、上传和导入过程中,对可能出现的异常进行捕获和处理,并记录到日志中。
临时目录管理:下载的文件存储在临时目录 temp_downloads 中,上传成功后删除该文件。

config.ini 示例

[BOX]
host = YOUR_BOX_FTP_HOST
user = YOUR_USERNAME
password = YOUR_PASSWORD
remote_dir = /REMOTE_DIRECTORY_ON_BOX/[S3]
bucket = TARGET_S3_BUCKET_NAME
s3_dir = YOUR_S3_DIRECTORY[REDSHIFT]
host = YOUR_REDSHIFT_HOST
port = YOUR_REDSHIFT_PORT
database = YOUR_REDSHIFT_DATABASE
user = YOUR_REDSHIFT_USER
password = YOUR_REDSHIFT_PASSWORD
aws_access_key_id = YOUR_AWS_ACCESS_KEY_ID
aws_secret_access_key = YOUR_AWS_SECRET_ACCESS_KEY

http://www.ppmy.cn/devtools/162743.html

相关文章

筑牢 YMatrix 质量防线:从测试出发(思路篇)

前言 随着数据库产品的复杂性和迭代速度的增加,质量问题逐渐成为开发过程中的一大挑战。测试作为确保产品质量的关键环节,不仅能够有效预防潜在缺陷,还能提升开发效率和客户满意度。 本文将从测试的重要性出发,探讨如何通过系统…

AI顿悟之旅 - 1 - DeepSeek的训练方法为什么相比GPT-o1大幅度减少算力资源?

DeepSeek R1 模型和 GPT-3 模型在训练方法上有一些关键的不同,这些不同也使得 DeepSeek R1 能够大幅降低训练成本。 用简单易懂的语言为你解释一下: GPT-3 的训练方法: 预测下一个词 (Next Word Prediction): GPT-3 和它的前辈 GPT-2 一样&#xff0c…

UVM_CALLBACK 应用举例

UVM_CALLBACK是一种基于回调函数的设计模式,允许用户在特定事件发生时插入自定义的行为。UVM提供了uvm_callback类作为基类,用户可以通过继承该类来定义自己的回调行为。采用uvm_callback基类,用户可以在不更改原始代码的情况下轻松插入调试代…

AI自动化爬虫项目对比报告

摘要 本报告旨在深入研究AI自动化爬虫项目,对比分析其在实现方式、效率提升、自托管能力等方面的差异。 随着大数据和人工智能技术的快速发展,传统网络爬虫技术面临着越来越多的挑战,如网站反爬虫机制的加强、网页结构复杂多变等。AI自动化爬虫技术应运而生,利用机器学习、…

Python 开发 creo 详细版

好的,以下是脚本的完整代码内容: from win32com import client import VBAPI from tkinter import messagebox, filedialog, Tk, Button, Entry, Label import os CREO_APP = C:/PTC/Creo 2.0/Parametric/bin/parametric.exe PART_DIR = D:/mydoc/creo_python/fin.prt OUTP…

OpenCV计算摄影学(2)图像去噪函数denoise_TVL1()

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 原始-对偶算法是用于解决特定类型变分问题(即,寻找一个函数以最小化某个泛函)的算法。特别地,图像…

webdriver-manager

webdriver-manager是一个用于管理Selenium WebDriver的命令行工具,它可以帮助用户安装、更新和启动Selenium WebDriver。以下是对webdriver-manager的详细解释: 一、webdriver-manager的用途 自动下载WebDriver:webdriver-manager可以自动检…

28.C++多态1 (多态的概念与简单使用,虚函数,final,override)

⭐上篇文章:27.C继承 3 (复杂的菱形继承与菱形虚拟继承)-CSDN博客 ⭐本篇代码:c学习/17.C三大特性-多态 橘子真甜/c-learning-of-yzc - 码云 - 开源中国 (gitee.com) ⭐标⭐是比较重要的部分 目录 一. C多态简介 1.1 构成多态的两个必要条件 二. vir…