You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何缩短Dask DataFrame转换为Pandas DataFrame的耗时?

优化Dask读取大型CSV并Compute的耗时方案

看起来你在处理大型CSV文件时,两次compute()操作耗时过长,我来分享几个实用的优化方向,帮你大幅缩短处理时间:


1. 合并两次Compute操作,避免重复计算

你现在分别对两个过滤后的Dask DataFrame执行compute(),这相当于让Dask重复扫描、处理整个数据源两次——这是最大的浪费。建议先在Dask层面把需要的数据筛选出来,只触发一次compute,再拆分结果:

# 先在Dask层面筛选出所有需要的parameter_id
filtered_data = Tea2Array[Tea2Array['parameter_id'].isin([168566, 168577])]
# 一次性计算所有需要的数据,只扫描一次文件
combined_df = filtered_data.compute()

# 再拆分得到目标DataFrame
P1MI3 = combined_df[combined_df['parameter_id'] == 168566]
P1MJC_old = combined_df[combined_df['parameter_id'] == 168577]

这样至少能节省一半的时间,因为不需要重复做IO和数据处理工作。

2. 优化CSV读取参数,从源头减少开销

你的CSV是utf-16编码+特殊分隔符,读取阶段可以通过参数优化减少不必要的计算:

  • 提前指定列类型:避免Dask自动推断数据类型(大文件类型推断非常耗时),直接给需要的列指定dtype;
  • 读取阶段就只加载需要的列:你后续只保留4列,读的时候直接用usecols指定,不用加载其他无关列;
  • 调整分块大小:根据你的内存情况设置blocksize,让每个分区的大小适合并行处理(比如64MB/128MB)。

修改后的读取函数示例:

def t_createdd(Path):
    # 根据实际数据类型调整dtype,比如如果Value是字符串就改成object
    dtype_spec = {
        'Parameter_Id': 'int64',
        'Reading_Id': 'int64',
        'X': 'float64',
        'Value': 'float64'
    }
    dataframe = dd.read_csv(
        Path,
        sep=chr(1),
        encoding="utf-16",
        usecols=['Parameter_Id','Reading_Id','X','Value'],  # 只加载需要的列
        dtype=dtype_spec,  # 提前指定类型
        blocksize='64MB'  # 根据内存调整,比如128MB
    )
    return dataframe

3. 利用多核并行提升计算效率

默认情况下Dask的并行能力可能没有完全发挥,你可以手动初始化一个多线程客户端,利用CPU多核加速:

from dask.distributed import Client

# 根据你的CPU核心数设置n_workers,比如8核就设8
client = Client(n_workers=4)

这样Dask会把计算任务拆分到多个线程并行处理,能显著加快compute()的速度。

4. 长期优化:转换为列式存储格式

如果需要频繁处理这份数据,建议把CSV转换成Parquet或Feather格式——这类列式存储格式支持谓词下推,读取和过滤速度比CSV快数倍,而且压缩比高,能减少IO开销:

# 第一次读取后保存为Parquet(只做一次)
Tea2Array_latest.to_parquet('Tea2Array_latest.parquet')

# 后续直接读取Parquet文件,速度大幅提升
Tea2Array_latest = dd.read_parquet('Tea2Array_latest.parquet')

5. 检查系统资源是否充足

如果你的机器内存不足,Dask会自动使用磁盘交换空间,这会导致速度急剧下降。可以通过client.memory_info()查看内存使用情况,确保有足够的内存来处理数据;同时避免在计算时运行其他占用CPU/内存的程序,让资源集中在Dask任务上。


内容的提问来源于stack exchange,提问作者K.S

火山引擎 最新活动