Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

news/2024/10/25 8:41:02/

Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

    • 1. 异步编程基础
    • 2. 异步文件批量写入示例
      • 2.1 代码结构
      • 2.2 代码实现
      • 2.3 代码解释
        • 2.3.1 `BatchWriter`类
        • 2.3.2 `main`函数
    • 3. 其他示例代码
      • 3.1 简单的异步文件写入
      • 3.2 异步文件读取
      • 3.3 使用`asyncio.Lock`保护共享资源
      • 3.4 使用`asyncio.wait_for`设置超时
    • 4. 总结

在现代编程中,异步编程已经成为提高程序性能的重要手段之一。特别是在处理I/O密集型任务时,异步编程可以显著提高程序的效率。Pythonasyncio库和第三方库aiofiles为我们提供了强大的工具,使得异步文件操作变得简单而高效。

本文将通过一个具体的示例,介绍如何使用asyncioaiofiles进行异步文件批量写入,并详细解释其中的关键技术点。

1. 异步编程基础

asyncioawait_8">1.1 asyncioawait

asyncioPython的标准库,用于编写异步代码。async关键字用于定义一个异步函数,而await关键字用于等待一个协程(coroutine)完成。

import asyncioasync def hello_world():print("Hello")await asyncio.sleep(1)print("World")asyncio.run(hello_world())

aiofiles_23">1.2 aiofiles

aiofiles是一个第三方库,提供了异步文件操作的功能。通过aiofiles.open可以异步打开文件,并通过await f.write进行异步写入。

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

2. 异步文件批量写入示例

2.1 代码结构

我们将实现一个BatchWriter类,用于批量写入数据到文件中。该类的主要功能包括:

  • 将数据添加到缓冲区。
  • 当缓冲区达到一定大小时,将缓冲区中的数据批量写入文件。
  • 使用asyncio.Lock保护共享资源,避免竞态条件。
  • 使用asyncio.wait_for设置写入操作的超时时间。

2.2 代码实现

import asyncio
import json
import time
from collections import deque
import aiofilesclass BatchWriter:def __init__(self, filename: str, batch_size: int = 100):self.filename = filenameself.batch_size = batch_sizeself.buffer = deque()self.lock = asyncio.Lock()async def add(self, data: dict):print(f"Adding data: {data}")async with self.lock:print("Lock acquired in add")self.buffer.append(data)print(f"Buffer size after adding: {len(self.buffer)}")should_flush = len(self.buffer) >= self.batch_sizeprint("Releasing lock in add")if should_flush:print("Buffer size exceeded batch_size, calling flush")await self.flush()async def flush(self):print("Starting flush")buffer_to_write = []async with self.lock:if not self.buffer:print("Buffer is empty, exiting flush")return# 将缓冲区内容复制到临时列表并清空缓冲区buffer_to_write = list(self.buffer)self.buffer.clear()print(f"Flushing buffer of size: {len(buffer_to_write)}")# 在锁外执行文件写入if buffer_to_write:try:print(f"Writing {len(buffer_to_write)} items to file")await asyncio.wait_for(self._write_to_file(buffer_to_write), timeout=10)print("Finished writing to file")except asyncio.TimeoutError:print("Timeout while writing to file")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))except Exception as e:print(f"Error writing to file: {e}")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))print("Flush complete")async def _write_to_file(self, buffer_to_write):async with aiofiles.open(self.filename, 'a') as f:for data in buffer_to_write:await f.write(json.dumps(data, ensure_ascii=False) + '\n')async def main():start_time = time.time()writer = BatchWriter(filename="output.jsonl", batch_size=5)async def generate_data():for i in range(10):data = {"id": i, "value": f"data_{i}"}print(f"Generating data: {data}")await writer.add(data)await asyncio.sleep(0.1)print("Starting data generation")await generate_data()print("Finished data generation")print("Calling final flush")await writer.flush()  # 最终刷新缓冲区end_time = time.time()print(f"Total time: {end_time - start_time} seconds")if __name__ == "__main__":asyncio.run(main())

2.3 代码解释

2.3.1 BatchWriter
  • __init__方法:初始化文件名、批量大小、缓冲区和锁。
  • add方法:将数据添加到缓冲区,并在缓冲区达到批量大小时调用flush方法。
  • flush方法:将缓冲区中的数据批量写入文件。如果写入失败(如超时),将数据重新放回缓冲区。
  • _write_to_file方法:异步写入数据到文件。
2.3.2 main函数
  • generate_data协程:生成数据并调用add方法将数据添加到缓冲区。
  • main函数:启动数据生成,并在数据生成完成后调用flush方法进行最终的缓冲区刷新。

