You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用ProcessPoolExecutor存储进程结果并实现构建失败判定的技术问题

解决concurrent.futures并行结果收集与失败标记问题

嘿,我来帮你搞定这个并行处理结果收集的问题!你的代码核心问题出在没有从子进程返回状态码,以及错误的结果获取方式阻塞了并行执行,下面一步步给你修正:

核心问题分析

  1. slave函数无返回值:你现在的slave里执行了各种命令,但既没捕获os.system的退出码,也没有return任何值,所以每个进程结束后默认返回None,这就是exit_status全是None的原因。
  2. 立即调用result()阻塞并行:循环里submit后立刻调用ret.result()会让程序等待当前进程完成才提交下一个,完全浪费了并行的优势,应该先收集所有任务,再统一获取结果。
  3. 缺乏异常处理os.makedirsshutil.copy这些操作可能抛出异常(比如权限不足、文件不存在),如果不处理,会导致程序直接崩溃,无法收集其他进程的结果。

修正后的实现方案

步骤1:修改slave函数,返回状态码

我们需要捕获每个操作的错误,一旦任何步骤失败就返回非0状态码;所有操作成功则返回0。注意os.system的返回值是包含退出码的整数,需要右移8位(>>8)才能拿到真实的命令退出码。

步骤2:正确收集并行任务结果

先把所有submit返回的Future对象存到列表里,等所有任务都提交后,再遍历获取结果,这样才能真正并行执行。

步骤3:检查结果,标记构建失败

遍历收集到的状态码,只要有一个非0,就用sys.exit(1)标记构建失败。

完整修正代码

#!/usr/bin/env python3
import concurrent.futures
import sys
import shutil
import os
import json
from os import path
import multiprocessing as mp

def slave(path1, path2, target):
    try:
        # 创建目标目录,exist_ok=True避免已存在时报错
        os.makedirs(target, exist_ok=True)
        # 复制文件,捕获拷贝错误
        shutil.copy(path1, target)
        shutil.copy(path2, target)
        
        # 执行登录命令,检查退出码
        login_ret = os.system("<Login command>") >> 8
        if login_ret != 0:
            print(f"登录命令失败,退出码:{login_ret}")
            return login_ret
        
        # 执行镜像创建命令
        img_ret = os.system("<Image creation command>") >> 8
        if img_ret != 0:
            print(f"镜像创建命令失败,退出码:{img_ret}")
            return img_ret
        
        # 执行拷贝到其他节点命令
        copy_ret = os.system("<Copy to Other slaves or NFS>") >> 8
        if copy_ret != 0:
            print(f"拷贝命令失败,退出码:{copy_ret}")
            return copy_ret
        
        # 所有操作成功,返回0
        return 0
    except Exception as e:
        # 捕获文件操作等异常,返回非0状态码
        print(f"Slave进程执行异常:{str(e)}")
        return 1

def main():
    processed = {}
    exit_status = []
    futures = []  # 用来存所有Future对象
    
    with open('example.json', 'r') as f:
        data = json.load(f)
    
    # 解析JSON数据,整理targz和yaml对应关系
    for value in data.values():  # 这里用data.values()更简洁,之前的value[1]其实就是values()
        targz = None
        yaml = None
        for line in value:
            if line.endswith('.zip'):
                targz = line
            elif line.endswith('.yaml'):
                yaml = line
        if targz and yaml:  # 确保两个文件都找到
            processed[targz] = yaml
    
    # 提交所有并行任务,不立即获取结果
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for id, (path2, path1) in enumerate(processed.items(), 1):
            target = path.join("/tmp", f"dir{id}")
            future = executor.submit(slave, path1, path2, target)
            futures.append(future)
        
        # 遍历所有Future,获取结果
        for future in concurrent.futures.as_completed(futures):
            try:
                ret = future.result()
                exit_status.append(ret)
            except Exception as e:
                # 捕获可能的进程启动异常
                print(f"任务执行异常:{str(e)}")
                exit_status.append(1)
    
    # 打印所有结果
    for idx, status in enumerate(exit_status, 1):
        print(f"##########Result status of task {idx}: {status}")
    
    # 检查是否有失败的任务,标记构建失败
    if any(status != 0 for status in exit_status):
        print("构建失败:存在执行失败的任务")
        sys.exit(1)
    else:
        print("所有任务执行成功")
        sys.exit(0)

if __name__ == "__main__":
    mp.set_start_method('spawn')
    main()

关键改进点说明

  • os.makedirs添加exist_ok=True,避免目录已存在时抛出异常;
  • 每个os.system命令都捕获退出码,一旦失败立即返回;
  • futures列表存所有任务,通过as_completed获取结果,保证并行执行;
  • 捕获所有可能的异常,确保程序不会中途崩溃;
  • 最后通过any()检查结果列表,只要有非0值就退出并标记失败。

内容的提问来源于stack exchange,提问作者voltas

火山引擎 最新活动