You need to enable JavaScript to run this app.
导航
使用 Ray 操作 Lance 数据
最近更新时间:2025.11.13 20:17:43首次发布时间:2024.11.14 14:20:40
复制全文
我的收藏
有用
有用
无用
无用

前期准备

该功能需安装火山引擎提供的Ray 2.46.0 及以上版本。

样例操作

说明

访问TOS的配置项,请参考文档使用Lance Python SDK访问TOS上的Lance数据

写入数据

新版本写入

说明

本操作适用于2.46.0及之后版本。

新版本中write_lance 方法已集成至 ray.data.Dataset 对象,成为其内置方法。

  • 旧实现:需从 lance.ray.sink 模块导入 LanceDatasink 类,并手动实例化后使用;
  • 新实现:无需额外导入或实例化操作,直接通过 ray.data.Dataset 对象调用即可。
import ray

# 初始化 Ray
ray.init()

# 创建 Ray Dataset
ds = ray.data.range(10).map(
    lambda x: {"id": x["id"], "str": f"str-{x['id']}"}
)

# 使用 write_lance 方法将数据写入 Lance 格式
ds.write_lance(
    path="s3://{PATH}",
    storage_options={
        "access_key_id": "{用户实际的AK}",
        "secret_access_key": "{用户实际的SK}",
        "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com",
        "virtual_hosted_style_request": "true"
    },
    mode='create',
    max_rows_per_file=8192
)

print("数据已成功写入 Lance 格式。")

Ray的Lance写入, 已经封装为了LanceDatasink, 可以通过构建LanceDatasink对象, 调度write_datasink完成写入
write_lance参数

参数

含义

样例

path

数据集的路径

对象地址使用"s3://{PATH}", 本地地址用"./db"

schema

数据集结构

可选参数, 默认为空, 传递pyarrow.Schema格式

mode

数据集模式

可选参数, 有三种: "create", "append", "overwrite", 默认为create, 表示创建数据集

max_rows_per_file

多少行写出一个lance文件

可选参数, 默认1M行. 如果发现lance文件过大, 可以调低该数值

storage_options

对象存储的配置

参考TOS访问配置

旧版本写入

说明

本操作适用于2.44.0及之前版本。

import ray
from lance.ray.sink import LanceDatasink

ray.init()

sink = LanceDatasink(
       uri="s3://{PATH}",
       storage_options={
         "access_key_id": "{用户实际的AK}",
         "secret_access_key": "{用户实际的SK}",
         "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com",
         "virtual_hosted_style_request": "true"
       }
)
ray.data.range(10).map(
    lambda x: {"id": x["id"], "str": f"str-{x['id']}"}
).write_datasink(sink)

使用Ray进行Lance数据写入,已经封装为LanceDatasink,可以通过构建LanceDatasink对象,调度write_datasink完成写入。
LanceDatasink参数说明

参数

含义

样例

uri

数据集的路径

对象地址使用"s3://{PATH}",本地地址用"./db"

schema(可选)

数据集结构

默认为空,传递pyarrow.Schema格式

mode(可选)

数据集模式

有三种: "create"、"append"、"overwrite",默认为create,表示创建数据集

max_rows_per_file(可选)

多少行写出一个lance文件

默认1M行。如果发现lance文件过大,可以调低该数值

storage_options

对象存储的配置

参考使用Lance Python SDK访问TOS上的Lance数据

读取数据

import ray
ds = ray.data.read_lance( 
    uri="s3://hzw/tpcds/backup/store_sales_10g.lance",
    storage_options={
             "access_key_id": "{用户实际的AK}",
             "secret_access_key": "{用户实际的SK}",
             "aws_region": "shanghai",
             "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com",
             "virtual_hosted_style_request": "true"
        },
    concurrency=1
)
ds.take(10)

参数说明

参数

含义

样例

uri

数据集的路径

对象地址使用"s3://{PATH}",本地地址用"./db"

columns

只读取lance数据中的某些列

默认是读取全部列

filter

过滤条件

filter谓词会下压到lance,减少扫描数据量

storage_options

对象存储的配置

参考使用Lance Python SDK访问TOS上的Lance数据

scanner_options

扫描的选项,详情请参考官网文档

可以在这里面设置batch_size

concurrency

并发度

读取lance数据的并发数

override_num_blocks

数据块的数量

控制数据源读取时输出数据块的数量。具体来说,它指定了在读取数据时生成的输出块的数量,这等同于创建的读取任务数

Parquet 格式转 Lance 样例

使用前需确认已安装 pyarrow 和 s3fs,便于使用 ray 读取 s3。如不确定,请参考 Ray 的使用文档
华北2为例:

  • 如在火山引擎的ECS上运行,Endpoint的地址请使用: https://bucket1.tos-s3-cn-beijing.ivolces.com。
  • 如在本地的机器上运行,使用公网地址: https://bucket1.tos-s3-cn-beijing.volces.com。

说明

其中ivolces代表内网访问, volces代表从公网访问。

import ray
import pyarrow.fs as fs
# 初始化 Ray
ray.init()
# 1. 配置 PyArrow FileSystem 用于读取数据
#    注意:这里的 endpoint 和配置用于读取操作
read_fs = fs.S3FileSystem(
    access_key='{用户实际的AK}',
    secret_key='{用户实际的SK}',
    region='cn-beijing',
    endpoint_override='https://{S3 Endpoint}.volces.com',
    force_virtual_addressing=True
)
# 2. 从 S3 读取 Parquet 数据
ds = ray.data.read_parquet("s3://bucket1/{parquet_PATH}/", filesystem=read_fs)

# 3. 使用 write_lance 直接写入数据
#    原 LanceDatasink 的参数被直接传入 write_lance 方法
ds.write_lance(
    path="s3://{PATH}",
    storage_options={
        "access_key_id": "{用户实际的AK}",
        "secret_access_key": "{用户实际的SK}",
        "aws_region": "cn-beijing",
        # 注意:这里的 endpoint 和配置用于写入操作
        "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.volces.com",
        "virtual_hosted_style_request": "true"
    }, 
     max_concurrent_tasks=10  # 'concurrency' 参数现在由 'max_concurrent_tasks' 替代
)

print("数据已成功转换为 Lance 格式并写入目标位置!")

注意

访问TOS需要传入Arrow的filesystem,fs.S3FileSystem的配置项语句和LanceDatasink的配置项语句如下有差异,endpoint_override中不可加bucket前缀,而lance的aws_endpoint中必须加bucket前缀。