You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何并行化Pandas DataFrame的行级apply()方法?使用multiprocessing库遇结果非DataFrame问题求助

解决多进程处理后结果不是DataFrame的问题

你的问题主要出在两个核心点:一是df.itertuples()返回的是不可变的namedtuple,而你的enrich_row函数原本是为处理可变的Pandas Series设计的,直接传入tuple会导致修改操作无效;二是pool.imap()返回的是迭代器,其中的元素是分散的单行数据,需要手动转换为DataFrame格式。

下面是修正后的完整代码,同时优化了多进程的使用逻辑:

import pandas as pd
import time
import multiprocessing as mp

def enrich_str(s):
    val1 = f'{s}_1'
    val2 = f'{s}_2'
    val3 = f'{s}_3'
    time.sleep(3)  # 模拟耗时的增强操作
    return val1, val2, val3

def enrich_row(row_tuple):
    # 从namedtuple中提取目标列的值,用getattr适配字段名动态获取
    col_name = row_tuple.colName
    my_string = getattr(row_tuple, col_name)
    val1, val2, val3 = enrich_str(my_string)
    
    # 将不可变的namedtuple转为字典,方便添加新字段
    row_dict = row_tuple._asdict()
    row_dict['enriched1'] = val1
    row_dict['enriched2'] = val2
    row_dict['enriched3'] = val3
    return row_dict

if __name__ == '__main__':
    df = pd.DataFrame(
        {'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']},
        columns=['numbers', 'colors']
    )
    df['colName'] = 'colors'

    tic = time.perf_counter()
    # 用上下文管理器自动管理进程池的生命周期,无需手动close/join
    with mp.Pool(processes=mp.cpu_count()) as pool:
        # index=False去掉默认返回的行索引,让tuple只包含数据列
        results = pool.imap(enrich_row, df.itertuples(index=False), chunksize=1)
        # 将迭代器中的字典列表转换为DataFrame
        enriched_df = pd.DataFrame(list(results))
    toc = time.perf_counter()
    print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
    print(enriched_df)

关键改动说明:

  1. 适配namedtuple处理

    • df.itertuples(index=False)去掉默认的行索引,减少数据传递开销;
    • getattr(row_tuple, col_name)动态获取目标列的值,避免直接索引tuple导致的错误;
    • 将namedtuple转为字典(row_tuple._asdict()),解决tuple不可变无法添加新字段的问题。
  2. 结果转换为DataFrame

    • pool.imap()返回的是迭代器,我们先将其转为字典列表(list(results)),再传入pd.DataFrame()即可得到和原apply输出格式一致的结构化数据。
  3. 多进程最佳实践

    • 使用with mp.Pool()上下文管理器,自动处理进程的创建、关闭和清理,代码更简洁安全;
    • mp.cpu_count()自动匹配CPU核心数,避免硬编码进程数导致的资源浪费;
    • 如果不需要严格保持原数据的行顺序,可以改用imap_unordered(),能进一步提升处理速度。

修改后,你既能得到符合预期的DataFrame结果,又能保持多进程的并行效率,处理5行数据的耗时应该稳定在3秒左右(和单个enrich_str的耗时一致,因为并行执行)。

内容的提问来源于stack exchange,提问作者lucazav

火山引擎 最新活动