如何并行化Pandas DataFrame的行级apply()方法?使用multiprocessing库遇结果非DataFrame问题求助
解决多进程处理后结果不是DataFrame的问题
你的问题主要出在两个核心点:一是df.itertuples()返回的是不可变的namedtuple,而你的enrich_row函数原本是为处理可变的Pandas Series设计的,直接传入tuple会导致修改操作无效;二是pool.imap()返回的是迭代器,其中的元素是分散的单行数据,需要手动转换为DataFrame格式。
下面是修正后的完整代码,同时优化了多进程的使用逻辑:
import pandas as pd import time import multiprocessing as mp def enrich_str(s): val1 = f'{s}_1' val2 = f'{s}_2' val3 = f'{s}_3' time.sleep(3) # 模拟耗时的增强操作 return val1, val2, val3 def enrich_row(row_tuple): # 从namedtuple中提取目标列的值,用getattr适配字段名动态获取 col_name = row_tuple.colName my_string = getattr(row_tuple, col_name) val1, val2, val3 = enrich_str(my_string) # 将不可变的namedtuple转为字典,方便添加新字段 row_dict = row_tuple._asdict() row_dict['enriched1'] = val1 row_dict['enriched2'] = val2 row_dict['enriched3'] = val3 return row_dict if __name__ == '__main__': df = pd.DataFrame( {'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, columns=['numbers', 'colors'] ) df['colName'] = 'colors' tic = time.perf_counter() # 用上下文管理器自动管理进程池的生命周期,无需手动close/join with mp.Pool(processes=mp.cpu_count()) as pool: # index=False去掉默认返回的行索引,让tuple只包含数据列 results = pool.imap(enrich_row, df.itertuples(index=False), chunksize=1) # 将迭代器中的字典列表转换为DataFrame enriched_df = pd.DataFrame(list(results)) toc = time.perf_counter() print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds") print(enriched_df)
关键改动说明:
适配namedtuple处理:
- 用
df.itertuples(index=False)去掉默认的行索引,减少数据传递开销; - 用
getattr(row_tuple, col_name)动态获取目标列的值,避免直接索引tuple导致的错误; - 将namedtuple转为字典(
row_tuple._asdict()),解决tuple不可变无法添加新字段的问题。
- 用
结果转换为DataFrame:
pool.imap()返回的是迭代器,我们先将其转为字典列表(list(results)),再传入pd.DataFrame()即可得到和原apply输出格式一致的结构化数据。
多进程最佳实践:
- 使用
with mp.Pool()上下文管理器,自动处理进程的创建、关闭和清理,代码更简洁安全; - 用
mp.cpu_count()自动匹配CPU核心数,避免硬编码进程数导致的资源浪费; - 如果不需要严格保持原数据的行顺序,可以改用
imap_unordered(),能进一步提升处理速度。
- 使用
修改后,你既能得到符合预期的DataFrame结果,又能保持多进程的并行效率,处理5行数据的耗时应该稳定在3秒左右(和单个enrich_str的耗时一致,因为并行执行)。
内容的提问来源于stack exchange,提问作者lucazav




