You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

4G内存下如何用Python处理多份大型CSV文件?

嘿,我正好经常处理这类大文件内存不足的问题,给你几个靠谱的解决方案,都是亲测有效的:

方法一:Pandas原生分块处理,分步合并+增量导出

这个方法不用装额外库,完全用你熟悉的Pandas,适合不想折腾的情况:

  1. 先处理小文件,生成中间表
    首先读入file3——就170行,随便放内存里都没问题:

    import pandas as pd
    file3 = pd.read_csv('data3.csv')
    

    然后分块读file1,每块和file3做left join,直接把合并后的块写入中间CSV,不用把整个200万行的file1塞进内存。我一般设每块10万行,你可以根据自己内存剩余调整:

    chunk_size = 100000
    first_write = True
    for chunk in pd.read_csv('data1.csv', chunksize=chunk_size):
        merged_chunk = pd.merge(chunk, file3, on='id1', how='left')
        if first_write:
            # 第一次写要带表头
            merged_chunk.to_csv('merged_small.csv', index=False)
            first_write = False
        else:
            # 后续追加不要表头
            merged_chunk.to_csv('merged_small.csv', mode='a', header=False, index=False)
    

    这样得到的merged_small.csv也就几百MB,完全能放进4G内存。

  2. 分块读大文件file2,和中间表合并后导出
    先把中间表读进内存:

    merged_small = pd.read_csv('merged_small.csv')
    

    然后分块读file2,每块和merged_small做left join,再追加到最终CSV里。哪怕file2有重复的id2,结果和一次性合并的完全一致:

    first_write = True
    for chunk in pd.read_csv('data2.csv', chunksize=chunk_size):
        final_chunk = pd.merge(merged_small, chunk, on='id2', how='left')
        if first_write:
            final_chunk.to_csv('final_data.csv', index=False)
            first_write = False
        else:
            final_chunk.to_csv('final_data.csv', mode='a', header=False, index=False)
    
方法二:用Dask一键搞定(强烈推荐)

Dask是专门处理大数据集的工具,API和Pandas几乎一模一样,会自动帮你分块计算,不用手动操心分块的事:

  1. 先装Dask:
    pip install dask[complete]
    
  2. 代码和你原来的几乎一样,只是把pd换成dd
    import dask.dataframe as dd
    
    # Dask会自动分块读取文件,不用设chunksize
    ddf1 = dd.read_csv('data1.csv')
    ddf2 = dd.read_csv('data2.csv')
    ddf3 = dd.read_csv('data3.csv')
    
    # 合并操作是延迟计算的,不会立刻占内存
    ddf_merged = ddf1.merge(ddf3, on='id1', how='left').merge(ddf2, on='id2', how='left')
    
    # 导出为单个CSV,Dask会自动处理分块合并
    ddf_merged.to_csv('final_data_dask.csv', single_file=True, index=False)
    
    这个方法最省心,完全不用自己算分块大小,Dask会根据你的内存自动调整,结果和Pandas合并的完全一致。
方法三:用SQLite数据库处理(适合复杂查询场景)

如果你懂点SQL,把CSV导入SQLite后用SQL做join也是个好办法,数据库会自动优化内存使用,还能加索引提速:

  1. 把文件导入SQLite:
    import sqlite3
    import pandas as pd
    
    # 创建临时数据库
    conn = sqlite3.connect('temp_merge.db')
    
    # 分块导入file1和file2,避免内存爆炸
    for chunk in pd.read_csv('data1.csv', chunksize=100000):
        chunk.to_sql('file1', conn, if_exists='append', index=False)
    
    for chunk in pd.read_csv('data2.csv', chunksize=100000):
        chunk.to_sql('file2', conn, if_exists='append', index=False)
    
    # file3直接导入就行
    pd.read_csv('data3.csv').to_sql('file3', conn, if_exists='replace', index=False)
    
  2. 加索引提速(可选,但推荐):
    conn.execute('CREATE INDEX idx_file1_id1 ON file1(id1)')
    conn.execute('CREATE INDEX idx_file1_id2 ON file1(id2)')
    conn.execute('CREATE INDEX idx_file2_id2 ON file2(id2)')
    conn.execute('CREATE INDEX idx_file3_id1 ON file3(id1)')
    
  3. 执行SQL查询并导出结果:
    # SQL语句和你的merge逻辑完全一致
    query = """
    SELECT f1.*, f3.*, f2.*
    FROM file1 f1
    LEFT JOIN file3 f3 ON f1.id1 = f3.id1
    LEFT JOIN file2 f2 ON f1.id2 = f2.id2
    """
    # 分块读取查询结果,避免内存压力
    first_write = True
    for chunk in pd.read_sql_query(query, conn, chunksize=100000):
        # 去掉重复的id1和id2列(SQL查询会重复返回)
        chunk = chunk.loc[:, ~chunk.columns.duplicated()]
        if first_write:
            chunk.to_csv('final_data_sql.csv', index=False)
            first_write = False
        else:
            chunk.to_csv('final_data_sql.csv', mode='a', header=False, index=False)
    
    conn.close()
    

内容的提问来源于stack exchange,提问作者Kiwi Qi

火山引擎 最新活动