You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用np.ix_实现Numpy并行索引优化重复K折交叉验证

如何用Numpy向量化/并行方式处理RepeatedKFold的所有折数据,避免低效循环?

我有一个大量使用Numpy的Python脚本,最近了解到Numpy的隐式并行特性,知道应该避免大型for循环,转而用隐式并行索引。但我的脚本里有个包含嵌套循环的大型循环,用来执行n次重复k折交叉验证:划分训练/测试集、处理数据集、拟合模型并重复操作。现在脚本运行极慢,当n和k较大时还会出现内存泄漏,推测是大型循环导致的。

我的通用代码框架是:

from sklearn.model_selection import RepeatedKFold
import ... # 导入数据、定义函数、预分配变量
number_conditions = 10 # 实际值更大
output = np.array(number_conditions, number_conditions) # 存储真实数据
outer_k = 5
outer_reps = 1 # 实际值更高
outer_rkf = RepeatedKFold(n_splits=outer_k, n_repeats=outer_reps)
array_for_indices = list(range(number_conditions))

低效的循环版本:

for train_indices, test_indices in outer_rkf.split(array_for_indices):
    ixgrid_train = np.ix_(train_indices, train_indices)
    output_train = output[ixgrid_train]
    ixgrid_test = np.ix_(test_indices, test_indices)
    output_test = output[ixgrid_test]
    ... # 后续大量数据处理及嵌套循环

我尝试了并行化代码,但最后一行报错IndexError: arrays used as indices must be of integer (or boolean) type

outer_split_1, outer_split_2, outer_split_3, outer_split_4, outer_split_5 = outer_rkf.split(array_for_indices)
index = np.array([outer_split_1, outer_split_2, outer_split_3, outer_split_4, outer_split_5])
all_train_inds = index[:, 0]
all_test_inds = index[:, 1]
ixgrid_train = np.ix_(all_train_inds, all_train_inds)
output_train = output[ixgrid_train]

我发现循环版本里的ixgrid_train是含两个整数数组的元组,但并行化版本里是含两个对象数组的元组(对象数组内嵌套数组)。我尝试调整index类型和索引方式,但没法一次性获取所有折的训练数据。核心困惑是不知道如何把循环维度转换为操作对象的维度来实现并行,尤其对元组的使用有疑问。请问如何一次性生成所有折的索引并提取对应数据,以便后续并行处理?


问题根源分析

你遇到的IndexError是因为all_train_inds对象数组(每个元素是长度不同的整数数组),而np.ix_需要的是整数类型的数组来生成网格索引,对象数组无法被Numpy正确解析,所以抛出错误。另外,交叉验证的每个折的训练/测试集大小可能不同(比如样本数不能被k整除时),这也导致没法直接把所有折的索引拼成一个统一形状的整数数组,这是Numpy向量化操作的一个限制。

下面给你几个针对性的解决方案,从简洁到高效,覆盖不同场景:


方案1:用列表推导式批量生成数据(简洁易读,避免嵌套循环)

先把所有拆分结果收集成列表,再用列表推导式一次性生成所有折的训练/测试数据,比手动嵌套循环更简洁,也方便后续并行处理:

from sklearn.model_selection import RepeatedKFold
import numpy as np

# 模拟你的数据(修正原代码的数组初始化错误:需要传shape参数)
number_conditions = 10
output = np.random.rand(number_conditions, number_conditions)
outer_k = 5
outer_reps = 1
outer_rkf = RepeatedKFold(n_splits=outer_k, n_repeats=outer_reps)
array_for_indices = list(range(number_conditions))

# 收集所有折的训练/测试索引
train_indices_list = []
test_indices_list = []
for train_inds, test_inds in outer_rkf.split(array_for_indices):
    train_indices_list.append(train_inds)
    test_indices_list.append(test_inds)

# 批量生成所有折的训练/测试数据
output_train_list = [output[np.ix_(train_inds, train_inds)] for train_inds in train_indices_list]
output_test_list = [output[np.ix_(test_inds, test_inds)] for test_inds in test_indices_list]

这个方法虽然用了列表推导式,但本质是把循环简化,而且每个折的数据可以独立处理,比嵌套循环高效很多。


方案2:用joblib实现任务并行(解决运行慢+内存泄漏问题)

因为交叉验证的每个折处理是完全独立的,属于“易并行”任务,用joblib可以直接利用多CPU核心加速,同时避免手动循环带来的内存泄漏(每个任务独立运行,结束后内存自动释放):

from joblib import Parallel, delayed

# 定义单个折的处理函数(把你的数据处理、模型拟合逻辑放这里)
def process_single_fold(train_inds, test_inds, output):
    # 提取当前折的训练/测试数据
    output_train = output[np.ix_(train_inds, train_inds)]
    output_test = output[np.ix_(test_inds, test_inds)]
    
    # 这里加入你的后续操作:比如数据预处理、模型训练、预测等
    # processed_train = your_preprocessing_func(output_train)
    # model = your_model.fit(processed_train)
    # test_score = model.predict(output_test)
    
    return output_train, output_test

# 并行处理所有折:n_jobs=-1表示用所有可用CPU核心
results = Parallel(n_jobs=-1)(
    delayed(process_single_fold)(train_inds, test_inds, output)
    for train_inds, test_inds in zip(train_indices_list, test_indices_list)
)

# 拆分结果为训练数据和测试数据的元组
all_output_train, all_output_test = zip(*results)

这种方式是解决你当前问题的最优解:既解决了运行慢的问题,又避免了内存泄漏,代码也容易维护。


方案3:Numpy完全向量化(仅适用于所有折样本数相同的场景)

如果你的number_conditions能被outer_k整除(每个折的训练/测试集长度完全相同),可以把索引拼成三维数组,用Numpy的隐式并行一次性提取所有数据:

# 假设number_conditions是outer_k的倍数(比如10/5=2,每个测试集有2个样本)
train_indices_array = np.array(train_indices_list)  # 形状:(总折数, 训练集大小)
test_indices_array = np.array(test_indices_list)    # 形状:(总折数, 测试集大小)

# 生成三维网格索引,提取所有折的训练数据
# 最终output_train_all形状为:(总折数, 训练集大小, 训练集大小)
output_train_all = output[train_indices_array[:, :, np.newaxis], train_indices_array[:, np.newaxis, :]]

# 同理提取测试数据
output_test_all = output[test_indices_array[:, :, np.newaxis], test_indices_array[:, np.newaxis, :]]

这种方式完全没有显式循环,利用Numpy的底层并行优化,效率最高,但只适用于所有折样本数一致的场景。


额外注意事项

  • 内存泄漏:手动嵌套循环容易因为中间变量堆积、引用未释放导致内存泄漏,用joblib的并行方式每个任务独立运行,结束后内存会被自动回收,能有效避免这个问题。
  • Numpy隐式并行的适用场景:Numpy的隐式并行是针对同形状数组的向量化操作,如果交叉验证的折形状不同,任务并行(joblib)是更合适的选择。

内容的提问来源于stack exchange,提问作者user13559693

火山引擎 最新活动