通过代码了解了解 Python 异步处理
- 0. 背景
- 1. 示例代码 1
- 2. 示例代码 2
0. 背景
想了解一下 Python 的异步编程,所以就学习了一些示例代码。
1. 示例代码 1
controllers.py
import asyncio
from datetime import datetimeasync def foo():print(f'{datetime.now()} Foo')async def bar():while True:print(f'{datetime.now()} Bar')await asyncio.sleep(1)
代码说明,
这是一个使用 asyncio 库实现的 Python 异步编程示例。asyncio 是 Python 3.4 引入的标准库,用于编写协程(coroutine)和异步 I/O 代码。在这个示例中,定义了两个异步函数 foo 和 bar。
foo 函数是一个简单的异步函数,每次被调用时会打印当前时间和字符串 “Foo”。
bar 函数是一个无限循环的异步函数,每秒钟打印当前时间和字符串 “Bar”,然后等待 1 秒钟。这里使用了 asyncio.sleep() 函数来等待指定的时间。
在异步编程中,可以使用协程来实现非阻塞的异步操作。协程是一种轻量级的线程,可以在单个线程中运行多个协程,从而实现并发操作。在这个示例中,使用了 asyncio 库提供的协程机制来实现异步操作。使用 asyncio.run() 函数来运行这个示例,它会自动创建一个事件循环,并将 foo 和 bar 函数注册到事件循环中。
main.py
import asyncio
from controllers import foo, bar
from apscheduler.schedulers.asyncio import AsyncIOSchedulerasync def main():# Init messageprint('\nPress Ctrl-C to quit at anytime!\n')scheduler = AsyncIOScheduler()scheduler.add_job(foo, "interval", seconds=2)scheduler.start()await asyncio.create_task(bar())if __name__ == "__main__":loop = asyncio.get_event_loop()loop.create_task(main())loop.run_forever()
代码说明,
这段代码使用了 asyncio 库和 apscheduler 库来实现一个定时任务调度器。具体来说,它定义了一个名为 main 的异步函数,它包含了以下步骤:
- 打印一条初始化消息,告诉用户如何退出程序。
- 创建一个 AsyncIOScheduler 对象,用于调度定时任务。
- 向调度器中添加一个名为 foo 的任务,每 2 秒钟运行一次。
- 启动调度器。
- 创建一个协程任务,调用名为 bar 的异步函数。
- 运行事件循环,等待协程任务完成。
如果运行的是这个文件,那么在最后一行的 if __name__ == "__main__"
分支中,会创建一个事件循环,并将 main 函数作为一个协程任务添加到事件循环中。然后,调用 loop.run_forever() 方法来运行事件循环,等待协程任务完成。这样,就可以实现定时任务调度器的功能。
运行效果,
Press Ctrl-C to quit at anytime!2023-05-31 14:47:30.461780 Bar
2023-05-31 14:47:31.468809 Bar
2023-05-31 14:47:32.471349 Bar
2023-05-31 14:47:32.472350 Foo
2023-05-31 14:47:33.484869 Bar
2023-05-31 14:47:34.463051 Bar
2023-05-31 14:47:34.463051 Foo
2023-05-31 14:47:35.478448 Bar
2023-05-31 14:47:36.475942 Bar
2023-05-31 14:47:36.475942 Foo
2023-05-31 14:47:37.487639 Bar
2023-05-31 14:47:38.484124 Bar
...
2. 示例代码 2
basic_generation.py
import asyncio
import edge_tts
import pygame# TEXT = "Hello World! Note that this code requires the requests and pygame modules to be installed. "
TEXT = "想了解一下 Python 的异步编程,所以就学习了一些示例代码。"
VOICE = "zh-CN-XiaoxiaoNeural"
OUTPUT_FILE = "./output/output.mp3"async def _main() -> None:communicate = edge_tts.Communicate(TEXT, VOICE)await communicate.save(OUTPUT_FILE)pygame.mixer.init()pygame.mixer.music.load("./output/output.mp3")pygame.mixer.music.play()# Wait for audio to finish playingwhile pygame.mixer.music.get_busy():continueif __name__ == "__main__":loop = asyncio.get_event_loop()try:loop.run_until_complete(_main())finally:loop.close()
代码说明,
这段代码使用了 Python 的异步编程模块 asyncio 和 Edge TTS API,实现了将指定的文本转换为语音并播放的功能。具体来说,代码的主要功能如下:
- 导入必要的模块和变量,包括 asyncio、edge_tts 和 pygame。
- 定义要转换为语音的文本(TEXT)、所使用的语音合成引擎(VOICE)和输出文件路径3.(OUTPUT_FILE)。
- 定义一个异步函数 _main(),该函数使用 Edge TTS API 将文本转换为语音,并使用 pygame 播放输出的音频文件。
- 在主函数中,获取异步事件循环,并运行 _main() 函数直到完成。
- 最后,关闭异步事件循环。
具体实现过程如下:
- 使用 edge_tts.Communicate() 函数创建一个 Communicate 对象,该对象包含了要转换为语音的文本和所使用的语音合成引擎。
- 使用 await communicate.save(OUTPUT_FILE) 异步保存输出的音频文件到指定路径。
- 初始化 pygame 模块,使用 pygame.mixer.music.load() 函数加载输出的音频文件。
- 使用 pygame.mixer.music.play() 函数播放加载的音频文件。
- 在 while 循环中,使用 pygame.mixer.music.get_busy() 函数检查音频是否正在播放,如果正在播放则继续循环,直到音频播放完成为止。
未完待续!