如何使用ProcessPoolExecutor通过多进程加速Python代码并等待子进程全部完成
使用ProcessPoolExecutor加速Python代码的实现方案
没问题,我来帮你把这段代码改成用ProcessPoolExecutor加速的版本,刚好匹配你的需求——把第一个循环的每一次迭代放到子进程里跑,并且等所有子进程都做完再继续后续流程。
核心思路
我们需要把单个资源文件的处理逻辑封装成一个独立函数,然后用进程池来并行执行这个函数,同时确保主进程会等待所有子进程完成后再继续。
完整代码示例
首先导入需要的模块,然后重构你的代码:
from concurrent.futures import ProcessPoolExecutor import yaml from pathlib import Path # 把单个资源文件的处理逻辑封装成独立函数 def process_resource_file(resource_file): try: with resource_file.open() as f: ns_resources = yaml.safe_load(f) # 加个默认空列表,避免没有'items'键时抛出KeyError resources = ns_resources.get('items', []) for resource in resources: # ----------------- 慢操作 ----------------- # 把你的慢操作代码放在这里,比如资源解析、计算等逻辑 # 示例:打印资源名称(根据你的实际需求替换) print(f"Processing resource {resource.get('metadata', {}).get('name')} from {resource_file.name}") # 可选:返回处理结果,方便后续统计 return f"✅ 处理完成: {resource_file.name}" except Exception as e: # 捕获异常,避免单个文件失败导致整个进程池挂掉 return f"❌ 处理失败 {resource_file.name}: {str(e)}" if __name__ == '__main__': # 替换成你的实际资源文件夹路径 tmp_resource_folder = Path("./your_resource_dir") # 创建进程池:默认使用CPU核心数作为进程数,也可以手动指定max_workers=4 with ProcessPoolExecutor() as executor: # 批量提交任务,executor.map会自动等待所有子进程完成 processing_results = executor.map(process_resource_file, tmp_resource_folder.iterdir()) # 所有子进程完成后,才会执行这里的代码 print("\n=== 所有资源文件处理完成 ===") # 可选:遍历打印处理结果 for result in processing_results: print(result) # 后续流程代码写在这里 print("\n开始执行后续流程...")
关键注意事项
必须加
if __name__ == '__main__':
这是Python多进程的规范要求(尤其是Windows系统),避免进程启动时重复导入模块导致的无限递归问题,Linux/macOS也建议加上,保证代码可移植性。函数的可序列化性
ProcessPoolExecutor用pickle来传递参数和返回值,所以process_resource_file函数不能引用无法序列化的对象(比如打开的文件句柄、复杂的自定义类实例等)。我们在函数内部打开文件的方式是安全的,因为每个子进程会独立打开自己的文件句柄。进程数的选择
默认的max_workers是你的CPU核心数,这个值比较合理——太多进程会导致CPU上下文切换频繁,反而降低效率;太少则无法充分利用多核资源。如果你的慢操作是IO密集型(比如读写文件、调用API),可以适当调高max_workers,但一般不超过核心数的2倍。异常处理
建议在封装的函数内部加上异常捕获,这样单个文件处理失败不会影响其他子进程的执行,还能收集错误信息方便排查。
内容的提问来源于stack exchange,提问作者Kristian




