python实战,提取数据汇聚到表格中

embedded/2025/3/29 3:41:56/

起因:

一个临时性的检查工作,上级单位发来数十张表格,每张表格名以一个特定的数字开头,表示其中的一个规则,每张表格有几千到几十万条数据,在每张规则表中,每条数据的序号是唯一的。然后这些表被下发到几十个部门,让其核对有无违规情况,各部门根据规则来核查自身情况,满足其中规则的就以对应的规则数字为文件名的前缀,满足哪些数据就填写对应的序号,这样一来,各部门完成自查后生成了几百个文件,将这些文件最后再归集到数十张规则表就是一个大难题了。如果人工来统计,由于数据量庞大,可能10个人一天一夜也不一定能完成。

实现

逻辑上可以这样,先将模板表按数字规则做字典,每个数字就是一个key,对应的路径就是字典值,如下:

python">def get_template_files(input_dir) -> Dict[str, str]:files = {}try:# 遍历目录下所有文件for filename in os.listdir(input_dir):if filename.lower().endswith('.xlsx'):file_path = os.path.join(input_dir, filename)shu_zi = extract_numbers(filename)if shu_zi:if shu_zi == "0":continuefiles[shu_zi] = file_pathfinally:# 退出 Excel 应用passlogging.info("模板数:{}".format(len(files)))for k, v in files.items():logging.info("template_files:[{}]: {}".format(k, v))make_file_writable(v)logging.info("get_template_files end.\n\n")return files

再将所有部门的文件放在一起,递归遍历,并将文件名前缀提取出来作为key,相同key的放在一个列表中,表示为同一规则,

