You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
最佳实践
断点续算与增量计算:Daft Checkpoint
复制全文
断点续算与增量计算:Daft Checkpoint

火山引擎 LAS AI 的Daft 引擎为您提供Checkpoint功能,在批量数据处理的生产环境中,实现“断点续算”和“增量追加”,利用上次写入的结果进行去重,过滤掉不必要的计算。本文为您介绍基于Daft 引擎的 Checkpoint 功能详情与使用实践。

背景信息

使用分布式框架进行多模态数据(图像、文本、音频、视频等)预处理的流程在过程中,常常由于各种原因(如资源受限、环境不稳定、代码不稳健等)失败终止。一旦在中间过程中遇到失败,重新启动往往意味着整个流程从头开始运行,这就会导致对上次运行流程中已处理的数据又被重复执行。这种重复计算是对资源和时间的巨大浪费。
因此,基于 Daft 引擎,火山引擎 LAS AI 提供了Checkpoint功能:在批量数据写入的生产环境中,实现“断点续算”和“增量计算”。例如,第一次运行只处理并写入了部分数据就失败终止,后续再运行就只处理剩余的部分,避免重复、保证幂等并节省成本与时间。

注意与限制

细分项

注意事项

数据源要求

  • 当前仅支持单数据源(single source)的 dataframe 的 Checkpoint 功能。
  • 遇到多数据源(通常来自于joinunionintersectconcat操作等)的 dataframe,您可以考虑先运行df.collect()进行物化,将数据缓存到内存中,物化得到的新的dataframe只有一个数据源(single source),这样则可支持 Checkpoint。

运行条件要求

当前仅支持在 Daft on Ray 的分布式模式下使用 Checkpoint 功能,暂不支持对于 Daft 单机模式的 Checkpoint 功能。

数据格式要求

当前对 CSV、Parquet、JSON 的写入提供一致的 Checkpoint 功能。

使用说明

使用方法

df = daft.read_parquet(source_dir)

# 中间处理流程
df = df.with_column(...)
df = df.with_column(...)

# 配置checkpoint参数
checkpoint_config = {
    "key_column": "object_key",
}

# 写入
df.write_parquet(dest_dir, checkpoint_config=checkpoint_config)

配置checkpoint

参数

逻辑说明

checkpoint_config

使用checkpoint功能时,您需先配置checkpoint_config(具体配置参数见下文的checkpoint参数说明章节)。

  • 如果在写入文件时(如df.write_parquet)打开 checkpoint 配置,daft任务再次运行时,只追加上一次未写过(也就是没有被处理完毕)的数据,跳过已经被写入目标地址(也就是已经处理完毕)的数据,避免重复处理。
  • Checkpoint 需要在dataframe的write_api (如write_parquet)中设置。

checkpoint参数说明

参数

是否必填

说明

key_column

  • 以参数 key_column 指定的列作为去重主键,需要这个主键在整个数据集范围内保持unique。
  • 唯一键数据类型推荐为简单基本类型,如stringint等。

num_buckets

  • 设置处理 checkpoint 的负载均衡桶的数量,以控制单机内存压力。num_buckets越大,单机内存压力越小。
  • 默认值:4。建议保持默认值即可。如果是数十亿乃至几百亿条数据,可以按需增加num_buckets来减少单机内存压力。

num_cpus

  • 设置每一个 Checkpoint的负载均衡桶的CPU 数量。
  • 默认值1.0,建议保持默认值即可。

实践demo

demo1:按主键列 id 做断点续算(以Parquet为例)

以下为您演示 Daft 的 checkpoint 功能在 Daft 任务失败重试时,自动跳过已处理的记录,仅处理剩余记录。

  • 适用场景:有一批带主键(唯一 ID) 的数据集。
  • 断点续算需求:处理数据失败时,再次运行只处理剩余数据 ,避免重复。
  • 核心配置:使用 checkpoint_config={"key_column": "id"}

完整代码示例如下。

from __future__ import annotations

import shutil
from pathlib import Path

import daft
from daft import col, set_runner_ray


def init_runner() -> None:
    try:
        set_runner_ray(noop_if_initialized=True)
        print("[info] Ray runner initialized")
    except Exception as e:
        print(f"[warn] Fallback to default runner: {e}")


