You need to enable JavaScript to run this app.
导航
使用 Ray 操作 Lance 数据
最近更新时间:2025.06.12 16:52:57首次发布时间:2024.11.14 14:20:40
我的收藏
有用
有用
无用
无用

前期准备

该功能需安装火山引擎提供的Ray 2.36.0 版本。

样例操作

说明

访问TOS的配置项,请参考文档使用Lance Python SDK访问TOS上的Lance数据

写入数据

import ray
from lance.ray.sink import LanceDatasink

ray.init()

sink = LanceDatasink(
       uri="s3://{PATH}",
       storage_options={
         "access_key_id": "{AK}",
         "secret_access_key": "{SK}",
         "aws_endpoint": "https://bucket1.tos-s3-cn-shanghai.ivolces.com",
         "virtual_hosted_style_request": "true"
       }
)
ray.data.range(10).map(
    lambda x: {"id": x["id"], "str": f"str-{x['id']}"}
).write_datasink(sink)

使用Ray进行Lance数据写入,已经封装为LanceDatasink,可以通过构建LanceDatasink对象,调度write_datasink完成写入。
LanceDatasink参数说明

参数

含义

样例

uri

数据集的路径

对象地址使用"s3://{PATH}",本地地址用"./db"

schema(可选)

数据集结构

默认为空,传递pyarrow.Schema格式

mode(可选)

数据集模式

有三种: "create"、"append"、"overwrite",默认为create,表示创建数据集

max_rows_per_file(可选)

多少行写出一个lance文件

默认1M行。如果发现lance文件过大,可以调低该数值

storage_options

对象存储的配置

参考使用Lance Python SDK访问TOS上的Lance数据

读取数据

import ray
ds = ray.data.read_lance( 
    uri="s3://hzw/tpcds/backup/store_sales_10g.lance",
    storage_options={
             "access_key_id": "{AK}",
             "secret_access_key": "{SK}",
             "aws_region": "shanghai",
             "aws_endpoint": "https://bucket1.tos-s3-cn-shanghai.ivolces.com",
             "virtual_hosted_style_request": "true"
        },
    concurrency=1
)
ds.take(10)

参数说明

参数

含义

样例

uri

数据集的路径

对象地址使用"s3://{PATH}",本地地址用"./db"

columns

只读取lance数据中的某些列

默认是读取全部列

filter

过滤条件

filter谓词会下压到lance,减少扫描数据量

storage_options

对象存储的配置

参考使用Lance Python SDK访问TOS上的Lance数据

scanner_options

扫描的选项,详情请参考官网文档

可以在这里面设置batch_size

concurrency

并发度

读取lance数据的并发数

override_num_blocks

数据块的数量

控制数据源读取时输出数据块的数量。具体来说,它指定了在读取数据时生成的输出块的数量,这等同于创建的读取任务数

Parquet格式转Lance样例

import ray
from lance.ray.sink import LanceDatasink
import pyarrow.fs as fs
ray.init()
s3 = fs.S3FileSystem(
    access_key='{AK}',
    secret_key='{SK}',
    region='cn-beijing',
    endpoint_override='https://tos-s3-cn-beijing.volces.com',
    force_virtual_addressing=True
)

ds = ray.data.read_parquet("s3://bucket1/{parquet_PATH}/", filesystem=s3)

sink = LanceDatasink(    
    uri="s3://{PATH}",
    storage_options={
        "access_key_id": "{AK}",
        "secret_access_key": "{SK}",
        "aws_region": "cn-beijing",
        "aws_endpoint": "https://bucket1.tos-s3-cn-shanghai.ivolces.com",
        "virtual_hosted_style_request": "true"
    }, 
    data_storage_version='stable'
)
ds.write_datasink(sink, concurrency=10)

注意

访问TOS需要传入Arrow的filesystem,fs.S3FileSystem的配置项语句和LanceDatasink的配置项语句如下有差异,endpoint_override中不可加bucket前缀,而lance的aws_endpoint中必须加bucket前缀。