火山引擎 LAS AI 的Daft 引擎为您提供Checkpoint功能,在批量数据处理的生产环境中,实现“断点续算”和“增量追加”,利用上次写入的结果进行去重,过滤掉不必要的计算。本文为您介绍基于Daft 引擎的 Checkpoint 功能详情与使用实践。
使用分布式框架进行多模态数据(图像、文本、音频、视频等)预处理的流程在过程中,常常由于各种原因(如资源受限、环境不稳定、代码不稳健等)失败终止。一旦在中间过程中遇到失败,重新启动往往意味着整个流程从头开始运行,这就会导致对上次运行流程中已处理的数据又被重复执行。这种重复计算是对资源和时间的巨大浪费。
因此,基于 Daft 引擎,火山引擎 LAS AI 提供了Checkpoint功能:在批量数据写入的生产环境中,实现“断点续算”和“增量计算”。例如,第一次运行只处理并写入了部分数据就失败终止,后续再运行就只处理剩余的部分,避免重复、保证幂等并节省成本与时间。
细分项 | 注意事项 |
|---|---|
数据源要求 |
|
运行条件要求 | 当前仅支持在 Daft on Ray 的分布式模式下使用 Checkpoint 功能,暂不支持对于 Daft 单机模式的 Checkpoint 功能。 |
数据格式要求 | 当前对 CSV、Parquet、JSON 的写入提供一致的 Checkpoint 功能。 |
df = daft.read_parquet(source_dir) # 中间处理流程 df = df.with_column(...) df = df.with_column(...) # 配置checkpoint参数 checkpoint_config = { "key_column": "object_key", } # 写入 df.write_parquet(dest_dir, checkpoint_config=checkpoint_config)
参数 | 逻辑说明 |
|---|---|
checkpoint_config | 使用checkpoint功能时,您需先配置checkpoint_config(具体配置参数见下文的checkpoint参数说明章节)。
|
参数 | 是否必填 | 说明 |
|---|---|---|
key_column | 是 |
|
num_buckets | 否 |
|
num_cpus | 否 |
|
以下为您演示 Daft 的 checkpoint 功能在 Daft 任务失败重试时,自动跳过已处理的记录,仅处理剩余记录。
checkpoint_config={"key_column": "id"}。完整代码示例如下。
from __future__ import annotations import shutil from pathlib import Path import daft from daft import col, set_runner_ray def init_runner() -> None: try: set_runner_ray(noop_if_initialized=True) print("[info] Ray runner initialized") except Exception as e: print(f"[warn] Fallback to default runner: {e}") def example_checkpoint_with_id() -> None: """ 演示 Daft 的 checkpoint 增量写入功能: - 使用 'id' 列作为唯一键,避免重复写入。 - 第一次写入 id=0..50(模拟历史数据); - 第二次写入 id=0..99(模拟全量拉取),checkpoint 自动跳过已存在 ID。 - 验证最终结果为 100 条唯一记录。 注意:checkpoint 功能依赖 Ray 执行器。 """ # 构造测试数据 df_all = daft.from_pydict({"id": list(range(100)), "val": [f"v{i}" for i in range(100)]}) df_seed = df_all.where(col("id") <= 50) # 设置输出路径(每次运行前清理) dest = Path("/tmp/ckpt_demo_parquet_id") if dest.exists(): shutil.rmtree(dest) dest.mkdir(parents=True, exist_ok=True) # 配置 checkpoint:以 'id' 为唯一键 ck = {"key_column": "id"} # 第一次写入:种子数据(id <= 50) df_seed.write_parquet(str(dest), write_mode="append", checkpoint_config=ck) # 第二次写入:全量数据(id 0~99),checkpoint 自动去重 df_all.write_parquet(str(dest), write_mode="append", checkpoint_config=ck) # 验证结果 out_df = daft.read_parquet(str(dest)) ids = out_df.select("id").to_pydict()["id"] print("[result] rows:", len(ids), "unique_ids:", len(set(ids))) print("[result] has_all_ids_0_to_99:", set(ids) == set(range(100))) if __name__ == "__main__": init_runner() example_checkpoint_with_id()
以下为您演示 Daft 的 checkpoint 功能如何在增量计算场景中,避免重复处理,只处理新增文件,避免重复处理已处理过的文件。
checkpoint_config={"key_column": "path"},以文件路径为唯一标识。完整代码示例如下。
from __future__ import annotations import shutil from pathlib import Path import daft from daft import set_runner_ray def init_runner() -> None: try: set_runner_ray(noop_if_initialized=True) print("[info] Ray runner initialized") except Exception as e: print(f"[warn] Fallback to default runner: {e}") def example_checkpoint_with_path() -> None: """ 演示基于文件路径的 checkpoint 增量写入: 1. 创建 10 个初始文件 (file_0.txt ~ file_9.txt),写入元数据; 2. 新增 5 个文件 (file_10.txt ~ file_14.txt); 3. 重新扫描全部文件,通过 checkpoint 自动跳过已存在路径; 4. 验证最终包含 15 条唯一路径,无重复。 注意:checkpoint 功能依赖 Ray 执行器。 """ src_dir = Path("/tmp/glob_src") dest = Path("/tmp/ckpt_demo_parquet_path") # 清理旧数据,确保可重复运行 if src_dir.exists(): shutil.rmtree(src_dir) if dest.exists(): shutil.rmtree(dest) src_dir.mkdir(parents=True) dest.mkdir(parents=True) # Step 1: 生成初始文件 (0-9) for i in range(10): (src_dir / f"file_{i}.txt").write_text("x" * i) df_initial = daft.from_glob_path(str(src_dir / "*.txt")) ck = {"key_column": "path"} df_initial.write_parquet(str(dest), write_mode="append", checkpoint_config=ck) # Step 2: 新增文件 (10-14) for i in range(10, 15): (src_dir / f"file_{i}.txt").write_text("x" * i) # Step 3: 重新扫描全部文件,checkpoint 自动去重 df_full = daft.from_glob_path(str(src_dir / "*.txt")) df_full.write_parquet(str(dest), write_mode="append", checkpoint_config=ck) # Step 4: 验证结果 out_df = daft.read_parquet(str(dest)) paths = out_df.select("path").to_pydict()["path"] print("[result] total rows:", len(paths), "unique paths:", len(set(paths))) expected_paths = {str(src_dir / f"file_{i}.txt") for i in range(15)} print("[result] has_all_expected_paths:", set(paths) == expected_paths) if __name__ == "__main__": init_runner() example_checkpoint_with_path()