You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用Dask并行化机器学习模型遇阻:并行失效与结果为空

嗨Christian,我来帮你排查下你遇到的两个核心问题——计算没并行、predicted_all为空,以及给出可行的解决方案:

一、predicted_all为空的关键原因

你在linearmodel函数里用了global predicted_all来合并结果,但在Dask多进程模式下,每个worker进程都有独立的内存空间,主进程的predicted_all和worker里的是完全隔离的副本。worker里对全局变量的修改根本不会同步回主进程,所以主进程的predicted_all始终是空的。

另外,你的代码里X_predict = []是个空列表,直接传给model.predict()会触发scikit-learn的维度错误,函数执行失败后也不会返回任何结果,这也会导致最终没有数据存入predicted_all

二、计算未并行的可能问题

  1. 类别列表错误:你定义的classes = ['ABC...Z']是单个字符串元素的列表,Dask只会遍历这一个元素,自然不会产生并行任务。正确的做法是从数据集中提取所有唯一类别:classes = df['class'].unique().tolist()
  2. 任务定义不规范:依赖全局变量的函数很难被Dask高效调度,纯函数(只靠输入参数、返回结果,不修改外部状态)才是Dask并行的最佳实践。

三、修正后的完整解决方案

我们把函数改成纯函数,让每个任务独立返回结果,最后在主进程合并所有输出:

1. 先准备测试用的预测输入

首先要生成符合特征维度的X_predict,比如让feature1按你需要的范围变化,其他特征用对应类别的均值填充:

def create_X_predict(df_oneClass, feature1_range):
    # 提取除target和feature1外的其他特征均值
    feature_means = df_oneClass.drop(['target', 'feature1'], axis=1).mean()
    # 构建测试数据集
    X_predict = pd.DataFrame({
        'feature1': feature1_range,
        **{col: [feature_means[col]]*len(feature1_range) for col in feature_means.index}
    })
    return X_predict

2. 重写线性回归函数

让函数接收类别、完整数据集和feature1范围作为参数,返回包含预测结果的DataFrame:

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error
from dask import delayed, compute

# 假设你的pandas df已经加载完成
classes = df['class'].unique().tolist()
# 定义feature1的变化范围(比如0到100,步长5)
feature1_range = np.arange(0, 101, 5)

def linearmodel(class_label, full_df, feature1_range):
    # 筛选当前类别的数据
    df_oneClass = full_df[full_df['class'] == class_label].drop(['class'], axis=1)
    df_y = df_oneClass['target']
    df_X = df_oneClass.drop(['target'], axis=1)
    
    # 训练模型
    model = LinearRegression()
    model.fit(df_X, df_y)
    
    # 生成测试数据并预测
    X_predict = create_X_predict(df_oneClass, feature1_range)
    y_predict = model.predict(X_predict)
    
    # 计算训练集MAE(可选)
    y_train_pred = model.predict(df_X)
    mae = mean_absolute_error(df_y, y_train_pred)
    
    # 返回带类别标识的结果DataFrame
    return pd.DataFrame({
        'class': [class_label]*len(y_predict),
        'feature1': feature1_range,
        'predicted_target': y_predict,
        'mae': [mae]*len(y_predict)
    })

3. 用Dask并行执行并合并结果

# 创建延迟任务列表
delayed_tasks = [delayed(linearmodel)(cls, df, feature1_range) for cls in classes]

# 用多进程调度器并行计算(对应你的4节点集群)
results = compute(*delayed_tasks, scheduler='processes')

# 合并所有结果到predicted_all
predicted_all = pd.concat(results, ignore_index=True)

额外优化:适配4节点集群的高效方案

如果你的数据集很大,建议把pandas DataFrame转换成Dask DataFrame,利用groupby.apply自动实现分布式并行:

import dask.dataframe as dd

# 转换为Dask DataFrame,分区数匹配你的集群节点数
dask_df = dd.from_pandas(df, npartitions=4)

def dask_linearmodel(df_group):
    # 这里df_group是每个class对应的pandas子数据集
    df_y = df_group['target']
    df_X = df_group.drop(['class', 'target'], axis=1)
    model = LinearRegression()
    model.fit(df_X, df_y)
    
    X_predict = create_X_predict(df_group, feature1_range)
    y_predict = model.predict(X_predict)
    mae = mean_absolute_error(df_y, model.predict(df_X))
    
    return pd.DataFrame({
        'class': [df_group['class'].iloc[0]]*len(y_predict),
        'feature1': feature1_range,
        'predicted_target': y_predict,
        'mae': [mae]*len(y_predict)
    })

# 分布式分组计算,指定结果的元数据格式
predicted_all = dask_df.groupby('class').apply(
    dask_linearmodel,
    meta={
        'class': 'object',
        'feature1': 'float64',
        'predicted_target': 'float64',
        'mae': 'float64'
    }
).compute()

这样就能真正发挥4节点集群的并行计算能力啦!

内容的提问来源于stack exchange,提问作者Christian Ivaha

火山引擎 最新活动