如何利用多线程/多进程读取多文件,优化Python数据分析效率?
大文件并行处理+合并的Python优化方案
嘿,这个需求我太熟了!处理大文件时并行化确实能把等待时间砍半甚至更多,我给你梳理下思路和具体的代码实现,分两种场景给你参考:
一、先搞清楚用多线程还是多进程?
- 如果你的数据提取逻辑是IO密集型(比如只是逐行读取、简单字段拆分):用多线程就够,因为文件IO本身会等待磁盘响应,多线程能充分利用等待时间处理其他文件。
- 如果你的数据提取是CPU密集型(比如复杂正则匹配、数值计算、数据清洗):必须用多进程,Python的GIL锁会限制多线程的CPU利用率,多进程能绕过这个问题,充分利用多核CPU。
二、具体代码实现
1. 通用文件处理函数
先写好单个文件的处理逻辑,核心要注意不要一次性把大文件读进内存,逐行处理更稳妥:
def process_large_file(file_path): """处理单个大文件,提取目标数据""" extracted_data = [] try: with open(file_path, 'r', encoding='utf-8') as f: # 跳过表头(如果你的文件有表头的话) header = next(f) for line_num, line in enumerate(f, start=2): # 这里替换成你的实际数据提取逻辑 line = line.strip() if not line: continue # 示例:按逗号拆分字段,提取第1、3列 fields = line.split(',') if len(fields) >=3: extracted_data.append((fields[0], fields[2])) print(f"处理完成:{file_path},共提取{len(extracted_data)}条数据") except Exception as e: print(f"处理{file_path}出错:{str(e)}") # 可以根据需求选择返回空列表或者抛出异常 return [] return extracted_data
2. 多线程版本(IO密集型场景)
用concurrent.futures.ThreadPoolExecutor,写法简单,开销小:
from concurrent.futures import ThreadPoolExecutor, as_completed def main(): # 替换成你的三个大文件路径 file_paths = ['file1.csv', 'file2.csv', 'file3.csv'] # 收集所有处理结果 all_results = [] # 线程数设为文件数即可,不用开太多浪费资源 with ThreadPoolExecutor(max_workers=len(file_paths)) as executor: # 提交所有文件处理任务 future_to_file = {executor.submit(process_large_file, path): path for path in file_paths} # 等待所有任务完成,逐个收集结果 for future in as_completed(future_to_file): file_path = future_to_file[future] try: result = future.result() all_results.extend(result) except Exception as e: print(f"任务{file_path}执行失败:{str(e)}") # 所有文件处理完成后,调用下游合并函数 merge_and_save(all_results) def merge_and_save(all_data): """下游合并函数,将所有数据写入目标文件""" with open('merged_result.csv', 'w', encoding='utf-8') as f: # 写入表头(根据你的数据结构调整) f.write("id,value\n") for item in all_data: f.write(f"{item[0]},{item[1]}\n") print(f"合并完成,共写入{len(all_data)}条数据到merged_result.csv") if __name__ == "__main__": main()
3. 多进程版本(CPU密集型场景)
只需要把ThreadPoolExecutor换成ProcessPoolExecutor就行,注意进程间数据传递要可序列化(比如列表、元组都没问题):
from concurrent.futures import ProcessPoolExecutor, as_completed # process_large_file和merge_and_save函数和上面完全一致,无需修改 def main(): file_paths = ['file1.csv', 'file2.csv', 'file3.csv'] all_results = [] # 进程数建议设为CPU核心数(比如os.cpu_count()),避免过度调度 with ProcessPoolExecutor(max_workers=3) as executor: future_to_file = {executor.submit(process_large_file, path): path for path in file_paths} for future in as_completed(future_to_file): file_path = future_to_file[future] try: result = future.result() all_results.extend(result) except Exception as e: print(f"任务{file_path}执行失败:{str(e)}") merge_and_save(all_results) if __name__ == "__main__": main()
三、额外优化建议
- 内存优化:如果每个文件提取后的数据还是很大,不要一次性存在
all_results里,可以考虑边处理边写入临时文件,最后合并临时文件,避免内存爆掉。 - 进度监控:可以用
tqdm库给任务加进度条,直观看到处理进度(比如from tqdm import tqdm,然后在as_completed外面套tqdm(future_to_file.items()))。 - 批量处理:如果逐行处理太慢,可以改成按块读取(比如每次读1000行),减少IO操作次数。
- 异常重试:对重要文件,可以在
process_large_file里加重试逻辑,比如失败后重试2次。
内容的提问来源于stack exchange,提问作者everestial




