如何缩短Dask DataFrame转换为Pandas DataFrame的耗时?
优化Dask读取大型CSV并Compute的耗时方案
看起来你在处理大型CSV文件时,两次compute()操作耗时过长,我来分享几个实用的优化方向,帮你大幅缩短处理时间:
1. 合并两次Compute操作,避免重复计算
你现在分别对两个过滤后的Dask DataFrame执行compute(),这相当于让Dask重复扫描、处理整个数据源两次——这是最大的浪费。建议先在Dask层面把需要的数据筛选出来,只触发一次compute,再拆分结果:
# 先在Dask层面筛选出所有需要的parameter_id filtered_data = Tea2Array[Tea2Array['parameter_id'].isin([168566, 168577])] # 一次性计算所有需要的数据,只扫描一次文件 combined_df = filtered_data.compute() # 再拆分得到目标DataFrame P1MI3 = combined_df[combined_df['parameter_id'] == 168566] P1MJC_old = combined_df[combined_df['parameter_id'] == 168577]
这样至少能节省一半的时间,因为不需要重复做IO和数据处理工作。
2. 优化CSV读取参数,从源头减少开销
你的CSV是utf-16编码+特殊分隔符,读取阶段可以通过参数优化减少不必要的计算:
- 提前指定列类型:避免Dask自动推断数据类型(大文件类型推断非常耗时),直接给需要的列指定dtype;
- 读取阶段就只加载需要的列:你后续只保留4列,读的时候直接用
usecols指定,不用加载其他无关列; - 调整分块大小:根据你的内存情况设置
blocksize,让每个分区的大小适合并行处理(比如64MB/128MB)。
修改后的读取函数示例:
def t_createdd(Path): # 根据实际数据类型调整dtype,比如如果Value是字符串就改成object dtype_spec = { 'Parameter_Id': 'int64', 'Reading_Id': 'int64', 'X': 'float64', 'Value': 'float64' } dataframe = dd.read_csv( Path, sep=chr(1), encoding="utf-16", usecols=['Parameter_Id','Reading_Id','X','Value'], # 只加载需要的列 dtype=dtype_spec, # 提前指定类型 blocksize='64MB' # 根据内存调整,比如128MB ) return dataframe
3. 利用多核并行提升计算效率
默认情况下Dask的并行能力可能没有完全发挥,你可以手动初始化一个多线程客户端,利用CPU多核加速:
from dask.distributed import Client # 根据你的CPU核心数设置n_workers,比如8核就设8 client = Client(n_workers=4)
这样Dask会把计算任务拆分到多个线程并行处理,能显著加快compute()的速度。
4. 长期优化:转换为列式存储格式
如果需要频繁处理这份数据,建议把CSV转换成Parquet或Feather格式——这类列式存储格式支持谓词下推,读取和过滤速度比CSV快数倍,而且压缩比高,能减少IO开销:
# 第一次读取后保存为Parquet(只做一次) Tea2Array_latest.to_parquet('Tea2Array_latest.parquet') # 后续直接读取Parquet文件,速度大幅提升 Tea2Array_latest = dd.read_parquet('Tea2Array_latest.parquet')
5. 检查系统资源是否充足
如果你的机器内存不足,Dask会自动使用磁盘交换空间,这会导致速度急剧下降。可以通过client.memory_info()查看内存使用情况,确保有足够的内存来处理数据;同时避免在计算时运行其他占用CPU/内存的程序,让资源集中在Dask任务上。
内容的提问来源于stack exchange,提问作者K.S




