You need to enable JavaScript to run this app.
文档中心
E-MapReduce

E-MapReduce

复制全文
下载 pdf
RayData 使用
Ray Data 文件写出两阶段写入
复制全文
下载 pdf
Ray Data 文件写出两阶段写入

EMR Ray 在 Ray Data 文件类写出能力中新增 两阶段写入(Two-phase write) 选项。启用后,Ray Data 会先将数据写入目标路径下的临时目录,待写入全部完成后再将文件移动到最终目录,从而提升文件写出的可靠性。本文为您提供 两阶段写入 功能的功能逻辑和使用示例。

背景信息

在使用 Ray Data 进行离线数据处理时,常见输出方式是将结果写入对象存储或分布式文件系统(例如 S3/TOS、HDFS、并行文件系统等),作为后续计算或训练的输入。
在生产环境中,写出链路通常会面临以下问题:

  • 作业运行过程中可能发生失败、重试或中断(节点故障、资源回收、网络抖动等),导致目标目录中出现不完整文件写入到一半的文件,下游任务可能误读。
  • 当采用覆盖写(mode=SaveMode.OVERWRITE)时,传统逻辑往往是“先删除旧数据,再写入新数据”。一旦写入过程失败或耗时较长,会出现目标路径在一段时间内没有可用数据数据不完整的窗口期。

功能说明

EMR Ray 在 Ray Data 文件类写出能力中新增 两阶段写入(Two-phase write) 选项。启用后,Ray Data 会先将数据写入目标路径下的临时目录,待写入全部完成后再将文件移动到最终目录,从而提升文件写出的可靠性。
该能力以 Python API 参数 two_phase_write 的形式提供,适用于 Ray Data 的文件类写出接口,例如 Dataset.write_parquetDataset.write_csvDataset.write_jsonDataset.write_images 等。

写出流程

启用 two_phase_write=True 后,Ray Data 的写出流程如下:

  • 阶段一:写入临时目录。在目标目录下创建一个临时 staging 目录(形如 .tmp_staging_dir_<uuid>),所有输出文件先写入该临时目录。
  • 阶段二:提交到最终目录。写入完成后,将临时目录中的文件逐个移动到最终目标路径,并清理临时目录。

在覆盖写(mode=SaveMode.OVERWRITE)场景下,两阶段写入会将“删除旧数据”的动作延后到提交阶段执行,避免“先删后写”带来的数据空窗期,从而降低覆盖写过程对下游读取的影响。

说明

two_phase_write 提供的是单文件级别的写入原子性:每个文件在被移动到最终目录前不会出现在目标位置。提交阶段采用逐文件移动方式,若在提交过程中发生异常,目标目录可能出现“部分文件已提交、部分未提交”的状态。

参数说明

参数

说明

默认值

two_phase_write

是否启用两阶段写入协议。启用后,数据先写入临时目录,再移动到最终目录,以提升写出过程的可靠性,并在一定程度上改善覆盖写场景的可用性。

False

注意事项
  • two_phase_write 默认关闭,需要在写出接口中显式开启。
  • 启用后会在目标路径下创建临时目录(.tmp_staging_dir_<uuid>)。正常完成会自动清理;若作业异常中断,可能残留临时目录,可在确认无任务使用后手动清理。
  • 对部分远程文件系统,move 操作可能以“拷贝 + 删除”方式实现,会带来额外 IO 开销,整体写入耗时可能增加。建议在关键链路(覆盖写、下游强依赖写入一致性)中评估后启用。

使用示例

示例1:覆盖写使用示例

#!/usr/bin/env python3
"""Ray Data Two-Phase Write to TOS Example

Use Ray Data with tosfs to write data to TOS (Tinder Object Storage)
with two-phase write for atomicity.
"""

import ray
import ray.data as rd
from ray.data import SaveMode
from tosfs.core import TosFileSystem

# Initialize TOS filesystem
fs = TosFileSystem(
    endpoint_url="https://tos-cn-beijing.ivolces.com",
    key='your-access-key',        # Replace with your access key
    secret='your-secret-key',     # Replace with your secret key
    region="cn-beijing"
)

# Initialize Ray
ray.init()

# Create sample data
data = [
    {"id": 1, "name": "Alice", "score": 95.5},
    {"id": 2, "name": "Bob", "score": 87.0},
    {"id": 3, "name": "Charlie", "score": 92.5},
]
ds = rd.from_items(data)

# Write to TOS with two-phase write
tos_path = "tos://your-bucket/output/data"

# Write to CSV
ds.write_csv(tos_path, filesystem=fs, mode=SaveMode.OVERWRITE, two_phase_write=True)

# Write to Parquet
# ds.write_parquet(tos_path, filesystem=fs, two_phase_write=True)

# Read back and verify
read_ds = rd.read_csv(tos_path, filesystem=fs)
print(f"Total rows written to TOS: {read_ds.count()}")

ray.shutdown()

示例2:追加写使用示例

#!/usr/bin/env python3
"""Ray Data Two-Phase Write Example

Use two-phase write to ensure atomic data writing.
Data is first written to a temporary directory and then moved to the final location.
"""

import ray
import ray.data as rd

ray.init()

data = [
    {"id": 1, "name": "Alice", "score": 95.5},
    {"id": 2, "name": "Bob", "score": 87.0},
    {"id": 3, "name": "Charlie", "score": 92.5},
]
ds = rd.from_items(data)

ds.write_csv("/tmp/output/csv", two_phase_write=True)
ds.write_json("/tmp/output/json", two_phase_write=True)
ds.write_parquet("/tmp/output/parquet", two_phase_write=True)

print(f"CSV rows: {rd.read_csv('/tmp/output/csv').count()}")
print(f"JSON rows: {rd.read_json('/tmp/output/json').count()}")
print(f"Parquet rows: {rd.read_parquet('/tmp/output/parquet').count()}")

ray.shutdown()
最近更新时间:2026.04.17 15:32:56
这个页面对您有帮助吗?
有用
有用
无用
无用