如何并行修改大型DataFrame?百万级数据处理提速求助
Hey there! 处理百万级DataFrame的并行字符串操作确实能大幅提速,但踩坑也很常见——你遇到的挂起问题,大概率是没处理好进程间的数据序列化开销、进程创建逻辑这些细节。下面给你几个靠谱的实现方案,结合你的需求一步步来:
方案1:用Python标准库
multiprocessing手动分块处理 直接对每一行开进程会导致巨大的开销,所以我们先把DataFrame分块,再给每个进程分配一块数据处理:
import pandas as pd import multiprocessing as mp # 定义你的字符串处理函数(替换成你的实际逻辑) def process_single_str(s): # 示例:转大写后截取前5个字符 return s.upper()[:5] # 定义分块处理的辅助函数 def process_data_chunk(chunk): chunk['str'] = chunk['str'].apply(process_single_str) return chunk if __name__ == '__main__': # 示例DataFrame(替换成你的百万行数据) df1 = pd.DataFrame({'str': ['apple', 'banana', 'cherry', 'date', 'elderberry']}) # 分块大小:百万行建议分成10-20块,根据你的内存调整 chunk_size = len(df1) // 10 if len(df1) >=10 else 1 chunks = [df1[i:i+chunk_size] for i in range(0, len(df1), chunk_size)] # 创建进程池:进程数建议等于CPU核心数(避免过度调度) with mp.Pool(mp.cpu_count()) as pool: processed_chunks = pool.map(process_data_chunk, chunks) # 合并处理后的块得到最终结果 df_processed = pd.concat(processed_chunks, ignore_index=True)
⚠️ 关键注意点:
- 必须把主逻辑放在
if __name__ == '__main__':下(Windows系统下避免重复创建进程导致死锁) - 分块是为了减少大对象的序列化开销,直接传递整个DataFrame给进程会导致严重卡顿
方案2:用
swifter自动适配并行(最省心) swifter会自动检测你的函数和数据规模,判断是用普通apply还是切换到并行模式,完全不用手动处理分块:
import pandas as pd import swifter # 同样定义你的处理函数 def process_single_str(s): return s.upper()[:5] df1 = pd.DataFrame({'str': ['apple', 'banana', 'cherry', 'date', 'elderberry']}) # 直接用swifter.apply替代普通apply df1['str'] = df1['str'].swifter.apply(process_single_str)
使用前需要先安装:pip install swifter
这个方案适合大多数场景,尤其是你不想纠结并行细节的情况,百万级数据下会自动启用并行,避免手动分块的麻烦。
方案3:用Dask处理超大规模数据(内存不足时首选)
如果你的百万行数据已经接近内存上限,Dask会把数据分成多个小分区并行处理,而且不会一次性加载全部数据到内存:
import dask.dataframe as dd import pandas as pd import multiprocessing as mp def process_single_str(s): return s.upper()[:5] # 把Pandas DataFrame转成Dask DataFrame,指定分区数(建议等于CPU核心数) ddf = dd.from_pandas(df1, npartitions=mp.cpu_count()) # 应用处理函数,必须指定meta参数告诉Dask返回的数据类型 ddf['str'] = ddf['str'].apply(process_single_str, meta=('str', 'object')) # 计算结果并转回Pandas(如果内存够的话),也可以直接用Dask继续后续操作 df_processed = ddf.compute()
为什么你的之前代码会挂起?
大概率是这两个原因:
- 逐行并行的开销过大:给每一行单独开进程,进程创建、销毁的开销远大于实际计算开销,导致系统资源耗尽卡顿
- 大对象序列化问题:直接传递整个DataFrame给进程,pickle序列化大对象会占用大量内存和时间,甚至导致死锁
内容的提问来源于stack exchange,提问作者N08




