使用ProcessPoolExecutor存储进程结果并实现构建失败判定的技术问题
解决concurrent.futures并行结果收集与失败标记问题
嘿,我来帮你搞定这个并行处理结果收集的问题!你的代码核心问题出在没有从子进程返回状态码,以及错误的结果获取方式阻塞了并行执行,下面一步步给你修正:
核心问题分析
slave函数无返回值:你现在的slave里执行了各种命令,但既没捕获os.system的退出码,也没有return任何值,所以每个进程结束后默认返回None,这就是exit_status全是None的原因。- 立即调用
result()阻塞并行:循环里submit后立刻调用ret.result()会让程序等待当前进程完成才提交下一个,完全浪费了并行的优势,应该先收集所有任务,再统一获取结果。 - 缺乏异常处理:
os.makedirs、shutil.copy这些操作可能抛出异常(比如权限不足、文件不存在),如果不处理,会导致程序直接崩溃,无法收集其他进程的结果。
修正后的实现方案
步骤1:修改slave函数,返回状态码
我们需要捕获每个操作的错误,一旦任何步骤失败就返回非0状态码;所有操作成功则返回0。注意os.system的返回值是包含退出码的整数,需要右移8位(>>8)才能拿到真实的命令退出码。
步骤2:正确收集并行任务结果
先把所有submit返回的Future对象存到列表里,等所有任务都提交后,再遍历获取结果,这样才能真正并行执行。
步骤3:检查结果,标记构建失败
遍历收集到的状态码,只要有一个非0,就用sys.exit(1)标记构建失败。
完整修正代码
#!/usr/bin/env python3 import concurrent.futures import sys import shutil import os import json from os import path import multiprocessing as mp def slave(path1, path2, target): try: # 创建目标目录,exist_ok=True避免已存在时报错 os.makedirs(target, exist_ok=True) # 复制文件,捕获拷贝错误 shutil.copy(path1, target) shutil.copy(path2, target) # 执行登录命令,检查退出码 login_ret = os.system("<Login command>") >> 8 if login_ret != 0: print(f"登录命令失败,退出码:{login_ret}") return login_ret # 执行镜像创建命令 img_ret = os.system("<Image creation command>") >> 8 if img_ret != 0: print(f"镜像创建命令失败,退出码:{img_ret}") return img_ret # 执行拷贝到其他节点命令 copy_ret = os.system("<Copy to Other slaves or NFS>") >> 8 if copy_ret != 0: print(f"拷贝命令失败,退出码:{copy_ret}") return copy_ret # 所有操作成功,返回0 return 0 except Exception as e: # 捕获文件操作等异常,返回非0状态码 print(f"Slave进程执行异常:{str(e)}") return 1 def main(): processed = {} exit_status = [] futures = [] # 用来存所有Future对象 with open('example.json', 'r') as f: data = json.load(f) # 解析JSON数据,整理targz和yaml对应关系 for value in data.values(): # 这里用data.values()更简洁,之前的value[1]其实就是values() targz = None yaml = None for line in value: if line.endswith('.zip'): targz = line elif line.endswith('.yaml'): yaml = line if targz and yaml: # 确保两个文件都找到 processed[targz] = yaml # 提交所有并行任务,不立即获取结果 with concurrent.futures.ProcessPoolExecutor() as executor: for id, (path2, path1) in enumerate(processed.items(), 1): target = path.join("/tmp", f"dir{id}") future = executor.submit(slave, path1, path2, target) futures.append(future) # 遍历所有Future,获取结果 for future in concurrent.futures.as_completed(futures): try: ret = future.result() exit_status.append(ret) except Exception as e: # 捕获可能的进程启动异常 print(f"任务执行异常:{str(e)}") exit_status.append(1) # 打印所有结果 for idx, status in enumerate(exit_status, 1): print(f"##########Result status of task {idx}: {status}") # 检查是否有失败的任务,标记构建失败 if any(status != 0 for status in exit_status): print("构建失败:存在执行失败的任务") sys.exit(1) else: print("所有任务执行成功") sys.exit(0) if __name__ == "__main__": mp.set_start_method('spawn') main()
关键改进点说明
os.makedirs添加exist_ok=True,避免目录已存在时抛出异常;- 每个
os.system命令都捕获退出码,一旦失败立即返回; - 用
futures列表存所有任务,通过as_completed获取结果,保证并行执行; - 捕获所有可能的异常,确保程序不会中途崩溃;
- 最后通过
any()检查结果列表,只要有非0值就退出并标记失败。
内容的提问来源于stack exchange,提问作者voltas




