You need to enable JavaScript to run this app.
文档中心
E-MapReduce

E-MapReduce

复制全文
下载 pdf
生态与集成
使用 Ray 操作 Lance 数据
复制全文
下载 pdf
使用 Ray 操作 Lance 数据

前期准备

该功能需安装火山引擎提供的Ray 2.46.0 及以上版本。

样例操作

说明

访问TOS的配置项,请参考文档使用Lance Python SDK访问TOS上的Lance数据

写入数据

新版本写入

说明

本操作适用于2.46.0及之后版本。

新版本中write_lance 方法已集成至 ray.data.Dataset 对象,成为其内置方法。

  • 旧实现:需从 lance.ray.sink 模块导入 LanceDatasink 类,并手动实例化后使用;
  • 新实现:无需额外导入或实例化操作,直接通过 ray.data.Dataset 对象调用即可。
import ray

# 初始化 Ray
ray.init()

# 创建 Ray Dataset
ds = ray.data.range(10).map(
    lambda x: {"id": x["id"], "str": f"str-{x['id']}"}
)

# 使用 write_lance 方法将数据写入 Lance 格式
ds.write_lance(
    path="s3://{PATH}",
    storage_options={
        "access_key_id": "{用户实际的AK}",
        "secret_access_key": "{用户实际的SK}",
        "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com",
        "virtual_hosted_style_request": "true"
    },
    mode='create',
    max_rows_per_file=8192
)

print("数据已成功写入 Lance 格式。")

Ray的Lance写入, 已经封装为了LanceDatasink, 可以通过构建LanceDatasink对象, 调度write_datasink完成写入
write_lance参数

参数

含义

样例

path

数据集的路径

对象地址使用"s3://{PATH}", 本地地址用"./db"

schema

数据集结构

可选参数, 默认为空, 传递pyarrow.Schema格式

mode

数据集模式

可选参数, 有三种: "create", "append", "overwrite", 默认为create, 表示创建数据集

max_rows_per_file

多少行写出一个lance文件

可选参数, 默认1M行. 如果发现lance文件过大, 可以调低该数值

storage_options

对象存储的配置

参考TOS访问配置

旧版本写入

说明

本操作适用于2.44.0及之前版本。

import ray
from lance.ray.sink import LanceDatasink

ray.init()

sink = LanceDatasink(
       uri="s3://{PATH}",
       storage_options={
         "access_key_id": "{用户实际的AK}",
         "secret_access_key": "{用户实际的SK}",
         "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com",
         "virtual_hosted_style_request": "true"
       }
)
ray.data.range(10).map(
    lambda x: {"id": x["id"], "str": f"str-{x['id']}"}
).write_datasink(sink)

使用Ray进行Lance数据写入,已经封装为LanceDatasink,可以通过构建LanceDatasink对象,调度write_datasink完成写入。
LanceDatasink参数说明

参数

含义

样例

uri

数据集的路径

对象地址使用"s3://{PATH}",本地地址用"./db"

schema(可选)

数据集结构

默认为空,传递pyarrow.Schema格式

mode(可选)

数据集模式

有三种: "create"、"append"、"overwrite",默认为create,表示创建数据集

max_rows_per_file(可选)

多少行写出一个lance文件

默认1M行。如果发现lance文件过大,可以调低该数值

storage_options

对象存储的配置

参考使用Lance Python SDK访问TOS上的Lance数据

读取数据

import ray
ds = ray.data.read_lance( 
    uri="s3://hzw/tpcds/backup/store_sales_10g.lance",
    storage_options={
             "access_key_id": "{用户实际的AK}",
             "secret_access_key": "{用户实际的SK}",
             "aws_region": "shanghai",
             "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com",
             "virtual_hosted_style_request": "true"
        },
    concurrency=1
)
ds.take(10)

参数说明

参数

含义

样例

uri

数据集的路径

对象地址使用"s3://{PATH}",本地地址用"./db"

columns

只读取lance数据中的某些列

默认是读取全部列

filter

过滤条件

filter谓词会下压到lance,减少扫描数据量

storage_options

对象存储的配置

参考使用Lance Python SDK访问TOS上的Lance数据

scanner_options

扫描的选项

可以在这里面设置batch_size

concurrency

并发度

读取lance数据的并发数

override_num_blocks

数据块的数量

控制数据源读取时输出数据块的数量。具体来说,它指定了在读取数据时生成的输出块的数量,这等同于创建的读取任务数

Parquet 格式转 Lance 样例

使用前需确认已安装 pyarrow 和 s3fs,便于使用 ray 读取 s3。如不确定,请参考 Ray 的使用文档
华北2为例:

  • 如在火山引擎的ECS上运行,Endpoint的地址请使用: https://bucket1.tos-s3-cn-beijing.ivolces.com。
  • 如在本地的机器上运行,使用公网地址: https://bucket1.tos-s3-cn-beijing.volces.com。

说明

其中ivolces代表内网访问, volces代表从公网访问。

import ray
import pyarrow.fs as fs
# 初始化 Ray
ray.init()
# 1. 配置 PyArrow FileSystem 用于读取数据
#    注意:这里的 endpoint 和配置用于读取操作
read_fs = fs.S3FileSystem(
    access_key='{用户实际的AK}',
    secret_key='{用户实际的SK}',
    region='cn-beijing',
    endpoint_override='https://{S3 Endpoint}.volces.com',
    force_virtual_addressing=True
)
# 2. 从 S3 读取 Parquet 数据
ds = ray.data.read_parquet("s3://bucket1/{parquet_PATH}/", filesystem=read_fs)

# 3. 使用 write_lance 直接写入数据
#    原 LanceDatasink 的参数被直接传入 write_lance 方法
ds.write_lance(
    path="s3://{PATH}",
    storage_options={
        "access_key_id": "{用户实际的AK}",
        "secret_access_key": "{用户实际的SK}",
        "aws_region": "cn-beijing",
        # 注意:这里的 endpoint 和配置用于写入操作
        "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.volces.com",
        "virtual_hosted_style_request": "true"
    }, 
     max_concurrent_tasks=10  # 'concurrency' 参数现在由 'max_concurrent_tasks' 替代
)

print("数据已成功转换为 Lance 格式并写入目标位置!")

注意

访问TOS需要传入Arrow的filesystem,fs.S3FileSystem的配置项语句和LanceDatasink的配置项语句如下有差异,endpoint_override中不可加bucket前缀,而lance的aws_endpoint中必须加bucket前缀。

最近更新时间:2025.12.09 15:29:37
这个页面对您有帮助吗?
有用
有用
无用
无用