3. 其他示例代码

3.1 简单的异步文件写入

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

3.2 异步文件读取

import asyncio
import aiofilesasync def read_from_file(filename):async with aiofiles.open(filename, 'r') as f:content = await f.read()return contentasync def main():content = await read_from_file('example.txt')print(f"File content: {content}")asyncio.run(main())

asyncioLock_197">3.3 使用asyncio.Lock保护共享资源

import asyncio
import aiofilesclass FileWriter:def __init__(self, filename):self.filename = filenameself.lock = asyncio.Lock()async def write(self, content):async with self.lock:async with aiofiles.open(self.filename, 'a') as f:await f.write(content + '\n')async def main():writer = FileWriter('example.txt')async def write_data(data):await writer.write(data)await asyncio.gather(write_data('Data 1'),write_data('Data 2'),write_data('Data 3'))print("All data written to file")asyncio.run(main())

asynciowait_for_230">3.4 使用asyncio.wait_for设置超时

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():try:await asyncio.wait_for(write_to_file('example.txt', 'Hello, World!'), timeout=1)print("Data written to file")except asyncio.TimeoutError:print("Operation timed out")asyncio.run(main())

4. 总结

通过本文的介绍和示例代码,我们了解了如何使用asyncioaiofiles进行异步文件操作。异步编程可以显著提高I/O密集型任务的效率,特别是在处理大量并发任务时。使用asyncio.Lock可以保护共享资源,避免竞态条件。使用asyncio.wait_for可以设置操作的超时时间,防止无限期等待。


http://www.ppmy.cn/news/1541796.html

相关文章

构建effet.js人脸识别交互系统的实战之路

文章目录 前言一、什么是effet.js二、为什么需要使用effet.js四、effet.js能做什么五、使用步骤1.引入库2.main.js中注册全局2.使用3.效果图 六、其他模式讲解人脸打卡人脸添加睡眠检测 在h5中的使用总结 前言 在当今数字化的时代,用户体验变得尤为重要&#xff0c…

软考:缓存分片和一致性哈希

缓存分片技术是一种将数据分散存储在多个节点上的方法,它在分布式缓存系统中尤为重要。这项技术的核心目的是提高系统的性能和可扩展性,同时确保数据的高可用性。以下是缓存分片技术的一些关键点: 数据分片:缓存分片涉及将数据分成…

多个版本的GCC(GNU编译器集合)可以同时安装并存

在Ubuntu系统中,多个版本的GCC(GNU编译器集合)可以同时安装并存。GCC是编译C、C以及其他编程语言程序的重要工具,不同的项目可能需要不同版本的GCC来确保兼容性。 为什么需要多个GCC版本 项目依赖:不同的软件项目可能…

Python RabbitMQ 消息队列监听

Python RabbitMQ 消息队列监听 # coding: utf-8 # 测试消息消费import datetime import logging as log import os from pathlib import Path from typing import Listimport pika# 设置日志格式 Path("./logs").mkdir(parentsTrue, exist_okTrue) os.chdir("./…

74页PPT智能工厂整体规划方案

▲关注智慧方案文库,学习9000多份最新解决方案,其中 PPT、WORD超过7000多份 ,覆盖智慧城市多数领域的深度知识社区,稳定更新4年,日积月累,更懂行业需求。 智能工厂的定义 根据《智能工厂通用技术要求》的…

AIGC智能提示词项目实践(1):深入MySQL高级语法,提升开发效率

AIGC智能提示词项目实践-1:深入MySQL高级语法,提升开发效率 1.读取数据表中的字段进行脱敏(*加密)2.自动获取对应的数据表和字段3.表单有数据才进行更新的条件语句(构成数组)4.动态更新字段且进行条件判断5.动态更新数据表和字段6.字段自身1的操作7.多关…

【纯血鸿蒙】专项测试工具 DevEco Testing

DevEco Testing 为生态合作伙伴接入 HarmonyOS 生态提供专业的测试服务,共筑高品质的智能硬件产品。 云端服务平台面向开发者提供724 小时的远程多终端真机实验室,提供华为专业的应用安全隐私检测,提供基于华为真机的应用自动化测试。 访问地址:https://devecostudio.huawe…

阅读Go源码的顿悟时刻

Mattermost 的 Jess Espino 向 Natalie 讲述了他在阅读 Go 源代码时遇到的 10 个“顿悟时刻”(前六个)。第二部分(其余的顿悟时刻)即将推出! 本篇内容是根据2021年5月份#323 Aha moments reading Go’s source: Part …