You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Ray中基于write_parquet实现VLLM推理结果流式写入的方案咨询

Ray中基于write_parquet实现VLLM推理结果流式写入的方案咨询

你提的这个需求特别贴合大模型推理的实际场景——既要榨干硬件资源,又要避免内存爆仓,还能早点拿到部分结果,Ray完全支持这类流式处理的需求,我来给你拆解几个可行的方案:

一、优先用Ray Dataset内置的流式写入(最简单高效)

Ray Dataset的write_parquet本身就支持边处理边写入的流式模式,只要开启对应的参数,就能让推理完成的批次立刻写入磁盘,不用等全量任务结束。

调整后的代码示例:

ds = ray.data.read_parquet(
    my_input_path,
    dataset_options={"streaming": True}  # 读取阶段就开启流式,减少内存缓存
)
ds = ds.map_batches(
    VLLMPredictor,
    concurrency=ray_concurrency,
    compute=ray.data.ActorPoolStrategy(size=ray_concurrency),  # 复用VLLM Actor,减少初始化开销
    **resources_kwarg
)
ds.write_parquet(
    my_output_path,
    streaming=True,  # 核心参数:开启流式写入
    block_size="512MB",  # 每积累512MB结果就写入一个文件
    num_files=0  # 自动根据block_size拆分文件,避免单个超大文件
)

这个方案的好处是完全利用Ray的内置流水线,GPU处理完一批结果后,CPU会立刻接手写入操作,不会让CPU闲置;同时处理完的结果会及时落地磁盘,不会一直占着内存/对象存储,从根本上避免OOM。

二、手动控制分批写入(适合精细化需求)

如果需要更精确的控制(比如每处理N批就写入一次),可以用iter_batches遍历处理后的数据集,手动攒批写入:

代码示例:

ds = ray.data.read_parquet(
    my_input_path,
    dataset_options={"streaming": True}
)
processed_ds = ds.map_batches(
    VLLMPredictor,
    concurrency=ray_concurrency,
    compute=ray.data.ActorPoolStrategy(size=ray_concurrency),
    **resources_kwarg
)

batch_counter = 0
target_batch_count = 10  # 每处理10批就写入一次
temp_batches = []

# 遍历处理后的批次
for batch in processed_ds.iter_batches(batch_size=1024):  # batch_size是单批的行数
    temp_batches.append(batch)
    batch_counter += 1
    if batch_counter >= target_batch_count:
        # 将攒好的批次合并成Dataset写入磁盘
        ray.data.from_items(temp_batches).write_parquet(
            f"{my_output_path}/batch_{batch_counter//target_batch_count}",
            overwrite=True
        )
        temp_batches = []  # 清空临时列表,准备下一轮攒批

# 处理最后剩余的批次
if temp_batches:
    ray.data.from_items(temp_batches).write_parquet(
        f"{my_output_path}/batch_final",
        overwrite=True
    )

这种方式会生成多个子目录的Parquet文件,后续如果需要合并可以用ray.data.read_parquet(my_output_path)直接读取所有文件,不影响后续使用。

三、解决你日志里的对象存储积压问题

你提到日志里“Processed prompts”一直在更新,但对象存储进度没变化,这是因为默认情况下Ray Dataset会把所有处理后的结果缓存到对象存储里,直到全量处理完成才写入磁盘。开启streaming=True后,处理完的批次会立刻落地磁盘,释放对象存储空间,这时对象存储的使用率会动态变化,不会一直积压。

另外可以在初始化Ray时,根据节点配置调整object_store_memory的大小,确保对象存储有足够的缓冲空间,但不用设得过大——毕竟流式写入会及时释放空间。

总结

  • 优先用内置流式写入方案,代码改动最小,性能最优;
  • 如果需要精细化控制写入时机,再用手动分批写入
  • 开启流式读取和写入后,CPU会和GPU形成流水线,不会闲置,同时彻底避免大结果集的内存占用问题,还能早点看到部分推理结果。

备注:内容来源于stack exchange,提问作者cnmdestroyer

火山引擎 最新活动