You need to enable JavaScript to run this app.
导航
Ray Data Checkpoint在多模态数据处理中的应用实践
最近更新时间:2025.09.16 21:00:47首次发布时间:2025.09.16 21:00:47
复制全文
我的收藏
有用
有用
无用
无用

特性背景

在多模态数据(如图像、文本、音频、视频等)的预处理流程中,通常会涉及使用 GPU 进行数据处理和推理。然而,这类处理流程通常具有以下特点:

  • 超大规模数据集: 当数据集包含数百万乃至数十亿个样本(文件)时,处理时间极长,失败概率增加,重复处理成本高昂,难以承受。
  • 预处理任务耗时较长: 即使数据集规模并非特别巨大,但单个样本的预处理过程极为复杂且耗时(如高质量视频帧提取、复杂语音特征转换、大图像的高精度变换),致使整体流程耗时需数小时以上。
  • 资源受限或环境不稳定:
    • 共享 GPU 集群(存在被抢占的可能性)。
    • 使用 Spot 实例(价格低廉但可能随时被终止服务)。
    • 本地开发环境(可能因调试、系统更新等因素导致处理中断)。
  • 预处理流程复杂多变: 流程本身可能尚不够稳健,容易遭遇边界情况或未知错误。
  • 自动化流水线: 在无人值守的自动化数据处理流水线中,要求流程具备从错误中自动恢复的能力。
  • 异构多模态处理: 处理不同类型数据(如图像 + 文本 + 音频)的流程可能涉及多个步骤和工具链,增加了出错的环节。

上述处理流程一旦在中间环节失败,重新启动往往导致整个流程必须从头开始运行,这不可避免地造成先前已成功处理的数据被重复执行。这种重复处理不仅是对计算资源的巨大浪费——消耗了宝贵的GPU/CPU运算时间、增加了不必要的资源开销,也意味着宝贵的时间被无效占用。更重要的是,它会显著拖慢整个下游任务(例如关键的模型训练或数据评测阶段)的进度,形成一种雪球效应,使得整个数据处理链条的效率大幅降低,资源浪费和时间延误问题亟待解决。

特性说明

针对以上问题,EMR在多模态数据预处理链路中研发了Checkpoint功能。Checkpoint功能是一种能够记录数据处理进度、保存已完成任务状态的机制。该机制借助对“已成功处理的数据”进行持久化存储,达成了“失败后恢复时跳过已完成工作,仅处理未完成部分”的核心能力,尤其在依赖GPU开展高成本多模态数据处理(如图像增强、视频帧提取、跨模态特征对齐等)的场景中具备重要意义。其核心价值体现为显著节约计算资源和时间成本,增强处理流程的可靠性与鲁棒性,提高开发与研究效率。

功能说明

通过Ray Data输出时设置checkpoint相关配置,在数据处理过程中,已处理的数据将自动记录至指定的检查点位置。当任务失败并重启时,在读取(Read)阶段会自动过滤已处理的数据,仅未处理的数据将进入后续处理流程。

checkpoint_config = {
    "enabled": True,
    "num_buckets": 2,
    "key_columns": "id",
    "storage": {
        "type":"fs",
        "check_ready_timeout": 300,
        "fs_options": {
            "path": "tos://bucket_name/checkpointdata/",
            "key": "xxx",
            "secret": "yyy==",
            "endpoint_url": 'http://tos-cn-beijing.ivolces.com',
            "region": 'cn-beijing',
            "max_items_per_file": 10000,
        },
        "remote_args": {
            "num_cpus":1,
        },
    }
  }
ds.write_parquet(dest_path, checkpoint_config=checkpoint_config)

支持的数据源:任意数据源。可以是文件如图片等,也可以是数据文件如json等,也可以是关系型数据库如sql等
支持的输出格式:支持csv, json, parquet等文件输出格式。

参数说明

参数

说明

默认值

enabled

是否启用checkpoint功能

False

num_buckets

处理checkpoint的Actor数量,数据将按照分桶数(num_buckets)进行分桶处理,每个桶仅处理与之对应的一部分数据。在处理大规模数据时,每个Actor将分摊一部分数据进行处理。

