该功能需安装火山引擎提供的Ray 2.36.0 版本。
说明
访问TOS的配置项,请参考文档使用Lance Python SDK访问TOS上的Lance数据。
import ray from lance.ray.sink import LanceDatasink ray.init() sink = LanceDatasink( uri="s3://{PATH}", storage_options={ "access_key_id": "{AK}", "secret_access_key": "{SK}", "aws_endpoint": "https://bucket1.tos-s3-cn-shanghai.ivolces.com", "virtual_hosted_style_request": "true" } ) ray.data.range(10).map( lambda x: {"id": x["id"], "str": f"str-{x['id']}"} ).write_datasink(sink)
使用Ray进行Lance数据写入,已经封装为LanceDatasink,可以通过构建LanceDatasink对象,调度write_datasink完成写入。
LanceDatasink参数说明
参数 | 含义 | 样例 |
---|---|---|
uri | 数据集的路径 | 对象地址使用"s3://{PATH}",本地地址用"./db" |
schema(可选) | 数据集结构 | 默认为空,传递pyarrow.Schema格式 |
mode(可选) | 数据集模式 | 有三种: "create"、"append"、"overwrite",默认为create,表示创建数据集 |
max_rows_per_file(可选) | 多少行写出一个lance文件 | 默认1M行。如果发现lance文件过大,可以调低该数值 |
storage_options | 对象存储的配置 |
import ray ds = ray.data.read_lance( uri="s3://hzw/tpcds/backup/store_sales_10g.lance", storage_options={ "access_key_id": "{AK}", "secret_access_key": "{SK}", "aws_region": "shanghai", "aws_endpoint": "https://bucket1.tos-s3-cn-shanghai.ivolces.com", "virtual_hosted_style_request": "true" }, concurrency=1 ) ds.take(10)
参数说明
参数 | 含义 | 样例 |
---|---|---|
uri | 数据集的路径 | 对象地址使用"s3://{PATH}",本地地址用"./db" |
columns | 只读取lance数据中的某些列 | 默认是读取全部列 |
filter | 过滤条件 | filter谓词会下压到lance,减少扫描数据量 |
storage_options | 对象存储的配置 | |
scanner_options | 扫描的选项,详情请参考官网文档 | 可以在这里面设置 |
concurrency | 并发度 | 读取lance数据的并发数 |
override_num_blocks | 数据块的数量 | 控制数据源读取时输出数据块的数量。具体来说,它指定了在读取数据时生成的输出块的数量,这等同于创建的读取任务数 |
import ray from lance.ray.sink import LanceDatasink import pyarrow.fs as fs ray.init() s3 = fs.S3FileSystem( access_key='{AK}', secret_key='{SK}', region='cn-beijing', endpoint_override='https://tos-s3-cn-beijing.volces.com', force_virtual_addressing=True ) ds = ray.data.read_parquet("s3://bucket1/{parquet_PATH}/", filesystem=s3) sink = LanceDatasink( uri="s3://{PATH}", storage_options={ "access_key_id": "{AK}", "secret_access_key": "{SK}", "aws_region": "cn-beijing", "aws_endpoint": "https://bucket1.tos-s3-cn-shanghai.ivolces.com", "virtual_hosted_style_request": "true" }, data_storage_version='stable' ) ds.write_datasink(sink, concurrency=10)
注意
访问TOS需要传入Arrow的filesystem,fs.S3FileSystem的配置项语句和LanceDatasink的配置项语句如下有差异,endpoint_override
中不可加bucket前缀,而lance的aws_endpoint
中必须加bucket前缀。