python">def list_all_files(directory, templates: Dict[str, str]):"""遍历指定目录下的所有文件,并返回包含文件完整路径的列表。参数:directory (str): 目标目录的路径。返回:list: 包含所有文件完整路径的列表。"""file_dic_list = {}logging.info("list_all_files begin...")def traverse(current_dir):"""递归遍历当前目录下的所有文件和子目录,将文件路径添加到 file_list 中。参数:current_dir (str): 当前遍历的目录路径。"""try:for entry in os.listdir(current_dir):full_path = os.path.join(current_dir, entry)if os.path.isdir(full_path):traverse(full_path)else:shu_zi = extract_numbers(entry)if shu_zi and shu_zi in templates:if shu_zi not in file_dic_list:file_dic_list[shu_zi] = []file_dic_list[shu_zi].append(full_path)except Exception as e:logging.error(f"访问目录 {current_dir} 时出错: {e}")traverse(directory)for k, v in file_dic_list.items():for v1 in v:logging.info("[{}]: {}".format(k, v1))logging.info("list_all_files end.\n\n")return file_dic_list

然后,遍历所有同一规则的文件,将其第1列的序号作为这一规则下的key,第2,3,4列作为内容,并与相同规则的模板文件进行匹配,若序号相同,则将这一行2,3,4列单元格内容填充至模板文件,最后结束时保存模板文件。

至此,程序完成,附全部代码:

python">#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import xlwings as xw
import re
from typing import Dict
import logging
from concurrent_log_handler import ConcurrentRotatingFileHandler  # NOQA: F401
import statdef extract_numbers(line):"""提取开头的数字,并返回这个数字,如果没有提取到,则返回空字符串。参数:input_string (str): 字符串,每行以数字开头,后跟非数字字符。返回:rel: 提取到的数字"""rel = ""# 匹配每行开头连续的数字match = re.match(r'\s*(\d+)', line)if match:rel = match.group(1)return reldef get_template_files(input_dir) -> Dict[str, str]:files = {}try:# 遍历目录下所有文件for filename in os.listdir(input_dir):if filename.lower().endswith('.xlsx'):file_path = os.path.join(input_dir, filename)shu_zi = extract_numbers(filename)if shu_zi:if shu_zi == "0":continuefiles[shu_zi] = file_pathfinally:# 退出 Excel 应用passlogging.info("模板数:{}".format(len(files)))for k, v in files.items():logging.info("template_files:[{}]: {}".format(k, v))make_file_writable(v)logging.info("get_template_files end.\n\n")return filesdef list_all_files(directory, templates: Dict[str, str]):"""遍历指定目录下的所有文件,并返回包含文件完整路径的列表。参数:directory (str): 目标目录的路径。返回:list: 包含所有文件完整路径的列表。"""file_dic_list = {}logging.info("list_all_files begin...")def traverse(current_dir):"""递归遍历当前目录下的所有文件和子目录,将文件路径添加到 file_list 中。参数:current_dir (str): 当前遍历的目录路径。"""try:for entry in os.listdir(current_dir):full_path = os.path.join(current_dir, entry)if os.path.isdir(full_path):traverse(full_path)else:shu_zi = extract_numbers(entry)if shu_zi and shu_zi in templates:if shu_zi not in file_dic_list:file_dic_list[shu_zi] = []file_dic_list[shu_zi].append(full_path)except Exception as e:logging.error(f"访问目录 {current_dir} 时出错: {e}")traverse(directory)for k, v in file_dic_list.items():for v1 in v:logging.info("[{}]: {}".format(k, v1))logging.info("list_all_files end.\n\n")return file_dic_listdef is_integer(s):"""判断字符串是否为整型数字。参数:s (str): 待判断的字符串。返回:bool: 如果 s 表示整型数字,则返回 True;否则返回 False。"""try:# 尝试将字符串转换为整型int(s)return Trueexcept ValueError:return Falsedef open_excel_file(k, file_path, app):datas = {}logging.info(f"open_file: {file_path}")wb = app.books.open(file_path)sheet = wb.sheets[0]# 获取工作表中使用的区域used_range = sheet.used_rangelast_row = used_range.last_cell.rowfor row in range(1, last_row + 1):# 获取当前行的前四列数据(A、B、C、D 列)row_values = sheet.range((row, 1), (row, 4)).valueif len(row_values) and row_values[0] and is_integer(row_values[0]):# logging.info(f"规则[{k}]第[{row}]行的前四列数据:{row_values[0]},{row_values[1]},{row_values[2]},{row_values[3]}")datas[int(row_values[0])] = row_valueslogging.info(f"规则[{k}],有[{len(datas)}]行数据")wb.close()return datasdef sync_excel_file(sync_path, datas):logging.info(f"sync_file begin: data_len: {len(datas):05}, {sync_path}")wb = app.books.open(sync_path)# 获取工作表中使用的区域sheet = wb.sheets[0]used_range = sheet.used_rangelast_row = used_range.last_cell.rowfor row in range(1, last_row + 1):# 获取当前行的前四列数据(A、B、C、D 列)row_values = sheet.range((row, 1), (row, 4)).valueif len(row_values) and row_values[0] and is_integer(row_values[0]):# logging.info(f"规则[{k}]第[{row}]行的前四列数据:{row_values[0]},{row_values[1]},{row_values[2]},{row_values[3]}")if int(row_values[0]) in datas:sheet.range((row, 2)).value = datas[int(row_values[0])][1]sheet.range((row, 3)).value = datas[int(row_values[0])][2]sheet.range((row, 4)).value = datas[int(row_values[0])][3]wb.save(sync_path)wb.close()logging.info(f"sync_file end: data_len: {len(datas):05}, {sync_path}")def make_file_writable(file_path):"""将指定的文件设置为可写(取消只读属性)。参数:file_path (str): 文件的路径。"""# 获取文件当前的权限属性file_attrs = os.stat(file_path).st_mode# 通过按位或操作,添加写权限(S_IWRITE),使其可写os.chmod(file_path, file_attrs | stat.S_IWRITE)record_format = '[%(asctime)s][%(levelname)s][%(filename)s][%(lineno)03d] %(message)s'
date_format = '%m%d %H:%M:%S'
handler = logging.handlers.ConcurrentRotatingFileHandler(os.path.join(os.getcwd(), 'app.log'), 'a', 20242880, 10)
log_handlers = [logging.StreamHandler(),handler
]
logging.basicConfig(level=logging.INFO,format=record_format,datefmt=date_format,handlers=log_handlers
)
logger = logging.getLogger(__name__)# 设置全局异常处理函数,确保未捕获的异常也能写入日志
def handle_exception(exc_type, exc_value, exc_traceback):if issubclass(exc_type, KeyboardInterrupt):# 对于键盘中断,保持默认处理sys.__excepthook__(exc_type, exc_value, exc_traceback)returnlogger.error("未捕获的异常:", exc_info=(exc_type, exc_value, exc_traceback))sys.excepthook = handle_exceptionif __name__ == '__main__':logger.info("work begin...")try:templates = get_template_files(r"D:\22\模版")all_files = list_all_files(r"D:\22\部门数据", templates)app = xw.App(visible=False)try:for k, v in all_files.items():all_datas = {}for v1 in v:datas = open_excel_file(k, v1, app)all_datas.update(datas)# break   # TODO: 临时 一遍, 稍后得删除此行代码sync_excel_file(templates[k], all_datas)# break   # TODO: 临时 一遍, 稍后得删除此行代码finally:app.quit()except Exception as e:# 捕获异常后记录详细异常信息logger.exception("执行过程中发生异常")raise  # 记录后重新抛出异常logger.info("work end.")input("按任意键退出...")

问题:

代码跑起来很成功,各方面数据都OK,只有一个问题,数据量稍微大一点的模板文件,比如15M,匹配并填写保存需要花费30分钟,几十个模板,1天也搞不赢,因此后面就在想,倒底是哪里写的有问题,是内存泄露还是什么引发这么慢?

经过人工智能的答疑,确认无论是读数据还是写数据,每一行均调用了com组件,而这个读写是很耗费时间的。

改进:

问题找到了,改起来就容易很多,一次性读写,其它均在内存中完成,性能提供近百倍,原先要1天都完不成的,只需要几十秒就完成了全部数据的汇集。附改好后的代码:

python">#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import xlwings as xw
import re
from typing import Dict
import logging
from concurrent_log_handler import ConcurrentRotatingFileHandler  # NOQA: F401
import statdef extract_numbers(line):"""提取开头的数字,并返回这个数字,如果没有提取到,则返回空字符串。参数:input_string (str): 字符串,每行以数字开头,后跟非数字字符。返回:rel: 提取到的数字"""rel = ""# 匹配每行开头连续的数字match = re.match(r'\s*(\d+)', line)if match:rel = match.group(1)return reldef get_template_files(input_dir) -> Dict[str, str]:files = {}try:# 遍历目录下所有文件for filename in os.listdir(input_dir):if filename.lower().endswith('.xlsx'):file_path = os.path.join(input_dir, filename)shu_zi = extract_numbers(filename)if shu_zi:if shu_zi == "0":continuefiles[shu_zi] = file_pathfinally:# 退出 Excel 应用passlogging.info("模板数:{}".format(len(files)))for k, v in files.items():logging.info("template_files:[{}]: {}".format(k, v))make_file_writable(v)logging.info("get_template_files end.\n\n")return filesdef list_all_files(directory, templates: Dict[str, str]):"""遍历指定目录下的所有文件,并返回包含文件完整路径的列表。参数:directory (str): 目标目录的路径。返回:list: 包含所有文件完整路径的列表。"""file_dic_list = {}logging.info("list_all_files begin...")def traverse(current_dir):"""递归遍历当前目录下的所有文件和子目录,将文件路径添加到 file_list 中。参数:current_dir (str): 当前遍历的目录路径。"""try:for entry in os.listdir(current_dir):full_path = os.path.join(current_dir, entry)if os.path.isdir(full_path):traverse(full_path)else:shu_zi = extract_numbers(entry)if shu_zi and shu_zi in templates:if shu_zi not in file_dic_list:file_dic_list[shu_zi] = []file_dic_list[shu_zi].append(full_path)except Exception as e:logging.error(f"访问目录 {current_dir} 时出错: {e}")traverse(directory)for k, v in file_dic_list.items():for v1 in v:logging.info("[{}]: {}".format(k, v1))logging.info("list_all_files end.\n\n")return file_dic_listdef is_integer(s):"""判断字符串是否为整型数字。参数:s (str): 待判断的字符串。返回:bool: 如果 s 表示整型数字,则返回 True;否则返回 False。"""try:# 尝试将字符串转换为整型int(s)return Trueexcept ValueError:return Falsedef open_excel_file_2(k, file_path, app):datas = {}logging.info(f"open_file: {file_path}")wb = app.books.open(file_path)sheet = wb.sheets[0]# 获取工作表中使用的区域used_range = sheet.used_rangelast_row = used_range.last_cell.rowfor row in range(1, last_row + 1):# 获取当前行的前四列数据(A、B、C、D 列)row_values = sheet.range((row, 1), (row, 4)).valueif len(row_values) and row_values[0] and is_integer(row_values[0]):# logging.info(f"规则[{k}]第[{row}]行的前四列数据:{row_values[0]},{row_values[1]},{row_values[2]},{row_values[3]}")datas[int(row_values[0])] = row_valueslogging.info(f"规则[{k}],有[{len(datas)}]行数据")wb.close()return datasdef open_excel_file(k, file_path, app):"""批量读取 Excel 文件中 A-D 列的数据,返回一个字典,key 为第一列数字,value 为整行数据。"""logging.info(f"open_file: {file_path}")wb = app.books.open(file_path)sheet = wb.sheets[0]used_range = sheet.used_rangelast_row = used_range.last_cell.row# 批量读取A-D区域data_range = sheet.range((1, 1), (last_row, 4)).valuedatas = {}# data_range 为列表形式,每个元素为一行数据for row in data_range:if row and len(row) >= 1 and row[0] and is_integer(row[0]):datas[int(row[0])] = rowlogging.info(f"规则[{k}],有[{len(datas)}]行数据")wb.close()return datasdef sync_excel_file(sync_path, datas, app):"""批量写入数据到 Excel 文件中,根据第一列匹配,如果存在则更新该行 B、C、D 三列的数据。"""logging.info(f"sync_file begin: data_len: {len(datas):05}, {sync_path}")wb = app.books.open(sync_path)sheet = wb.sheets[0]used_range = sheet.used_rangelast_row = used_range.last_cell.row# 批量读取A-D区域data_range = sheet.range((1, 1), (last_row, 4)).value# 遍历内存中的数据,根据第一列更新相应的行for idx, row in enumerate(data_range):if row and len(row) >= 4 and row[0] and is_integer(row[0]):key = int(row[0])if key in datas:# 更新B, C, D列数据row[1] = datas[key][1]row[2] = datas[key][2]row[3] = datas[key][3]# data_range[idx] 已更新# 批量写回更新后的数据区域sheet.range((1, 1), (last_row, 4)).value = data_rangewb.save(sync_path)wb.close()logging.info(f"sync_file end: data_len: {len(datas):05}, {sync_path}")def make_file_writable(file_path):"""将指定的文件设置为可写(取消只读属性)。参数:file_path (str): 文件的路径。"""# 获取文件当前的权限属性file_attrs = os.stat(file_path).st_mode# 通过按位或操作,添加写权限(S_IWRITE),使其可写os.chmod(file_path, file_attrs | stat.S_IWRITE)record_format = '[%(asctime)s][%(levelname)s][%(filename)s][%(lineno)03d] %(message)s'
date_format = '%m%d %H:%M:%S'
handler = logging.handlers.ConcurrentRotatingFileHandler(os.path.join(os.getcwd(), 'app.log'), 'a', 20242880, 10)
log_handlers = [logging.StreamHandler(),handler
]
logging.basicConfig(level=logging.INFO,format=record_format,datefmt=date_format,handlers=log_handlers
)
logger = logging.getLogger(__name__)# 设置全局异常处理函数,确保未捕获的异常也能写入日志
def handle_exception(exc_type, exc_value, exc_traceback):if issubclass(exc_type, KeyboardInterrupt):# 对于键盘中断,保持默认处理sys.__excepthook__(exc_type, exc_value, exc_traceback)returnlogger.error("未捕获的异常:", exc_info=(exc_type, exc_value, exc_traceback))sys.excepthook = handle_exceptionif __name__ == '__main__':logger.info("work begin...")try:templates = get_template_files(r"D:\22\模版")all_files = list_all_files(r"D:\22\部门数据", templates)app = xw.App(visible=False)try:for k, v in all_files.items():all_datas = {}for v1 in v:datas = open_excel_file(k, v1, app)all_datas.update(datas)# break   # TODO: 临时 一遍, 稍后得删除此行代码sync_excel_file(templates[k], all_datas, app)# break   # TODO: 临时 一遍, 稍后得删除此行代码finally:app.quit()except Exception as e:# 捕获异常后记录详细异常信息logger.exception("执行过程中发生异常")raise  # 记录后重新抛出异常logger.info("work end.")input("按任意键退出...")


http://www.ppmy.cn/embedded/176204.html

相关文章

品融电商:深耕兴趣电商,多维赋能品牌长效增长

品融电商:深耕兴趣电商,多维赋能品牌长效增长 在流量去中心化与消费需求多元化的新商业时代,品牌如何精准触达年轻用户、快速建立心智认知并实现销量突破?品融电商(PINKROON)凭借其“效品合一&#xff0c…

让 Deepseek 写一个计算器(网页)

完整代码 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>简单计算器</title><style&…

conda install 和 pip install 的区别

conda install 和 pip install 是两个常用的包安装命令&#xff0c;但它们在很多方面存在差异。 1. 所属管理系统不同 1.1 conda install conda install 是Anaconda和Miniconda发行版自带的包管理工具 conda 的安装命令。conda 是一个跨平台的开源包管理系统和环境管理系统&…

AI 生成 PPT 网站介绍与优缺点分析

随着人工智能技术不断发展&#xff0c;利用 AI 自动生成 PPT 已成为提高演示文稿制作效率的热门方式。本文将介绍几款主流的 AI PPT 工具&#xff0c;重点列出免费使用机会较多的网站&#xff0c;并对各平台的优缺点进行详细分析&#xff0c;帮助用户根据自身需求选择合适的工具…

(学习总结29)Linux 进程概念和进程状态

Linux 进程概念 冯诺依曼体系结构软件运行与存储分级数据流动的理论过程 操作系统操作系统(Operator System) 概念操作系统的功能与作用系统调用和库函数概念 进程概念描述进程 - PCBtask_struct查看进程通过系统调用获取进程标示符 PID通过系统调用 fork 函数创建进程简单使用…

软考教材重点内容 信息安全工程师 第20章 数据库系统安全

20.1.2 数据库安全威胁 数据库安全威胁如下: (1)授权的误用(Misuses of Authority)。合法用户越权获得他们不应该获得的资源&#xff0c;窃取程序或存储介质&#xff0c;修改或破坏数据。授权用户将自身的访问特权不适当地授予其他用户&#xff0c;导致系统安全策略受到威胁&a…

openEuler 基于 sealos 部署 k8s 1.30 集群

一、 sealos 简介 Sealos 是传说中最丝滑的k8s集群部署方式。 Sealos 是一款以 Kubernetes 为内核的云操作系统发行版。 Sealos 是一个基于 Kubeadm 的 K8s 一键安装工具&#xff0c;它可以帮助我们快速搭建一个 K8s 集群&#xff0c;而且还可以帮我们自动安装一些常用的插件…

《背影》再读:时光深处的温暖与感触

去苏州参加活动&#xff0c;按我的习惯&#xff0c;自然要带一本书的。所以朱自清的《背影》就这样不经意地跃入我的包中&#xff0c;里面收录了好多编散文&#xff0c;读起来总感觉吃力&#xff0c;有一种读鲁迅的书的感觉&#xff0c;于是想起了“背影”这文&#xff0c;重新…