在多模态数据(如图像、文本、音频、视频等)的预处理流程中,通常会涉及使用 GPU 进行数据处理和推理。然而,这类处理流程通常具有以下特点:
上述处理流程一旦在中间环节失败,重新启动往往导致整个流程必须从头开始运行,这不可避免地造成先前已成功处理的数据被重复执行。这种重复处理不仅是对计算资源的巨大浪费——消耗了宝贵的GPU/CPU运算时间、增加了不必要的资源开销,也意味着宝贵的时间被无效占用。更重要的是,它会显著拖慢整个下游任务(例如关键的模型训练或数据评测阶段)的进度,形成一种雪球效应,使得整个数据处理链条的效率大幅降低,资源浪费和时间延误问题亟待解决。
针对以上问题,EMR在多模态数据预处理链路中研发了Checkpoint功能。Checkpoint功能是一种能够记录数据处理进度、保存已完成任务状态的机制。该机制借助对“已成功处理的数据”进行持久化存储,达成了“失败后恢复时跳过已完成工作,仅处理未完成部分”的核心能力,尤其在依赖GPU开展高成本多模态数据处理(如图像增强、视频帧提取、跨模态特征对齐等)的场景中具备重要意义。其核心价值体现为显著节约计算资源和时间成本,增强处理流程的可靠性与鲁棒性,提高开发与研究效率。
通过Ray Data输出时设置checkpoint相关配置,在数据处理过程中,已处理的数据将自动记录至指定的检查点位置。当任务失败并重启时,在读取(Read)阶段会自动过滤已处理的数据,仅未处理的数据将进入后续处理流程。
checkpoint_config = { "enabled": True, "num_buckets": 2, "key_columns": "id", "storage": { "type":"fs", "check_ready_timeout": 300, "fs_options": { "path": "tos://bucket_name/checkpointdata/", "key": "xxx", "secret": "yyy==", "endpoint_url": 'http://tos-cn-beijing.ivolces.com', "region": 'cn-beijing', "max_items_per_file": 10000, }, "remote_args": { "num_cpus":1, }, } } ds.write_parquet(dest_path, checkpoint_config=checkpoint_config)
支持的数据源:任意数据源。可以是文件如图片等,也可以是数据文件如json等,也可以是关系型数据库如sql等
支持的输出格式:支持csv, json, parquet等文件输出格式。
参数 | 说明 | 默认值 |
|---|---|---|
enabled | 是否启用checkpoint功能 | False |
num_buckets | 处理checkpoint的Actor数量,数据将按照分桶数(num_buckets)进行分桶处理,每个桶仅处理与之对应的一部分数据。在处理大规模数据时,每个Actor将分摊一部分数据进行处理。 | 1 |
key_columns | 数据中的唯一键的列名 | 无 |
storage.type | 目前固定为fs,代表fileSystem存储后端 | fs |
storage.check_ready_timeout | 当任务重新时,加载Checkpoint信息的超时时间,以秒为单位,float类型,根据处理的数据量设置。500万long型数据加载约1min, | 500 |
storage.fs_options.path | 存储检查点(checkpoint)信息的文件路径,可以是本地路径,也可以是对象存储服务(TOS)路径。当为本地路径时,需要确保所有节点能够访问到相同路径,例如挂载的为火山引擎并行文件系统(vePFS)。 | 无 |
storage.fs_options.max_items_per_file | 单个checkpoint文件保存的最大数据条数 | 100000 |
storage.fs_options下的其他配置 | storage.fs_options下的其他配置可以配置TosFileSystem相关的配置,支持key,secret,endpoint_url,region等 | 无 |
storage.remote_args.x | 处理checkpoint的Actor所使用的资源。设置参数需遵循Ray的Actor资源设置模式。x可以为num_cpus等。 |
数据量级(key_column的数据量) | 推荐参数配置 | 说明 |
|---|---|---|
512M | num_buckets = 4 | 当唯一键为 long 型时,可支撑的数据条数约为 1.7亿。对于其他类型,可按照单条数据大小进行计算。 |
1G | num_buckets = 8 | 支撑的数据条数计算公式同上 |
1G以上 | 同比例配置 |
处理以Row为粒度的多模态数据时,数据中的唯一键(如id)会被记录至检查点(checkpoint)中,当任务失败后再次执行时,会跳过已处理过的id。
import ray import pandas as pd ray.init() from tosfs.core import TosFileSystem fs = TosFileSystem( endpoint_url="https://tos-cn-beijing.ivolces.com", key='xxx', secret='yyy==', region="cn-beijing" ) ds = ray.data.read_parquet("tos://bucket_name/source_data/", filesystem=fs) dest_path = "tos://bucket_name/dest_data/" checkpoint_config = { "enabled": True, "num_buckets": 2, "key_columns": "id", "storage": { "type":"fs", "check_ready_timeout": 300, "fs_options": { "path": "tos://bucket_name/checkpointdata/", "key": "xxx", "secret": "yyy==", "endpoint_url": 'http://tos-cn-beijing.ivolces.com', "region": 'cn-beijing', "max_items_per_file": 10000, }, "remote_args": { "num_cpus":1, }, } } # 写入1234567行数据 ds.limit(1234567).write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs) # 写入剩下的数据 ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs) # 不会写入任务数据,因为前面已经全部处理完毕 ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)
处理以文件为粒度的多模态数据时,以文件路径作为唯一键,将处理结果进行记录,当任务失败再次执行时,会跳过已处理过的文件。
import ray import pandas as pd ray.init() from tosfs.core import TosFileSystem fs = TosFileSystem( endpoint_url="https://tos-cn-beijing.ivolces.com", key='xxx', secret='yyy==', region="cn-beijing" ) ds = ray.data.read_images("tos://bucket_name/image_data/", include_paths=True, filesystem=fs) dest_path = "tos://bucket_name/dest_data/" checkpoint_config = { "enabled": True, "num_buckets": 2, "key_columns": "path", "storage": { "type":"fs", "check_ready_timeout": 300, "fs_options": { "path": "tos://bucket_name/checkpointdata/", "key": "xxx", "secret": "yyy==", "endpoint_url": 'http://tos-cn-beijing.ivolces.com', "region": 'cn-beijing', "max_items_per_file": 10000, }, "remote_args": { "num_cpus":1, }, } } # 写入1234567行数据 ds.limit(1234567).write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs) # 写入剩下的数据 ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs) # 不会写入任务数据,因为前面已经全部处理完毕 ds.write_parquet(dest_path, checkpoint_config=checkpoint_config, filesystem=fs)