该功能需安装火山引擎提供的Ray 2.46.0 及以上版本。
说明
访问TOS的配置项,请参考文档使用Lance Python SDK访问TOS上的Lance数据。
说明
本操作适用于2.46.0及之后版本。
新版本中write_lance 方法已集成至 ray.data.Dataset 对象,成为其内置方法。
lance.ray.sink 模块导入 LanceDatasink 类,并手动实例化后使用;ray.data.Dataset 对象调用即可。import ray # 初始化 Ray ray.init() # 创建 Ray Dataset ds = ray.data.range(10).map( lambda x: {"id": x["id"], "str": f"str-{x['id']}"} ) # 使用 write_lance 方法将数据写入 Lance 格式 ds.write_lance( path="s3://{PATH}", storage_options={ "access_key_id": "{用户实际的AK}", "secret_access_key": "{用户实际的SK}", "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com", "virtual_hosted_style_request": "true" }, mode='create', max_rows_per_file=8192 ) print("数据已成功写入 Lance 格式。")
Ray的Lance写入, 已经封装为了LanceDatasink, 可以通过构建LanceDatasink对象, 调度write_datasink完成写入
write_lance参数
参数 | 含义 | 样例 |
|---|---|---|
path | 数据集的路径 | 对象地址使用"s3://{PATH}", 本地地址用"./db" |
schema | 数据集结构 | 可选参数, 默认为空, 传递pyarrow.Schema格式 |
mode | 数据集模式 | 可选参数, 有三种: "create", "append", "overwrite", 默认为create, 表示创建数据集 |
max_rows_per_file | 多少行写出一个lance文件 | 可选参数, 默认1M行. 如果发现lance文件过大, 可以调低该数值 |
storage_options | 对象存储的配置 | 参考TOS访问配置 |
说明
本操作适用于2.44.0及之前版本。
import ray from lance.ray.sink import LanceDatasink ray.init() sink = LanceDatasink( uri="s3://{PATH}", storage_options={ "access_key_id": "{用户实际的AK}", "secret_access_key": "{用户实际的SK}", "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com", "virtual_hosted_style_request": "true" } ) ray.data.range(10).map( lambda x: {"id": x["id"], "str": f"str-{x['id']}"} ).write_datasink(sink)
使用Ray进行Lance数据写入,已经封装为LanceDatasink,可以通过构建LanceDatasink对象,调度write_datasink完成写入。
LanceDatasink参数说明
参数 | 含义 | 样例 |
|---|---|---|
uri | 数据集的路径 | 对象地址使用"s3://{PATH}",本地地址用"./db" |
schema(可选) | 数据集结构 | 默认为空,传递pyarrow.Schema格式 |
mode(可选) | 数据集模式 | 有三种: "create"、"append"、"overwrite",默认为create,表示创建数据集 |
max_rows_per_file(可选) | 多少行写出一个lance文件 | 默认1M行。如果发现lance文件过大,可以调低该数值 |
storage_options | 对象存储的配置 |
import ray ds = ray.data.read_lance( uri="s3://hzw/tpcds/backup/store_sales_10g.lance", storage_options={ "access_key_id": "{用户实际的AK}", "secret_access_key": "{用户实际的SK}", "aws_region": "shanghai", "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.ivolces.com", "virtual_hosted_style_request": "true" }, concurrency=1 ) ds.take(10)
参数说明
参数 | 含义 | 样例 |
|---|---|---|
uri | 数据集的路径 | 对象地址使用"s3://{PATH}",本地地址用"./db" |
columns | 只读取lance数据中的某些列 | 默认是读取全部列 |
filter | 过滤条件 | filter谓词会下压到lance,减少扫描数据量 |
storage_options | 对象存储的配置 | |
scanner_options | 扫描的选项,详情请参考官网文档 | 可以在这里面设置 |
concurrency | 并发度 | 读取lance数据的并发数 |
override_num_blocks | 数据块的数量 | 控制数据源读取时输出数据块的数量。具体来说,它指定了在读取数据时生成的输出块的数量,这等同于创建的读取任务数 |
使用前需确认已安装 pyarrow 和 s3fs,便于使用 ray 读取 s3。如不确定,请参考 Ray 的使用文档。
以华北2为例:
说明
其中ivolces代表内网访问, volces代表从公网访问。
import ray import pyarrow.fs as fs # 初始化 Ray ray.init() # 1. 配置 PyArrow FileSystem 用于读取数据 # 注意:这里的 endpoint 和配置用于读取操作 read_fs = fs.S3FileSystem( access_key='{用户实际的AK}', secret_key='{用户实际的SK}', region='cn-beijing', endpoint_override='https://{S3 Endpoint}.volces.com', force_virtual_addressing=True ) # 2. 从 S3 读取 Parquet 数据 ds = ray.data.read_parquet("s3://bucket1/{parquet_PATH}/", filesystem=read_fs) # 3. 使用 write_lance 直接写入数据 # 原 LanceDatasink 的参数被直接传入 write_lance 方法 ds.write_lance( path="s3://{PATH}", storage_options={ "access_key_id": "{用户实际的AK}", "secret_access_key": "{用户实际的SK}", "aws_region": "cn-beijing", # 注意:这里的 endpoint 和配置用于写入操作 "aws_endpoint": "https://{bucket name}.{S3 Endpoint}.volces.com", "virtual_hosted_style_request": "true" }, max_concurrent_tasks=10 # 'concurrency' 参数现在由 'max_concurrent_tasks' 替代 ) print("数据已成功转换为 Lance 格式并写入目标位置!")
注意
访问TOS需要传入Arrow的filesystem,fs.S3FileSystem的配置项语句和LanceDatasink的配置项语句如下有差异,endpoint_override中不可加bucket前缀,而lance的aws_endpoint中必须加bucket前缀。