def example_checkpoint_with_id() -> None:
    """
    演示 Daft 的 checkpoint 增量写入功能:
    - 使用 'id' 列作为唯一键,避免重复写入。
    - 第一次写入 id=0..50(模拟历史数据);
    - 第二次写入 id=0..99(模拟全量拉取),checkpoint 自动跳过已存在 ID。
    - 验证最终结果为 100 条唯一记录。
    注意:checkpoint 功能依赖 Ray 执行器。
    """
    # 构造测试数据
    df_all = daft.from_pydict({"id": list(range(100)), "val": [f"v{i}" for i in range(100)]})
    df_seed = df_all.where(col("id") <= 50)

    # 设置输出路径(每次运行前清理)
    dest = Path("/tmp/ckpt_demo_parquet_id")
    if dest.exists():
        shutil.rmtree(dest)
    dest.mkdir(parents=True, exist_ok=True)

    # 配置 checkpoint:以 'id' 为唯一键
    ck = {"key_column": "id"}

    # 第一次写入:种子数据(id <= 50)
    df_seed.write_parquet(str(dest), write_mode="append", checkpoint_config=ck)

    # 第二次写入:全量数据(id 0~99),checkpoint 自动去重
    df_all.write_parquet(str(dest), write_mode="append", checkpoint_config=ck)

    # 验证结果
    out_df = daft.read_parquet(str(dest))
    ids = out_df.select("id").to_pydict()["id"]
    print("[result] rows:", len(ids), "unique_ids:", len(set(ids)))
    print("[result] has_all_ids_0_to_99:", set(ids) == set(range(100)))


if __name__ == "__main__":
    init_runner()
    example_checkpoint_with_id()

demo2:基于文件path做增量计算

以下为您演示 Daft 的 checkpoint 功能如何在增量计算场景中,避免重复处理,只处理新增文件,避免重复处理已处理过的文件。

  • 适用场景:有一个不断增长的文件目录(如日志、图像、语音),每次只处理新增文件。
  • 增量需求:避免重复处理已处理过的文件。
  • 核心配置:使用 checkpoint_config={"key_column": "path"},以文件路径为唯一标识。

完整代码示例如下。

from __future__ import annotations

import shutil
from pathlib import Path

import daft
from daft import set_runner_ray


def init_runner() -> None:
    try:
        set_runner_ray(noop_if_initialized=True)
        print("[info] Ray runner initialized")
    except Exception as e:
        print(f"[warn] Fallback to default runner: {e}")


def example_checkpoint_with_path() -> None:
    """
    演示基于文件路径的 checkpoint 增量写入:
    1. 创建 10 个初始文件 (file_0.txt ~ file_9.txt),写入元数据;
    2. 新增 5 个文件 (file_10.txt ~ file_14.txt);
    3. 重新扫描全部文件,通过 checkpoint 自动跳过已存在路径;
    4. 验证最终包含 15 条唯一路径,无重复。
    注意:checkpoint 功能依赖 Ray 执行器。
    """
    src_dir = Path("/tmp/glob_src")
    dest = Path("/tmp/ckpt_demo_parquet_path")

    # 清理旧数据,确保可重复运行
    if src_dir.exists():
        shutil.rmtree(src_dir)
    if dest.exists():
        shutil.rmtree(dest)
    src_dir.mkdir(parents=True)
    dest.mkdir(parents=True)

    # Step 1: 生成初始文件 (0-9)
    for i in range(10):
        (src_dir / f"file_{i}.txt").write_text("x" * i)

    df_initial = daft.from_glob_path(str(src_dir / "*.txt"))
    ck = {"key_column": "path"}
    df_initial.write_parquet(str(dest), write_mode="append", checkpoint_config=ck)

    # Step 2: 新增文件 (10-14)
    for i in range(10, 15):
        (src_dir / f"file_{i}.txt").write_text("x" * i)

    # Step 3: 重新扫描全部文件,checkpoint 自动去重
    df_full = daft.from_glob_path(str(src_dir / "*.txt"))
    df_full.write_parquet(str(dest), write_mode="append", checkpoint_config=ck)

    # Step 4: 验证结果
    out_df = daft.read_parquet(str(dest))
    paths = out_df.select("path").to_pydict()["path"]
    print("[result] total rows:", len(paths), "unique paths:", len(set(paths)))

    expected_paths = {str(src_dir / f"file_{i}.txt") for i in range(15)}
    print("[result] has_all_expected_paths:", set(paths) == expected_paths)


if __name__ == "__main__":
    init_runner()
    example_checkpoint_with_path()
最近更新时间:2026.01.12 15:04:45
这个页面对您有帮助吗?
有用
有用
无用
无用