一、简介
concurrent.futures是重要的异步编程库。内部实现机制非常复杂,简单来说就是开辟一个固定大小为n的进程池/线程池。进程池中最多执行n个进程/线程,当任务完成后,从任务队列中取新任务,若池满,则排队等待
concurrent.futures主要实现了进程池和线程池,适合做派生一堆任务,异步执行完成后,再收集这些任务,且保持相同的api,池的引入带来了一定好处:
开发更快,代码简洁,调调函数
进程线程复用,减去了大量开辟,删除进程线程时的开销
有效避免了因为创建进程线程过多,而导致负荷过大的问题
二、进程、线程通用方法
- wait(fs, timeout=None, return_when=ALL_COMPLETED),等待线程或进程执行完,再执行主线程,像当于join
- fs: 表示需要执行的序列
- timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回
- return_when:表示wait返回结果的条件,默认为 ALL_COMPLETED 等待全部子线程执行完
''' FIRST_COMPLETED = 'FIRST_COMPLETED' 等待第一个子线程执行完成 FIRST_EXCEPTION = 'FIRST_EXCEPTION' 第一个子线程异常 ALL_COMPLETED = 'ALL_COMPLETED' 等待全部子线程执行完 '''from concurrent.futures import wait,ALL_COMPLETED,FIRST_COMPLETEDwait([t1,t2],return_when=ALL_COMPLETED)
- as_completed(fs, timeout=None) 获取每一个线程或进程的返回结果,返回结果顺应为线程执行完毕的顺序
- fs: 表示需要执行的序列
- timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回
from concurrent.futures import as_completedfor future in as_completed([t1,t2]):print(future.result())
- Future() 创建Future对象,与协程中的asyncio.Future对象没有任何关系,submit返回的就是该对象
- future.set_result(result):设置future返回值
- future.result():获取future返回值
- future.done():future是否执行完毕
- future.cancelled():future是否取消
- future.add_done_callback(futureCallback):执行完毕的回调函数
- future.remove_done_callback(futureCallback):移除执行完毕的回调函数
- future.cancel():取消future
- future.get_loop():返回future所绑定的事件循环
- future.set_exception(exception):设置future异常
- future.exception():获取异常
from concurrent.futures import Futurefuture = Future()future.set_result('future') print(future.result())
三、线程池
import time
from concurrent.futures import ThreadPoolExecutordef thread1(text):print('thread1')time.sleep(3)print(text)return f'我是{text}'def thread2(text):print('thread2')time.sleep(5)print(text)return f'我是{text}'# 创建一个最大容纳数量为2的线程池
pool= ThreadPoolExecutor(max_workers=2)# submit提交执行的函数到线程池中
t1 = pool.submit(thread1, 'thread1_sleep')
t2 = pool.submit(thread2, 'thread2_sleep')#result来获取返回值
print(t1.result())
print(t2.result())print('主线程结束')# 线程池关闭
pool.shutdown()//优化
with ThreadPoolExecutor(max_workers=2) as pool:task1 = pool.submit(thread1, 'thread1_sleep')task2 = pool.submit(thread2, 'thread2_sleep')print(task1.result())print(task2.result())print('主线程结束')
- pool.submit(fn) 提交一个任务到线程池,fn:线程执行的函数,返回Future对象
- pool.map(fn, *iterables, timeout=None),map 方法是对序列中每一个元素都执行任务,不需要执行submit;返回顺序为线程执行顺序,fn: 程执行的函数、timeout:等待的最大时间,如果超过这个时间即使线程未执行完成也将返回、iterables二个参数接受一个可迭代对象
import time from concurrent.futures import ThreadPoolExecutordef thread1(text):time.sleep(3)print(text)return f'我是{text}'pool= ThreadPoolExecutor(max_workers=2)future_list = ['thread1_sleep','thread2_sleep']map_future = pool.map(thread1,future_list)for future in map_future:print(future)print('主线程结束') pool.shutdown()
- pool.shutdown() 线程池关闭
四、进程池
import time
from concurrent.futures import ProcessPoolExecutor,as_completeddef task(text):print(f'{text}开始运行')time.sleep(3)print(f'{text}运行结束')return f'我是{text}'if __name__ == '__main__':pool = ProcessPoolExecutor(max_workers=3)task1 = pool.submit(task,'tak1')task2 = pool.submit(task,'tak2')print(task1.result())print(task2.result())for future in as_completed([task1, task2]):print('as_completed:',future.result())task_list = ['tal3','tak4']task_map = pool.map(task,task_list)for task_item in task_map:print(task_item)pool.shutdown()
- pool.submit(fn) 提交一个任务到进程池,fn 进程执行的函数,返回Future对象
- pool.map(fn, *iterables, timeout=None),map 方法是对序列中每一个元素都执行任务,不需要执行submit;返回顺序为进程执行顺序,fn: 进程执行的函数、timeout: 等待的最大时间,如果超过这个时间即使进程未执行完成也将返回、iterables:第二个参数接受一个可迭代对象
- pool.shutdown() 关闭进程池并销毁资源