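How to use a multiprocessing Pool in Databricks to speed up processing of a million-row Pandas DataFrame and fix the PicklingError
Fixing the PicklingError when processing Pandas data with multiple processes
Cause of the error
The PicklingError occurs because functions defined in an interactive environment (such as a Jupyter Notebook/Lab or Databricks notebook) cannot be handled correctly by pickle, the default serialization mechanism of multiprocessing. pickle does not serialize a function's code; it stores only the module and name under which the function can be found. When you use Pool, each worker process must look the function up again under that name in the main module, but functions defined interactively are not visible there, so the attribute lookup fails.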
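To make the "pickled by reference" behaviour concrete, here is a minimal sketch. It uses statistics.mean from the standard library as a stand-in for any function that lives in an importable module, and a lambda as a stand-in for a function that has no importable name; neither comes from the original question.

```python
import pickle
import statistics

# A function from an importable module is pickled as a reference:
# the payload holds the module name and the function name, not the code.
payload = pickle.dumps(statistics.mean)
has_module_name = b"statistics" in payload
has_function_name = b"mean" in payload

# Unpickling simply looks the name up again and returns the same object.
restored = pickle.loads(payload)

# A lambda has no name that another process could look up, so pickling fails.
try:
    pickle.dumps(lambda x: 2 * x)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False

print(has_module_name, has_function_name, restored is statistics.mean, lambda_pickled)
```

The same lookup happens in every Pool worker, which is why the function has to be reachable by name from the worker's side.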
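Solutions
Solution 1: Use the spawn start context (for Jupyter environments)
Change your parallel function so that it creates the pool with multiprocessing.get_context('spawn'). With this start method each worker starts in a fresh Python interpreter instead of inheriting a forked copy of the notebook process. Workers still look functions up by module and name, so if the error persists in your notebook, move the functions into an importable module as described in Solution 2: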
    import numpy as np
    import pandas as pd
    from multiprocessing import get_context

    # Make sure your helper functions are defined first
    def ISColFilled(val):
        return 1 if pd.notna(val) else 0

    def IsBillingValid(stage_name, det_val):
        # Replace with your actual business logic
        return 0 if (stage_name == "Closed Won" and det_val == 0) else 1

    def OpportunityDetector(df_Opportunity):
        df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].apply(ISColFilled)
        df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].apply(ISColFilled)
        df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].apply(ISColFilled)
        df_Opportunity['BillingCity_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingCity_Det), axis=1)
        df_Opportunity['BillingPostalCode_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingPostalCode_Det), axis=1)
        df_Opportunity['BillingStreet_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingStreet_Det), axis=1)
        return df_Opportunity

    def parallelize_dataframe(df, func, n_cores=7):
        # Split the DataFrame into one chunk per worker
        df_split = np.array_split(df, n_cores)
        # Create the process pool from the spawn context
        with get_context('spawn').Pool(n_cores) as pool:
            df = pd.concat(pool.map(func, df_split), ignore_index=True)
        return df

    # Run the parallel processing
    df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)
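Before switching start methods it can help to check which ones the platform offers and which one is the current default. The snippet below is a small sketch using only the standard multiprocessing API; the variable names are illustrative, and the printed values depend on your operating system and Python version.

```python
import multiprocessing as mp

# Start methods supported on this platform, e.g. fork, spawn, forkserver.
available = mp.get_all_start_methods()

# The method a plain multiprocessing.Pool() would use here.
default_method = mp.get_start_method()

# A context object bound to spawn; pools created from it ignore the default.
spawn_ctx = mp.get_context("spawn")

print("available:", available)
print("default:", default_method)
print("context:", spawn_ctx.get_start_method())
```

Using a context object rather than mp.set_start_method keeps the choice local to one pool and leaves the rest of the notebook unaffected.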
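Solution 2: Put the functions in a separate module (for script environments)
If your code runs as a .py script, move OpportunityDetector and its helper functions into a separate module file (for example processing_utils.py) and import them in the main script: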
    # processing_utils.py
    import pandas as pd

    def ISColFilled(val):
        return 1 if pd.notna(val) else 0

    def IsBillingValid(stage_name, det_val):
        return 0 if (stage_name == "Closed Won" and det_val == 0) else 1

    def OpportunityDetector(df_Opportunity):
        df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].apply(ISColFilled)
        df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].apply(ISColFilled)
        df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].apply(ISColFilled)
        df_Opportunity['BillingCity_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingCity_Det), axis=1)
        df_Opportunity['BillingPostalCode_Det_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingPostalCode_Det), axis=1)
        df_Opportunity['BillingStreet_StageName_Det'] = df_Opportunity.apply(lambda x: IsBillingValid(x.StageName, x.BillingStreet_Det), axis=1)
        return df_Opportunity
Main script:
    # main.py
    import numpy as np
    import pandas as pd
    from multiprocessing import Pool
    from processing_utils import OpportunityDetector

    def parallelize_dataframe(df, func, n_cores=7):
        # Split the DataFrame into one chunk per worker
        df_split = np.array_split(df, n_cores)
        with Pool(n_cores) as pool:
            df = pd.concat(pool.map(func, df_split), ignore_index=True)
        return df

    if __name__ == '__main__':
        # Load your DataFrame
        df_OpportunityA = pd.read_csv("your_data.csv")
        df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)
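The split-and-concatenate step can be tested on its own, without starting any processes. In recent pandas versions, calling np.array_split directly on a DataFrame may emit a deprecation warning, so the sketch below splits row positions instead and slices with iloc. The helper name split_dataframe and the sample data are hypothetical, not part of the original answer.

```python
import numpy as np
import pandas as pd

def split_dataframe(df, n_chunks):
    """Split df into n_chunks row-wise pieces of near-equal size."""
    # Split the row positions, then slice the DataFrame by position.
    positions = np.array_split(np.arange(len(df)), n_chunks)
    return [df.iloc[idx] for idx in positions]

# Hypothetical sample data: 10 rows split into 3 chunks.
df = pd.DataFrame({"value": range(10)})
chunks = split_dataframe(df, 3)
chunk_sizes = [len(chunk) for chunk in chunks]

# Concatenating the chunks in order restores the original rows.
round_trip = pd.concat(chunks)

print(chunk_sizes)
print(round_trip.equals(df))
```

A helper like this could replace the np.array_split(df, n_cores) call inside parallelize_dataframe if the warning shows up in your environment.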
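Solution 3: Optimize the original function (faster in a single process and in parallel)
Your original function relies heavily on apply(axis=1), which processes the DataFrame one row at a time and is slow by itself. Switching to vectorized operations gives a large speedup even without parallelism, and the parallel version benefits as well: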
    import numpy as np
    import pandas as pd
    from multiprocessing import get_context

    def IsBillingValid(stage_name_series, det_val_series):
        # Vectorized implementation that replaces the row-by-row apply
        return np.where((stage_name_series == "Closed Won") & (det_val_series == 0), 0, 1)

    def OpportunityDetector(df_Opportunity):
        # Use notna() instead of the custom ISColFilled; it is more efficient
        df_Opportunity['BillingCity_Det'] = df_Opportunity['BillingCity'].notna().astype(int)
        df_Opportunity['BillingPostalCode_Det'] = df_Opportunity['BillingPostalCode'].notna().astype(int)
        df_Opportunity['BillingStreet_Det'] = df_Opportunity['BillingStreet'].notna().astype(int)
        # Vectorized calls operate on whole columns; typically far faster than apply(axis=1)
        df_Opportunity['BillingCity_Det_StageName_Det'] = IsBillingValid(df_Opportunity['StageName'], df_Opportunity['BillingCity_Det'])
        df_Opportunity['BillingPostalCode_Det_StageName_Det'] = IsBillingValid(df_Opportunity['StageName'], df_Opportunity['BillingPostalCode_Det'])
        df_Opportunity['BillingStreet_StageName_Det'] = IsBillingValid(df_Opportunity['StageName'], df_Opportunity['BillingStreet_Det'])
        return df_Opportunity

    def parallelize_dataframe(df, func, n_cores=7):
        # Split the DataFrame into one chunk per worker
        df_split = np.array_split(df, n_cores)
        with get_context('spawn').Pool(n_cores) as pool:
            df = pd.concat(pool.map(func, df_split), ignore_index=True)
        return df

    df_new = parallelize_dataframe(df_OpportunityA, OpportunityDetector)
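Before replacing the row-wise logic it is worth confirming that the vectorized version gives the same answers. The check below is a minimal sketch on a four-row sample; the sample values are made up for illustration, and the "Closed Won" rule is the placeholder business logic used above, not necessarily your real rule.

```python
import numpy as np
import pandas as pd

# Hypothetical sample covering all four combinations of stage and filled/empty.
sample = pd.DataFrame({
    "StageName": ["Closed Won", "Closed Won", "Prospecting", "Prospecting"],
    "BillingCity": ["Paris", None, "Berlin", None],
})

# Row-wise version, mirroring the apply-based helpers.
filled_rowwise = sample["BillingCity"].apply(lambda v: 1 if pd.notna(v) else 0)
valid_rowwise = sample.assign(det=filled_rowwise).apply(
    lambda r: 0 if (r.StageName == "Closed Won" and r.det == 0) else 1, axis=1
)

# Vectorized version, mirroring notna() and np.where.
filled_vec = sample["BillingCity"].notna().astype(int)
valid_vec = np.where((sample["StageName"] == "Closed Won") & (filled_vec == 0), 0, 1)

# Both approaches should agree element by element.
same_filled = filled_rowwise.tolist() == filled_vec.tolist()
same_valid = valid_rowwise.tolist() == valid_vec.tolist()
print(same_filled, same_valid)
```

Only the second row (a closed-won opportunity with no billing city) is flagged as invalid by either version.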
The question comes from Stack Exchange; it was asked by hugo.




