Ray中基于write_parquet实现VLLM推理结果流式写入的方案咨询
Ray中基于write_parquet实现VLLM推理结果流式写入的方案咨询
你提的这个需求特别贴合大模型推理的实际场景——既要榨干硬件资源,又要避免内存爆仓,还能早点拿到部分结果,Ray完全支持这类流式处理的需求,我来给你拆解几个可行的方案:
一、优先用Ray Dataset内置的流式写入(最简单高效)
Ray Dataset的write_parquet本身就支持边处理边写入的流式模式,只要开启对应的参数,就能让推理完成的批次立刻写入磁盘,不用等全量任务结束。
调整后的代码示例:
ds = ray.data.read_parquet( my_input_path, dataset_options={"streaming": True} # 读取阶段就开启流式,减少内存缓存 ) ds = ds.map_batches( VLLMPredictor, concurrency=ray_concurrency, compute=ray.data.ActorPoolStrategy(size=ray_concurrency), # 复用VLLM Actor,减少初始化开销 **resources_kwarg ) ds.write_parquet( my_output_path, streaming=True, # 核心参数:开启流式写入 block_size="512MB", # 每积累512MB结果就写入一个文件 num_files=0 # 自动根据block_size拆分文件,避免单个超大文件 )
这个方案的好处是完全利用Ray的内置流水线,GPU处理完一批结果后,CPU会立刻接手写入操作,不会让CPU闲置;同时处理完的结果会及时落地磁盘,不会一直占着内存/对象存储,从根本上避免OOM。
二、手动控制分批写入(适合精细化需求)
如果需要更精确的控制(比如每处理N批就写入一次),可以用iter_batches遍历处理后的数据集,手动攒批写入:
代码示例:
ds = ray.data.read_parquet( my_input_path, dataset_options={"streaming": True} ) processed_ds = ds.map_batches( VLLMPredictor, concurrency=ray_concurrency, compute=ray.data.ActorPoolStrategy(size=ray_concurrency), **resources_kwarg ) batch_counter = 0 target_batch_count = 10 # 每处理10批就写入一次 temp_batches = [] # 遍历处理后的批次 for batch in processed_ds.iter_batches(batch_size=1024): # batch_size是单批的行数 temp_batches.append(batch) batch_counter += 1 if batch_counter >= target_batch_count: # 将攒好的批次合并成Dataset写入磁盘 ray.data.from_items(temp_batches).write_parquet( f"{my_output_path}/batch_{batch_counter//target_batch_count}", overwrite=True ) temp_batches = [] # 清空临时列表,准备下一轮攒批 # 处理最后剩余的批次 if temp_batches: ray.data.from_items(temp_batches).write_parquet( f"{my_output_path}/batch_final", overwrite=True )
这种方式会生成多个子目录的Parquet文件,后续如果需要合并可以用ray.data.read_parquet(my_output_path)直接读取所有文件,不影响后续使用。
三、解决你日志里的对象存储积压问题
你提到日志里“Processed prompts”一直在更新,但对象存储进度没变化,这是因为默认情况下Ray Dataset会把所有处理后的结果缓存到对象存储里,直到全量处理完成才写入磁盘。开启streaming=True后,处理完的批次会立刻落地磁盘,释放对象存储空间,这时对象存储的使用率会动态变化,不会一直积压。
另外可以在初始化Ray时,根据节点配置调整object_store_memory的大小,确保对象存储有足够的缓冲空间,但不用设得过大——毕竟流式写入会及时释放空间。
总结
- 优先用内置流式写入方案,代码改动最小,性能最优;
- 如果需要精细化控制写入时机,再用手动分批写入;
- 开启流式读取和写入后,CPU会和GPU形成流水线,不会闲置,同时彻底避免大结果集的内存占用问题,还能早点看到部分推理结果。
备注:内容来源于stack exchange,提问作者cnmdestroyer




