You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何利用多线程/多进程读取多文件,优化Python数据分析效率?

大文件并行处理+合并的Python优化方案

嘿,这个需求我太熟了!处理大文件时并行化确实能把等待时间砍半甚至更多,我给你梳理下思路和具体的代码实现,分两种场景给你参考:

一、先搞清楚用多线程还是多进程?

  • 如果你的数据提取逻辑是IO密集型(比如只是逐行读取、简单字段拆分):用多线程就够,因为文件IO本身会等待磁盘响应,多线程能充分利用等待时间处理其他文件。
  • 如果你的数据提取是CPU密集型(比如复杂正则匹配、数值计算、数据清洗):必须用多进程,Python的GIL锁会限制多线程的CPU利用率,多进程能绕过这个问题,充分利用多核CPU。

二、具体代码实现

1. 通用文件处理函数

先写好单个文件的处理逻辑,核心要注意不要一次性把大文件读进内存,逐行处理更稳妥:

def process_large_file(file_path):
    """处理单个大文件,提取目标数据"""
    extracted_data = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # 跳过表头(如果你的文件有表头的话)
            header = next(f)
            for line_num, line in enumerate(f, start=2):
                # 这里替换成你的实际数据提取逻辑
                line = line.strip()
                if not line:
                    continue
                # 示例:按逗号拆分字段,提取第1、3列
                fields = line.split(',')
                if len(fields) >=3:
                    extracted_data.append((fields[0], fields[2]))
            print(f"处理完成:{file_path},共提取{len(extracted_data)}条数据")
    except Exception as e:
        print(f"处理{file_path}出错:{str(e)}")
        # 可以根据需求选择返回空列表或者抛出异常
        return []
    return extracted_data

2. 多线程版本(IO密集型场景)

concurrent.futures.ThreadPoolExecutor,写法简单,开销小:

from concurrent.futures import ThreadPoolExecutor, as_completed

def main():
    # 替换成你的三个大文件路径
    file_paths = ['file1.csv', 'file2.csv', 'file3.csv']
    
    # 收集所有处理结果
    all_results = []
    
    # 线程数设为文件数即可,不用开太多浪费资源
    with ThreadPoolExecutor(max_workers=len(file_paths)) as executor:
        # 提交所有文件处理任务
        future_to_file = {executor.submit(process_large_file, path): path for path in file_paths}
        
        # 等待所有任务完成,逐个收集结果
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                result = future.result()
                all_results.extend(result)
            except Exception as e:
                print(f"任务{file_path}执行失败:{str(e)}")
    
    # 所有文件处理完成后,调用下游合并函数
    merge_and_save(all_results)

def merge_and_save(all_data):
    """下游合并函数,将所有数据写入目标文件"""
    with open('merged_result.csv', 'w', encoding='utf-8') as f:
        # 写入表头(根据你的数据结构调整)
        f.write("id,value\n")
        for item in all_data:
            f.write(f"{item[0]},{item[1]}\n")
    print(f"合并完成,共写入{len(all_data)}条数据到merged_result.csv")

if __name__ == "__main__":
    main()

3. 多进程版本(CPU密集型场景)

只需要把ThreadPoolExecutor换成ProcessPoolExecutor就行,注意进程间数据传递要可序列化(比如列表、元组都没问题):

from concurrent.futures import ProcessPoolExecutor, as_completed

# process_large_file和merge_and_save函数和上面完全一致,无需修改

def main():
    file_paths = ['file1.csv', 'file2.csv', 'file3.csv']
    all_results = []
    
    # 进程数建议设为CPU核心数(比如os.cpu_count()),避免过度调度
    with ProcessPoolExecutor(max_workers=3) as executor:
        future_to_file = {executor.submit(process_large_file, path): path for path in file_paths}
        
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                result = future.result()
                all_results.extend(result)
            except Exception as e:
                print(f"任务{file_path}执行失败:{str(e)}")
    
    merge_and_save(all_results)

if __name__ == "__main__":
    main()

三、额外优化建议

  • 内存优化:如果每个文件提取后的数据还是很大,不要一次性存在all_results里,可以考虑边处理边写入临时文件,最后合并临时文件,避免内存爆掉。
  • 进度监控:可以用tqdm库给任务加进度条,直观看到处理进度(比如from tqdm import tqdm,然后在as_completed外面套tqdm(future_to_file.items()))。
  • 批量处理:如果逐行处理太慢,可以改成按块读取(比如每次读1000行),减少IO操作次数。
  • 异常重试:对重要文件,可以在process_large_file里加重试逻辑,比如失败后重试2次。

内容的提问来源于stack exchange,提问作者everestial

火山引擎 最新活动