EMR Ray 在 Ray Data 文件类写出能力中新增 两阶段写入(Two-phase write) 选项。启用后,Ray Data 会先将数据写入目标路径下的临时目录,待写入全部完成后再将文件移动到最终目录,从而提升文件写出的可靠性。本文为您提供 两阶段写入 功能的功能逻辑和使用示例。
在使用 Ray Data 进行离线数据处理时,常见输出方式是将结果写入对象存储或分布式文件系统(例如 S3/TOS、HDFS、并行文件系统等),作为后续计算或训练的输入。
在生产环境中,写出链路通常会面临以下问题:
mode=SaveMode.OVERWRITE)时,传统逻辑往往是“先删除旧数据,再写入新数据”。一旦写入过程失败或耗时较长,会出现目标路径在一段时间内没有可用数据或数据不完整的窗口期。EMR Ray 在 Ray Data 文件类写出能力中新增 两阶段写入(Two-phase write) 选项。启用后,Ray Data 会先将数据写入目标路径下的临时目录,待写入全部完成后再将文件移动到最终目录,从而提升文件写出的可靠性。
该能力以 Python API 参数 two_phase_write 的形式提供,适用于 Ray Data 的文件类写出接口,例如 Dataset.write_parquet、Dataset.write_csv、Dataset.write_json、Dataset.write_images 等。
启用 two_phase_write=True 后,Ray Data 的写出流程如下:
.tmp_staging_dir_<uuid>),所有输出文件先写入该临时目录。在覆盖写(mode=SaveMode.OVERWRITE)场景下,两阶段写入会将“删除旧数据”的动作延后到提交阶段执行,避免“先删后写”带来的数据空窗期,从而降低覆盖写过程对下游读取的影响。
说明
two_phase_write 提供的是单文件级别的写入原子性:每个文件在被移动到最终目录前不会出现在目标位置。提交阶段采用逐文件移动方式,若在提交过程中发生异常,目标目录可能出现“部分文件已提交、部分未提交”的状态。
参数 | 说明 | 默认值 |
|---|---|---|
| 是否启用两阶段写入协议。启用后,数据先写入临时目录,再移动到最终目录,以提升写出过程的可靠性,并在一定程度上改善覆盖写场景的可用性。 |
|
two_phase_write 默认关闭,需要在写出接口中显式开启。.tmp_staging_dir_<uuid>)。正常完成会自动清理;若作业异常中断,可能残留临时目录,可在确认无任务使用后手动清理。move 操作可能以“拷贝 + 删除”方式实现,会带来额外 IO 开销,整体写入耗时可能增加。建议在关键链路(覆盖写、下游强依赖写入一致性)中评估后启用。#!/usr/bin/env python3 """Ray Data Two-Phase Write to TOS Example Use Ray Data with tosfs to write data to TOS (Tinder Object Storage) with two-phase write for atomicity. """ import ray import ray.data as rd from ray.data import SaveMode from tosfs.core import TosFileSystem # Initialize TOS filesystem fs = TosFileSystem( endpoint_url="https://tos-cn-beijing.ivolces.com", key='your-access-key', # Replace with your access key secret='your-secret-key', # Replace with your secret key region="cn-beijing" ) # Initialize Ray ray.init() # Create sample data data = [ {"id": 1, "name": "Alice", "score": 95.5}, {"id": 2, "name": "Bob", "score": 87.0}, {"id": 3, "name": "Charlie", "score": 92.5}, ] ds = rd.from_items(data) # Write to TOS with two-phase write tos_path = "tos://your-bucket/output/data" # Write to CSV ds.write_csv(tos_path, filesystem=fs, mode=SaveMode.OVERWRITE, two_phase_write=True) # Write to Parquet # ds.write_parquet(tos_path, filesystem=fs, two_phase_write=True) # Read back and verify read_ds = rd.read_csv(tos_path, filesystem=fs) print(f"Total rows written to TOS: {read_ds.count()}") ray.shutdown()
#!/usr/bin/env python3 """Ray Data Two-Phase Write Example Use two-phase write to ensure atomic data writing. Data is first written to a temporary directory and then moved to the final location. """ import ray import ray.data as rd ray.init() data = [ {"id": 1, "name": "Alice", "score": 95.5}, {"id": 2, "name": "Bob", "score": 87.0}, {"id": 3, "name": "Charlie", "score": 92.5}, ] ds = rd.from_items(data) ds.write_csv("/tmp/output/csv", two_phase_write=True) ds.write_json("/tmp/output/json", two_phase_write=True) ds.write_parquet("/tmp/output/parquet", two_phase_write=True) print(f"CSV rows: {rd.read_csv('/tmp/output/csv').count()}") print(f"JSON rows: {rd.read_json('/tmp/output/json').count()}") print(f"Parquet rows: {rd.read_parquet('/tmp/output/parquet').count()}") ray.shutdown()