The LAS platform provides dataset hosting. Users can host their own data on the platform and use the tools it provides to preview and analyze that data.
LAS dataset hosting uses a semi-managed model: users do not need to upload data to the LAS platform, and instead keep and maintain the data in their own TOS/vePFS storage. This model keeps the data secure and removes concerns about data leakage.
In addition, datasets hosted on LAS can be processed at scale directly with the tools the platform provides, meeting users' data computation needs.
Using the tools the LAS platform provides, such as ve-daft, development machines, and workflows, users can build a complete data processing pipeline; see LAS Job Development for details. This document focuses on how to read and write LAS datasets with ve-daft.
LAS datasets currently support the following types:
Note
For the table type, LAS recommends Lance as the data storage format; it offers advantages in many respects.
For more on the Lance format, see the Lance documentation.
The multimodal Folder format is described briefly below.
Taking a Folder as an example, its directory structure looks like this:
```
my_data/
    metadata.jsonl   # optional
    sub_folder/
        pic3.png
        pic4.png
        ...
    pic1.png
    pic2.png
    audio1.mp3
    audio2.mp3
    ...
```
When it is read as an Image Folder, the resulting DataFrame has the following columns:
- bytes: the file content read as binary data, if you choose to read file contents. This column is absent by default.
- base64: the file content read as a base64 string, if you choose to read file contents. This column is absent by default.
- path: the path of the file
- size: the file size
- rows: the number of rows of the file, if it is a Parquet file
Writing a Folder is split into two stages: writing the content files (images/audio/video) themselves, and writing the metadata file.
Note
daft does not yet support saving in-memory data as image, audio, or video files; that is, daft only supports writing the metadata.
If one or more columns of your data hold image, audio, or video content and you want to save them as files and then store them in a LAS dataset, you can do so with a UDF:
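Below is a minimal sketch, not the platform's implementation: it assumes a DataFrame `df` with a hypothetical column `image_bytes` holding raw image bytes, and the `save_bytes` helper and file-naming scheme are placeholders you would replace with your own storage logic (e.g. writing to TOS/vePFS).

```python
import os
import uuid

import daft
from daft import DataType


# Hypothetical helper for illustration: writes raw bytes to a local path.
# In practice, replace this with your own TOS/vePFS storage client.
def save_bytes(data: bytes, relative_path: str) -> str:
    os.makedirs(os.path.dirname(relative_path), exist_ok=True)
    with open(relative_path, "wb") as f:
        f.write(data)
    return relative_path


@daft.udf(return_dtype=DataType.string())
def dump_image_files(image_bytes: daft.Series) -> list[str]:
    # Write every image out as a file and return the relative paths.
    paths = []
    for data in image_bytes.to_pylist():
        paths.append(save_bytes(data, f"images/{uuid.uuid4().hex}.png"))  # assumed naming scheme
    return paths


# Replace the content column with the written file paths, then write only the metadata.
df = df.with_column("file_name", dump_image_files(df["image_bytes"]))
df = df.exclude("image_bytes")
```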
Caution
When writing metadata, pay particular attention to the size of the metadata: it must not exceed 80% of the driver's available memory, otherwise the driver node will OOM, because the data must first be collected on the driver node before it can be saved as the metadata file on TOS. If the DataFrame contains content columns such as Image/Audio/Video, drop those columns before writing.
The signature of the LAS read-dataset API is as follows:
```python
def read_las_dataset(name: str, read_options: ReadOptions | None = None) -> DataFrame:
    """Create a DataFrame from a LAS dataset.

    Args:
        name: Name of the dataset.
        read_options: The format-specific read options. These options will be
            passed to the corresponding read functions.

    Returns:
        DataFrame: a DataFrame with the schema converted from the dataset.

        For an audio/image/video folder, if there is a metadata.csv/metadata.jsonl/metadata.parquet,
        this will return the content of the metadata file as a dataframe; otherwise, this will
        return a dataframe whose content is the file list, with the following schema:
        1. bytes/base64: the content of the file, bytes or base64 according to the reading
           options; if only the url is read, this column does not exist.
        2. path: the path to the file/directory
        3. size: size of the object in bytes
        4. rows: number of rows if the file is parquet
    """
```
ReadOptions holds the read configuration for each format and differs by data type:
```python
@dataclass
class CsvReadOptions(ReadOptions):
    infer_schema: bool = True
    schema: dict[str, DataType] | None = None
    has_headers: bool = True
    delimiter: str | None = None
    double_quote: bool = True
    quote: str | None = None
    escape_char: str | None = None
    comment: str | None = None
    allow_variable_columns: bool = False
    file_path_column: str | None = None
    hive_partitioning: bool = False
    schema_hints: dict[str, DataType] | None = None
    _buffer_size: int | None = None
    _chunk_size: int | None = None
```
```python
@dataclass
class JsonReadOptions(ReadOptions):
    infer_schema: bool = True
    schema: dict[str, DataType] | None = None
    file_path_column: str | None = None
    hive_partitioning: bool = False
    schema_hints: dict[str, DataType] | None = None
    _buffer_size: int | None = None
    _chunk_size: int | None = None
```
```python
@dataclass
class ParquetReadOptions(ReadOptions):
    row_groups: list[list[int]] | None = None
    infer_schema: bool = True
    schema: dict[str, DataType] | None = None
    file_path_column: str | None = None
    hive_partitioning: bool = False
    coerce_int96_timestamp_unit: str | TimeUnit | None = None
    schema_hints: dict[str, DataType] | None = None
    _multithreaded_io: bool | None = None
    _chunk_size: int | None = None
```
```python
@dataclass
class LanceReadOptions(ReadOptions):
    version: str | int | None = None
    asof: str | None = None
    block_size: int | None = None
    commit_lock: object | None = None
    index_cache_size: int | None = None
    default_scan_options: dict[str, str] | None = None
    metadata_cache_size_bytes: int | None = None
```
```python
@dataclass
class FolderReadOptions(ReadOptions):
    metadata_options: ReadOptions = field(
        default_factory=lambda: ReadOptions(io_config=IOConfig(s3=TOSConfig.from_env().to_s3_config()))
    )
    file_types: list[str] = field(default_factory=list)
    read_type: Literal["url", "binary", "base64", "ndarray"] = "url"


@dataclass
class ImageFolderReadOptions(FolderReadOptions):
    file_types: list[str] = field(
        default_factory=lambda: [".blp", ".dib", ".bmp", ".bufr", ".cur", ".pcx", ".dcx", ".dds",
                                 ".ps", ".eps", ".fit", ".fits", ".fli", ".flc", ".ftc", ".ftu",
                                 ".gbr", ".gif", ".grib", ".apng", ".jp2", ".j2k", ".jpc", ".jpf",
                                 ".jpx", ".j2c", ".icns", ".ico", ".im", ".iim", ".tif", ".tiff",
                                 ".jfif", ".jpe", ".jpg", ".jpeg", ".mpg", ".mpeg", ".msp", ".pcd",
                                 ".pxr", ".pbm", ".pgm", ".ppm", ".pnm", ".psd", ".bw", ".rgb",
                                 ".rgba", ".sgi", ".ras", ".tga", ".icb", ".vda", ".vst", ".webp",
                                 ".wmf", ".emf", ".xbm", ".xpm"]
    )


@dataclass
class AudioFolderReadOptions(FolderReadOptions):
    file_types: list[str] = field(
        default_factory=lambda: [".aiff", ".au", ".avr", ".caf", ".flac", ".htk", ".svx", ".mat4",
                                 ".mat5", ".mpc2k", ".m4a", ".ogg", ".paf", ".pvf", ".raw", ".rf64",
                                 ".sd2", ".sds", ".ircam", ".voc", ".w64", ".wav", ".nist", ".wavex",
                                 ".wve", ".xi", ".mp3", ".opus"]
    )


@dataclass
class VideoFolderReadOptions(FolderReadOptions):
    file_types: list[str] = field(
        default_factory=lambda: [".mkv", ".mp4", ".avi", ".mpeg", ".mov"]
    )
```
Caution
The ReadOptions for CSV, JSON, Parquet, and Lance correspond to the parameters of daft.read_csv, daft.read_json, daft.read_parquet, and daft.read_lance, respectively; see the daft API documentation for their meaning.
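For example, here is a hedged sketch of passing a read_parquet-style parameter through ParquetReadOptions; the import path of ParquetReadOptions (mirroring CsvReadOptions), the dataset name, and the column name are assumptions for illustration only.

```python
import daft
from daft import DataType
from daft.io import IOConfig, ParquetReadOptions  # import path assumed, mirroring CsvReadOptions
from daft.las.io import TOSConfig

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())

# schema_hints here maps to the schema_hints parameter of daft.read_parquet.
read_options = ParquetReadOptions(
    io_config=io_config,
    schema_hints={"size": DataType.int64()},  # "size" is an illustrative column name
)
df = daft.read_las_dataset(name="my_parquet_dataset", read_options=read_options)
```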
Export the required environment variables:
```bash
export ACCESS_KEY=xxx
export SECRET_KEY=xxx
export TOS_ENDPOINT=https://tos-cn-beijing.volces.com  # on the private network, change volces to ivolces
export TOS_REGION=cn-beijing
```
```python
import daft
import ray
from daft.io import CsvReadOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # if running in a distributed Ray environment

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
dataset_name = "my_dataset"
read_options = CsvReadOptions(io_config=io_config)
df = daft.read_las_dataset(name=dataset_name, read_options=read_options)
df.show()

# Output
╭─────────────────┬───────╮
│ file_name       ┆ size  │
│ ---             ┆ ---   │
│ Utf8            ┆ Int64 │
╞═════════════════╪═══════╡
│ subfolder/1.png ┆ 234   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2.png           ┆ 256   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3.jpg           ┆ 300   │
╰─────────────────┴───────╯

(Showing first 3 of 3 rows)
```
Suppose the user's directory structure is as follows:
```
image_dataset
    /subfolder/1.png
    /2.png
    /3.jpg
    /4.txt            # note that this is a non-image file
    /metadata.jsonl
```
The content of metadata.jsonl is as follows:
{"file_name":"subfolder/1.png","size":1705} {"file_name":"2.png","size":1899} {"file_name":"3.jgp","size":929}
Example code:
```python
import daft
import ray
from daft.io._las_dataset import FolderReadOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # if running in a distributed Ray environment

dataset_name = "my_dataset"
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
read_options = FolderReadOptions(io_config=io_config)
df = daft.read_las_dataset(name=dataset_name, read_options=read_options)
df.show()

# Output
╭─────────────────┬───────╮
│ file_name       ┆ size  │
│ ---             ┆ ---   │
│ Utf8            ┆ Int64 │
╞═════════════════╪═══════╡
│ subfolder/1.png ┆ 1705  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2.png           ┆ 1899  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3.jpg           ┆ 929   │
╰─────────────────┴───────╯

(Showing first 3 of 3 rows)
```
Example code:
```python
import daft
import ray
from daft.io import ImageFolderReadOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # if running in a distributed Ray environment

dataset_name = "my_dataset"
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
read_options = ImageFolderReadOptions(io_config=io_config, read_type="url")  # Audio/Video are similar
df = daft.read_las_dataset(name=dataset_name, read_options=read_options)
df.show()

# Output
╭────────────────────────────────┬───────┬──────────╮
│ image                          ┆ size  ┆ num_rows │
│ ---                            ┆ ---   ┆ ---      │
│ Utf8                           ┆ Int64 ┆ Int64    │
╞════════════════════════════════╪═══════╪══════════╡
│ s3://las-ci/daft/dataset/imag… ┆ 1899  ┆ None     │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ s3://las-ci/daft/dataset/imag… ┆ 929   ┆ None     │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ s3://las-ci/daft/dataset/imag… ┆ 1705  ┆ None     │
╰────────────────────────────────┴───────┴──────────╯

(Showing first 3 of 3 rows)
```
The signature of the LAS write-dataset API is as follows:
```python
def write_las_dataset(  # type: ignore[no-untyped-def]
    self,
    name: str,
    format: str | None = None,
    write_options: WriteOptions | None = None,
    create_ds_options: CreateLasDatasetOptions | None = None,
) -> DataFrame:
    """Writes the DataFrame to a LAS dataset, returning a new DataFrame with paths to the files that were written.

    The dataset will be created if it does not exist.

    Files may be written to `<root_dir>/*` with randomly generated UUIDs as the file names.

    Args:
        name: Name of the dataset to be created.
        format: Format of the dataset (CSV, Parquet, Lance, Jsonl or Iceberg).
        write_options: The format-specific write options. These options will be
            passed to the corresponding writing functions.
        create_ds_options: The options used to create the dataset.

    Returns:
        DataFrame: A new DataFrame containing paths to the written files.

    Raises:
        ValueError: If the format is unsupported or if required arguments are missing.
        Exception: If dataset creation fails in the LAS service.
    """
```
WriteOptions holds the write configuration for each format and differs by data type.
```python
@dataclass
class CsvWriteOptions(WriteOptions):
    root_dir: str | pathlib.Path | None = None
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    partition_cols: list[ColumnInputType] | None = None
```
```python
@dataclass
class JsonWriteOptions(WriteOptions):
    root_dir: str | pathlib.Path | None = None
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    partition_cols: list[ColumnInputType] | None = None
```
```python
@dataclass
class ParquetWriteOptions(WriteOptions):
    root_dir: str | pathlib.Path | None = None
    compression: str = "snappy"
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    partition_cols: list[ColumnInputType] | None = None
```
```python
@dataclass
class LanceWriteOptions(WriteOptions):
    uri: str | pathlib.Path | None = None
    mode: Literal["create", "append", "overwrite"] = "create"
    schema: Schema | None = None
```
```python
@dataclass
class FolderWriteOptions(WriteOptions):
    root_dir: str | None = None
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    metadata_format: Literal["csv", "jsonl", "parquet"] = "jsonl"
```
Caution
The WriteOptions correspond to the parameters of dataframe.write_csv, dataframe.write_json, dataframe.write_parquet, and dataframe.write_lance; see the daft API documentation for their meaning. metadata_options corresponds to the WriteOptions of the metadata file (which can be in CSV, JSONL, or Parquet format). The write modes currently supported by each format, and the default mode, are listed below (see the sketch after the table for setting the mode explicitly):
| Format | Modes | Default mode | Notes |
|---|---|---|---|
| CSV | ["append", "overwrite", "overwrite-partitions"] | "append" | Currently only "append" is supported |
| JSONL | ["append", "overwrite", "overwrite-partitions"] | "append" | Currently only "append" is supported |
| Parquet | ["append", "overwrite", "overwrite-partitions"] | "append" | Currently only "append" is supported |
| Lance | ["create", "append", "overwrite"] | "create" | Use "create" for the first write, and "append" or "overwrite" for subsequent writes |
| Folder (Image/Audio/Video) | ["append", "overwrite", "overwrite-partitions"] | "append" | |
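As an illustrative sketch of setting the write mode explicitly, here is a Parquet example; the import path of ParquetWriteOptions (mirroring CsvWriteOptions) and the target path are assumptions.

```python
from daft.io import IOConfig, ParquetWriteOptions  # import path assumed, mirroring CsvWriteOptions
from daft.las.io import TOSConfig

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())

# "append" is both the default and, currently, the only supported mode for Parquet.
write_options = ParquetWriteOptions(
    io_config=io_config,
    root_dir="tos://bucket/path/to/my/dataset",
    write_mode="append",
)
```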
As above, first export the required environment variables:
```bash
export ACCESS_KEY=xxx
export SECRET_KEY=xxx
export TOS_ENDPOINT=https://tos-cn-beijing.volces.com  # on the private network, change volces to ivolces
export TOS_REGION=cn-beijing
```
```python
import daft
import ray
from daft.io import CreateLasDatasetOptions, CsvWriteOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # if running in a distributed Ray environment

dataset_name = "my_dataset"
format = "csv"
root_dir = "tos://bucket/path/to/my/dataset"
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
write_options = CsvWriteOptions(io_config=io_config, root_dir=root_dir)
create_ds_options = CreateLasDatasetOptions(
    nick_name="daft_test_csv_write",
    privacy="public",
    description="This is my dataset"
)

df = daft.from_pydict({
    "file_name": ["subfolder/1.png", "2.png", "3.jpg"],
    "size": [1705, 1899, 929]
})
df.write_las_dataset(name=dataset_name, format=format, write_options=write_options, create_ds_options=create_ds_options)
```
After the write completes, you can view the created dataset on the LAS platform, or read it back with ve-daft in the same way as described in the read section above.
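For instance, a minimal read-back sketch that reuses the io_config and dataset_name defined in the write snippet above:

```python
from daft.io import CsvReadOptions

# Read the CSV dataset that was just written back into a DataFrame.
read_options = CsvReadOptions(io_config=io_config)
df_read = daft.read_las_dataset(name=dataset_name, read_options=read_options)
df_read.show()
```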
By default, for CSV/JSONL/Parquet and for the metadata of Image/Audio/Video folders, the default behavior is "append", which in practice means "create and append". For Lance, "append" must be set explicitly:
```python
write_options = LanceWriteOptions(io_config=io_config, uri=root_dir, mode="append")
```
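Putting it together, here is a minimal sketch of appending to an existing Lance dataset; the lowercase format string "lance", the import path of LanceWriteOptions, and the target path are assumptions for illustration.

```python
import daft
from daft.io import IOConfig, LanceWriteOptions  # import path assumed, mirroring CsvWriteOptions
from daft.las.io import TOSConfig

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
root_dir = "tos://bucket/path/to/my/lance/dataset"

# The first write should use mode="create"; later writes use "append" or "overwrite".
write_options = LanceWriteOptions(io_config=io_config, uri=root_dir, mode="append")

df = daft.from_pydict({"file_name": ["4.png"], "size": [512]})
df.write_las_dataset(name="my_dataset", format="lance", write_options=write_options)
```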
Currently, formats other than Lance do not support overwriting. In that case, you need to delete the dataset first, or delete the TOS data directly, before writing the new data.
```python
import daft
import ray
from daft.io import CreateLasDatasetOptions, ImageFolderWriteOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # if running in a distributed Ray environment

dataset_name = "my_dataset"
format = "image"
root_dir = "tos://bucket/path/to/my/dataset"
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
write_options = ImageFolderWriteOptions(io_config=io_config, root_dir=root_dir)
create_ds_options = CreateLasDatasetOptions(
    nick_name="my_dataset",
    privacy="private",
    description="This is my dataset"
)

df = daft.from_pydict({
    "file_name": ["subfolder/1.png", "2.png", "3.jpg"],
    "size": [1705, 1899, 929]
})
df.write_las_dataset(name=dataset_name, format=format, write_options=write_options, create_ds_options=create_ds_options)
```
After the write completes, the dataset can be read back using the read methods described above.