4G内存下如何用Python处理多份大型CSV文件?
嘿,我正好经常处理这类大文件内存不足的问题,给你几个靠谱的解决方案,都是亲测有效的:
方法一:Pandas原生分块处理,分步合并+增量导出
这个方法不用装额外库,完全用你熟悉的Pandas,适合不想折腾的情况:
先处理小文件,生成中间表
首先读入file3——就170行,随便放内存里都没问题:import pandas as pd file3 = pd.read_csv('data3.csv')然后分块读
file1,每块和file3做left join,直接把合并后的块写入中间CSV,不用把整个200万行的file1塞进内存。我一般设每块10万行,你可以根据自己内存剩余调整:chunk_size = 100000 first_write = True for chunk in pd.read_csv('data1.csv', chunksize=chunk_size): merged_chunk = pd.merge(chunk, file3, on='id1', how='left') if first_write: # 第一次写要带表头 merged_chunk.to_csv('merged_small.csv', index=False) first_write = False else: # 后续追加不要表头 merged_chunk.to_csv('merged_small.csv', mode='a', header=False, index=False)这样得到的
merged_small.csv也就几百MB,完全能放进4G内存。分块读大文件
file2,和中间表合并后导出
先把中间表读进内存:merged_small = pd.read_csv('merged_small.csv')然后分块读
file2,每块和merged_small做left join,再追加到最终CSV里。哪怕file2有重复的id2,结果和一次性合并的完全一致:first_write = True for chunk in pd.read_csv('data2.csv', chunksize=chunk_size): final_chunk = pd.merge(merged_small, chunk, on='id2', how='left') if first_write: final_chunk.to_csv('final_data.csv', index=False) first_write = False else: final_chunk.to_csv('final_data.csv', mode='a', header=False, index=False)
方法二:用Dask一键搞定(强烈推荐)
Dask是专门处理大数据集的工具,API和Pandas几乎一模一样,会自动帮你分块计算,不用手动操心分块的事:
- 先装Dask:
pip install dask[complete] - 代码和你原来的几乎一样,只是把
pd换成dd:
这个方法最省心,完全不用自己算分块大小,Dask会根据你的内存自动调整,结果和Pandas合并的完全一致。import dask.dataframe as dd # Dask会自动分块读取文件,不用设chunksize ddf1 = dd.read_csv('data1.csv') ddf2 = dd.read_csv('data2.csv') ddf3 = dd.read_csv('data3.csv') # 合并操作是延迟计算的,不会立刻占内存 ddf_merged = ddf1.merge(ddf3, on='id1', how='left').merge(ddf2, on='id2', how='left') # 导出为单个CSV,Dask会自动处理分块合并 ddf_merged.to_csv('final_data_dask.csv', single_file=True, index=False)
方法三:用SQLite数据库处理(适合复杂查询场景)
如果你懂点SQL,把CSV导入SQLite后用SQL做join也是个好办法,数据库会自动优化内存使用,还能加索引提速:
- 把文件导入SQLite:
import sqlite3 import pandas as pd # 创建临时数据库 conn = sqlite3.connect('temp_merge.db') # 分块导入file1和file2,避免内存爆炸 for chunk in pd.read_csv('data1.csv', chunksize=100000): chunk.to_sql('file1', conn, if_exists='append', index=False) for chunk in pd.read_csv('data2.csv', chunksize=100000): chunk.to_sql('file2', conn, if_exists='append', index=False) # file3直接导入就行 pd.read_csv('data3.csv').to_sql('file3', conn, if_exists='replace', index=False) - 加索引提速(可选,但推荐):
conn.execute('CREATE INDEX idx_file1_id1 ON file1(id1)') conn.execute('CREATE INDEX idx_file1_id2 ON file1(id2)') conn.execute('CREATE INDEX idx_file2_id2 ON file2(id2)') conn.execute('CREATE INDEX idx_file3_id1 ON file3(id1)') - 执行SQL查询并导出结果:
# SQL语句和你的merge逻辑完全一致 query = """ SELECT f1.*, f3.*, f2.* FROM file1 f1 LEFT JOIN file3 f3 ON f1.id1 = f3.id1 LEFT JOIN file2 f2 ON f1.id2 = f2.id2 """ # 分块读取查询结果,避免内存压力 first_write = True for chunk in pd.read_sql_query(query, conn, chunksize=100000): # 去掉重复的id1和id2列(SQL查询会重复返回) chunk = chunk.loc[:, ~chunk.columns.duplicated()] if first_write: chunk.to_csv('final_data_sql.csv', index=False) first_write = False else: chunk.to_csv('final_data_sql.csv', mode='a', header=False, index=False) conn.close()
内容的提问来源于stack exchange,提问作者Kiwi Qi




