Python multiprocessing pool.starmap中添加异常处理以实现遇错跳过并继续执行的方法
Python multiprocessing pool.starmap中添加异常处理以实现遇错跳过并继续执行的方法
你的思路方向完全正确,只需要稍微完善包装函数的逻辑,让它在捕获异常时返回一个明确的占位值(比如NaN),而不是直接pass,这样pool.starmap就能正常收集所有结果,不会因为单个任务出错而终止整个程序。
方法一:针对mpcalc.ccl的专用包装函数
你写的run_ccl函数缺少返回值处理,修改后可以这样实现:
import numpy as np import metpy.calc as mpcalc def run_ccl(p, t, td): try: # 正常执行CCL计算并返回结果 return mpcalc.ccl(p, t, td) except Exception as e: # 打印错误信息,方便后续排查出问题的参数(可选但推荐) print(f"计算CCL时遇到异常: {str(e)},参数:p={p}, t={t}, td={td}") # 返回NaN作为出错标记,后续可以识别并处理这些异常结果 return np.nan
然后直接用这个包装函数调用starmap即可:
from multiprocessing import Pool # 假设你的ccl_argument_list已经准备完成 with Pool() as pool: ccl_pooled = pool.starmap(run_ccl, ccl_argument_list)
这样每个任务都会独立执行,哪怕某个参数组合触发罕见异常,也只会返回NaN,其他任务不受影响,程序会继续处理完所有数千个数据点。
方法二:通用包装器(适用于任意函数)
如果你以后还需要给其他MetPy函数做类似的异常处理,可以写一个通用的包装函数,避免重复编写相似代码:
import numpy as np def safe_execute(func, *args): try: return func(*args) except Exception as e: print(f"执行函数{func.__name__}时出错: {str(e)},参数:{args}") return np.nan
使用时需要调整你的参数列表,把目标函数作为每个参数元组的第一个元素:
# 转换原参数列表,每个元组前加上mpcalc.ccl ccl_args_with_func = [(mpcalc.ccl, p, t, td) for p, t, td in ccl_argument_list] with Pool() as pool: ccl_pooled = pool.starmap(safe_execute, ccl_args_with_func)
关键说明
- 捕获
Exception是为了覆盖所有可能的罕见异常(你提到无法复现具体异常类型),如果之后明确了异常类型,可以改成捕获特定异常(比如IndexError),这样更严谨。 - 返回
NaN是因为它是数值计算中常用的异常标记,后续你可以用numpy的函数(比如np.isnan)快速筛选出异常结果,进行二次处理。 - 使用
with Pool()上下文管理器可以自动管理进程池的资源,避免手动关闭的麻烦。
备注:内容来源于stack exchange,提问作者user8229029




