How to speed up processing of a million-row Pandas DataFrame with multiprocessing Pool in Databricks and resolve the PicklingError

Resolving the PicklingError in Pandas multiprocessing

Root cause

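The PicklingError you are seeing occurs because functions defined in an interactive environment (such as Jupyter Notebook/Lab) cannot be handled correctly by pickle, the default serialization mechanism used by multiprocessing. pickle stores a function by reference, as a module name plus a function name, and the Pool worker processes have to look the function up again under that name. A function defined in an interactive session often cannot be found that way, so the attribute lookup fails.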
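
As a minimal sketch of the by-reference behavior described above, the following uses only the standard library. The helper name can_pickle is hypothetical, and json.dumps simply stands in for any function that lives in an importable module.

```python
import json
import pickle


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Functions that cannot be found by module + name end up here.
        return False


# A function from an importable module is stored as a reference:
# the payload holds the module name and the function name, not the code.
payload = pickle.dumps(json.dumps)
stored_by_reference = b"json" in payload and b"dumps" in payload

importable_ok = can_pickle(json.dumps)   # found again via the json module
lambda_ok = can_pickle(lambda x: x)      # no importable name to look up
```

The lambda fails for the same reason a notebook-defined function can fail: pickle has no module-level name it can resolve on the other side.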

Solutions

Option 1: Use the spawn start context (suited to Jupyter environments)

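Change your parallel function so that it creates the pool through multiprocessing.get_context('spawn'). With spawn, each worker starts a fresh Python interpreter instead of inheriting the parent's state. Spawned workers still locate the function by importing it, so OpportunityDetector and its helpers must be importable; if the error persists in a notebook, move them into a module as in Option 2: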

import numpy as np
import pandas as pd
from multiprocessing import get_context

# Make sure your helper functions are defined first
def ISColFilled(val):
    return 1 if pd.notna(val) else 0

def IsBillingValid(stage_name, det_val):
    # Replace with your actual business logic
    return 0 if (stage_name == "Closed Won" and det_val == 0) else 1

def OpportunityDetector(df_Opportunity):
    df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].apply(ISColFilled)
    df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].apply(ISColFilled)
    df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].apply(ISColFilled)
    
    df_Opportunity['BillingCity_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingCity_Det), axis=1)
    df_Opportunity['BillingPostalCode_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingPostalCode_Det), axis=1)
    df_Opportunity['BillingStreet_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingStreet_Det), axis=1)
    return df_Opportunity

def parallelize_dataframe(df, func, n_cores=7):
    df_split = np.array_split(df, n_cores)
    # Create the process pool from the spawn context
    with get_context('spawn').Pool(n_cores) as pool:
        df = pd.concat(pool.map(func, df_split), ignore_index=True)
    return df

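# Run the parallel processing (in a .py script, put this call under an
# if __name__ == '__main__': guard, which spawn requires)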
df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)
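
Before relying on a particular start method, it can help to check which ones the platform offers and what a given context actually uses. The sketch below uses only the standard multiprocessing API; the variable names are illustrative.

```python
import multiprocessing as mp

# Start methods supported on this platform, e.g. fork, spawn, forkserver.
available = mp.get_all_start_methods()

# A context object pins one start method without changing the global default.
ctx = mp.get_context("spawn")
method = ctx.get_start_method()
```

Under fork, workers inherit the parent's memory, including functions defined interactively. Under spawn, workers start from a clean interpreter and have to import everything they run.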

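Option 2: Move the functions into a separate module (suited to scripts)

If your code runs as a .py script, move OpportunityDetector and its helper functions into a separate module file (for example processing_utils.py) and import it in the main script: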

# processing_utils.py
import pandas as pd

def ISColFilled(val):
    return 1 if pd.notna(val) else 0

def IsBillingValid(stage_name, det_val):
    return 0 if (stage_name == "Closed Won" and det_val == 0) else 1

def OpportunityDetector(df_Opportunity):
    df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].apply(ISColFilled)
    df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].apply(ISColFilled)
    df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].apply(ISColFilled)
    
    df_Opportunity['BillingCity_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingCity_Det), axis=1)
    df_Opportunity['BillingPostalCode_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingPostalCode_Det), axis=1)
    df_Opportunity['BillingStreet_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingStreet_Det), axis=1)
    return df_Opportunity

Main script:

# main.py
import numpy as np
import pandas as pd
from multiprocessing import Pool
from processing_utils import OpportunityDetector

def parallelize_dataframe(df, func, n_cores=7):
    df_split = np.array_split(df, n_cores)
    with Pool(n_cores) as pool:
        df = pd.concat(pool.map(func, df_split), ignore_index=True)
    return df

if __name__ == '__main__':
    # Load your DataFrame
    df_OpportunityA = pd.read_csv("your_data.csv")
    df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)
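
When working in a notebook, where creating a module file by hand is inconvenient, one possible workaround is to write the module from a cell and import it. The sketch below is an assumption-laden illustration, not the original author's method: the module name processing_utils_demo, the temporary directory, and the stand-in function is_col_filled are all hypothetical, and the stand-in avoids pandas to stay self-contained.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

MODULE_NAME = "processing_utils_demo"  # hypothetical module name

# Simplified stand-in for the helper functions in processing_utils.py.
MODULE_SOURCE = '''
def is_col_filled(val):
    """Return 1 if a value is present, else 0."""
    return 0 if val is None else 1
'''


def load_helper_module(directory):
    """Write the module source into directory and import it by name."""
    path = Path(directory) / f"{MODULE_NAME}.py"
    path.write_text(MODULE_SOURCE)
    if str(directory) not in sys.path:
        sys.path.insert(0, str(directory))
    importlib.invalidate_caches()
    return importlib.import_module(MODULE_NAME)


module_dir = tempfile.mkdtemp()
helpers = load_helper_module(module_dir)

# The function now has an importable home, so pickle can round-trip it.
restored = pickle.loads(pickle.dumps(helpers.is_col_filled))
```

Because the function's module is now a real importable file rather than the interactive session, pickle can resolve it by name. The worker processes must be able to import the same module, so the directory has to be visible to them as well.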

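Option 3: Optimize the original function (faster in a single process and in parallel)

Your original function relies heavily on row-by-row apply(axis=1), which is inherently slow. Consider rewriting it with vectorized operations: this gives a large speedup even without parallelism, and parallelizing on top of it can help further: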

import numpy as np
import pandas as pd
from multiprocessing import get_context

def IsBillingValid(stage_name_series, det_val_series):
    # Vectorized implementation that replaces the row-by-row apply
    return np.where((stage_name_series == "Closed Won") & (det_val_series == 0), 0, 1)

def OpportunityDetector(df_Opportunity):
    # Use notna() instead of the custom ISColFilled; it is more efficient
    df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].notna().astype(int)
    df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].notna().astype(int)
    df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].notna().astype(int)
    
    # Vectorized call; typically much faster than apply(axis=1)
    df_Opportunity['BillingCity_Det_StageName_Det'] = IsBillingValid(df_Opportunity['StageName'], df_Opportunity['BillingCity_Det'])
    df_Opportunity['BillingPostalCode_Det_StageName_Det'] = IsBillingValid(df_Opportunity['StageName'], df_Opportunity['BillingPostalCode_Det'])
    df_Opportunity['BillingStreet_StageName_Det'] = IsBillingValid(df_Opportunity['StageName'], df_Opportunity['BillingStreet_Det'])
    return df_Opportunity

def parallelize_dataframe(df, func, n_cores=7):
    df_split = np.array_split(df, n_cores)
    with get_context('spawn').Pool(n_cores) as pool:
        df = pd.concat(pool.map(func, df_split), ignore_index=True)
    return df

df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)
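
Before swapping the row-wise logic for the vectorized version, it is worth confirming on a small sample that both produce the same flags. The sketch below assumes pandas and numpy are installed; the sample data and the name is_billing_valid_row are made up for illustration.

```python
import numpy as np
import pandas as pd

# Tiny made-up sample covering all four combinations of stage and fill state.
sample = pd.DataFrame({
    "StageName": ["Closed Won", "Closed Won", "Prospecting", "Prospecting"],
    "BillingCity": ["Paris", None, "Berlin", None],
})


def is_billing_valid_row(stage_name, det_val):
    """Row-wise rule: invalid only for a Closed Won row with a missing field."""
    return 0 if (stage_name == "Closed Won" and det_val == 0) else 1


filled = sample["BillingCity"].notna().astype(int)

# Row-by-row version, as in the original function.
row_wise = sample.assign(det=filled).apply(
    lambda r: is_billing_valid_row(r["StageName"], r["det"]), axis=1
)

# Vectorized version, operating on whole columns at once.
vectorized = np.where((sample["StageName"] == "Closed Won") & (filled == 0), 0, 1)
```

Only the second row (Closed Won with no billing city) is flagged 0 by either version, so the vectorized form preserves the rule.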

The question in this content comes from Stack Exchange; asked by hugo.