1

key_columns

数据中的唯一键的列名

storage.type

目前固定为fs,代表fileSystem存储后端

fs

storage.check_ready_timeout

当任务重新时,加载Checkpoint信息的超时时间,以秒为单位,float类型,根据处理的数据量设置。500万long型数据加载约1min,

500

storage.fs_options.path

存储检查点(checkpoint)信息的文件路径,可以是本地路径,也可以是对象存储服务(TOS)路径。当为本地路径时,需要确保所有节点能够访问到相同路径,例如挂载的为火山引擎并行文件系统(vePFS)。

storage.fs_options.max_items_per_file

单个checkpoint文件保存的最大数据条数

100000

storage.fs_options下的其他配置

storage.fs_options下的其他配置可以配置TosFileSystem相关的配置,支持key,secret,endpoint_url,region等

storage.remote_args.x

处理checkpoint的Actor所使用的资源。设置参数需遵循Ray的Actor资源设置模式。x可以为num_cpus等。

参数最佳配置

数据量级(key_column的数据量)

推荐参数配置

说明

512M

num_buckets = 4

当唯一键为 long 型时,可支撑的数据条数约为 1.7亿。对于其他类型,可按照单条数据大小进行计算。

1G

num_buckets = 8

支撑的数据条数计算公式同上

1G以上

同比例配置

使用样例

记录级使用样例

处理以Row为粒度的多模态数据时,数据中的唯一键(如id)会被记录至检查点(checkpoint)中,当任务失败后再次执行时,会跳过已处理过的id。

import ray
import pandas as pd

ray.init()

from tosfs.core import TosFileSystem
fs = TosFileSystem(
    endpoint_url="https://tos-cn-beijing.ivolces.com",
    key='xxx',
    secret='yyy==',
    region="cn-beijing"
)

ds = ray.data.read_parquet("tos://bucket_name/source_data/", filesystem=fs)

dest_path = "tos://bucket_name/dest_data/"

checkpoint_config = {
    "enabled": True,
    "num_buckets": 2,
    "key_columns": "id",
    "storage": {
        "type":"fs",
        "check_ready_timeout": 300,
        "fs_options": {
            "path": "tos://bucket_name/checkpointdata/",
            "key": "xxx",
            "secret": "yyy==",
            "endpoint_url": 'http://tos-cn-beijing.ivolces.com',
            "region": 'cn-beijing',
            "max_items_per_file": 10000,
        },
        "remote_args": {
            "num_cpus":1,
        },
    }
  }
# 写入1234567行数据
ds.limit(1234567).write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)
# 写入剩下的数据
ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)
# 不会写入任务数据,因为前面已经全部处理完毕
ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)

文件级使用样例

处理以文件为粒度的多模态数据时,以文件路径作为唯一键,将处理结果进行记录,当任务失败再次执行时,会跳过已处理过的文件。

import ray
import pandas as pd

ray.init()

from tosfs.core import TosFileSystem
fs = TosFileSystem(
    endpoint_url="https://tos-cn-beijing.ivolces.com",
    key='xxx',
    secret='yyy==',
    region="cn-beijing"
)

ds = ray.data.read_images("tos://bucket_name/image_data/", include_paths=True, filesystem=fs)

dest_path = "tos://bucket_name/dest_data/"

checkpoint_config = {
    "enabled": True,
    "num_buckets": 2,
    "key_columns": "path",
    "storage": {
        "type":"fs",
        "check_ready_timeout": 300,
        "fs_options": {
            "path": "tos://bucket_name/checkpointdata/",
            "key": "xxx",
            "secret": "yyy==",
            "endpoint_url": 'http://tos-cn-beijing.ivolces.com',
            "region": 'cn-beijing',
            "max_items_per_file": 10000,
        },
        "remote_args": {
            "num_cpus":1,
        },
    }
  }
# 写入1234567行数据
ds.limit(1234567).write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)
# 写入剩下的数据
ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)
# 不会写入任务数据,因为前面已经全部处理完毕
ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)