You need to enable JavaScript to run this app.
导航
读写 LAS 数据集
最近更新时间:2025.09.22 10:49:24首次发布时间:2025.08.29 15:28:40
复制全文
我的收藏
有用
有用
无用
无用

简介

LAS平台具备数据集托管功能。用户可将自有数据托管于该平台,并借助平台提供的工具实现数据的预览与分析操作。
LAS的数据集托管采用半托管模式。在此模式下,用户无需将数据上传至LAS平台,仅需将数据留存于自身的TOS/vePFS系统中进行维护即可。这一模式有效保障了数据的安全性,消除了用户对数据泄露风险的担忧。
此外,LAS托管的数据集可直接利用平台提供的工具开展大规模计算处理,满足用户在数据运算方面的需求。
使用 LAS 平台提供的 ve-daft、开发机、工作流等工具,用户可以搭建一个完整的数据处理链路。关于这方面可以参考LAS作业开发。本文重点介绍如何使用 ve-daft 读写 LAS 的数据集。

数据类型

目前 LAS 数据集支持以下几种类型:

  • 表类型
    • CSV
    • JSON
    • Lance
    • Parquet
  • 多模态 Folder 类型
    • Image Folder
    • Audio Folder
    • Video Folder

说明

  • JSON 的读写目前仅支持 daft 的 native 引擎,尚不支持使用 ray 进行分布式处理。
  • Iceberg、Text 尚在开发中。

表类型

对于表类型,LAS 推荐使用 Lance 作为数据存储格式。该格式有诸多方面的优点:

  1. 存储效率比一般的文本格式要高。
  2. 对于 AI 典型的场景如加列等操作效率更高。
  3. 支持多模数据的存储。
  4. 支持直接基于 Lance 格式进行高效的训练。

关于 Lance 格式更多的介绍,请参考 Lance 文档

Folder 类型

对于多模态 Folder 格式,这里做一个简要的说明。

读 Folder

以一个 Folder 为例,它的目录结构情况如下所示:

my_data/
    metadata.jsonl # 可选
    sub_folder/
        pic3.png
        pic4.png
        ...
    pic1.png
    pic2.png
    audio1.mp3
    audio2.mp3
    ...

当它作为一个 Image Folder 读时:

  1. 如果有 metadata.csv, metadata.jsonl 或者 metadata.parquet 时,会将此 metadata 读取为 image dataframe。也就是说,此 metadata 描述了整个文件夹内的文件。一般情况下,metadata.jsonl 的每一行描述一个文件,记录该文件的特征,如文件链接、大小、标签等等。
  2. 如果没有 metadata,那么
    1. 会将文件列表读取为 dataframe,文件列表会递归处理,并且按照 Image 支持的扩展类型进行过滤,比如 .txt 格式的文件将不会作为 Image dataframe 的元素。如果将其作为 Audio 格式来读,那么 audio1.mp3 和 audio2.mp3 会被读取,其他的不会被读取。
    2. 读取文件列表的数据返回固定的 schema:
bytes:如果选择读取文件内容,并将文件读取为二进制数据。默认无此 column
base64: 如果选择读取文件内容,并将文件读取为 base64 字符串。默认无此 column
path: 文件的路径
size: 文件大小
rows: 如果文件为 parquet 格式,该 parquet 文件的行数

写 Folder

写 Folder 分为两个阶段:

  1. 写 Image/Audio/Video 原始数据
  2. 写 metadata(可选)

说明

daft 目前尚不支持将内存数据存成图片、音频文件或者视频文件,即 daft 仅支持写 metadata。

如果用户有某一列或者某几列数据为图片、音频、视频的内容数据,且想把它们保存为文件,然后保存为 LAS 数据集,可以采用 UDF 的方式进行操作:

  1. 定义一个 UDF,其内容为将内存数据保存为本地文件,然后上传至 TOS。
  2. 将该列删除。
  3. 其余的列保存为 Image/Audio/Video 类型的 LAS dataset。

注意

关于写 metadata 尤其要注意 metadata 的数据集不能太大,不要超过 driver 可用内存的 80%。否则会造成 driver 节点 OOM。因为数据要先收集到 driver 节点才能保存为 TOS 的 metadata 文件。如果 dataframe 中有 Image/Audio/Video 等内容数据,写之前可以先 drop 这些 column。

读写 LAS 数据集

读操作

方法签名

LAS 的读数据集 API 签名如下:

def read_las_dataset(name: str, read_options: ReadOptions | None = None) -> DataFrame:
    """Create a DataFrame from a las dataset.

    Args:
        name: Name of the dataset.
        read_options: The format specified read options. These options will be passed to correspond read functions.

    Returns:
        DataFrame: a DataFrame with the schema converted from the dataset.
            For audio/image/video folder, if there's a metadata.csv/metadata.jsonl/metadata.parquet,
            this will return the content the metadata file as dataframe, otherwise, this will return
            a dataframe whose content is the file list, with the following schema:

            1. bytes/base64: the content of the file, bytes or base64 according to the reading options,
                and if only read url, this column does not exist.
            2. path: the path to the file/directory
            3. size: size of the object in bytes
            4. rows: number of rows if the file is parquet
    """

其中 ReadOptions 对应了读各种格式的配置,它因不同的数据类型而有所区别:

  • CSV
@dataclass
class CsvReadOptions(ReadOptions):
    infer_schema: bool = True
    schema: dict[str, DataType] | None = None
    has_headers: bool = True
    delimiter: str | None = None
    double_quote: bool = True
    quote: str | None = None
    escape_char: str | None = None
    comment: str | None = None
    allow_variable_columns: bool = False
    file_path_column: str | None = None
    hive_partitioning: bool = False
    schema_hints: dict[str, DataType] | None = None
    _buffer_size: int | None = None
    _chunk_size: int | None = None
  • JSON
@dataclass
class JsonReadOptions(ReadOptions):
    infer_schema: bool = True
    schema: dict[str, DataType] | None = None
    file_path_column: str | None = None
    hive_partitioning: bool = False
    schema_hints: dict[str, DataType] | None = None
    _buffer_size: int | None = None
    _chunk_size: int | None = None
  • Parquet
@dataclass
class ParquetReadOptions(ReadOptions):
    row_groups: list[list[int]] | None = None
    infer_schema: bool = True
    schema: dict[str, DataType] | None = None
    file_path_column: str | None = None
    hive_partitioning: bool = False
    coerce_int96_timestamp_unit: str | TimeUnit | None = None
    schema_hints: dict[str, DataType] | None = None
    _multithreaded_io: bool | None = None
    _chunk_size: int | None = None
  • Lance
@dataclass
class LanceReadOptions(ReadOptions):
    version: str | int | None = None
    asof: str | None = None
    block_size: int | None = None
    commit_lock: object | None = None
    index_cache_size: int | None = None
    default_scan_options: dict[str, str] | None = None
    metadata_cache_size_bytes: int | None = None
  • Image/Audio/Video Folder
@dataclass
class FolderReadOptions(ReadOptions):
    metadata_options: ReadOptions = field(
        default_factory=lambda: ReadOptions(io_config=IOConfig(s3=TOSConfig.from_env().to_s3_config()))
    )
    file_types: list[str] = field(default_factory=list)
    read_type: Literal["url", "binary", "base64", "ndarray"] = "url"

@dataclass
class ImageFolderReadOptions(FolderReadOptions):
    file_types: list[str] = field(
        default_factory=lambda: [".blp", ".dib", ".bmp", ".bufr", ".cur", ".pcx", ".dcx", ".dds", ".ps", ".eps", ".fit", ".fits", ".fli", ".flc", ".ftc", ".ftu", ".gbr", ".gif", ".grib", ".apng", ".jp2", ".j2k", ".jpc", ".jpf", ".jpx", ".j2c", ".icns", ".ico", ".im", ".iim", ".tif", ".tiff", ".jfif", ".jpe", ".jpg", ".jpeg", ".mpg", ".mpeg", ".msp", ".pcd", ".pxr", ".pbm", ".pgm", ".ppm", ".pnm", ".psd", ".bw", ".rgb", ".rgba", ".sgi", ".ras", ".tga", ".icb", ".vda", ".vst", ".webp", ".wmf", ".emf", ".xbm", ".xpm"]
    )

@dataclass
class AudioFolderReadOptions(FolderReadOptions):
    file_types: list[str] = field(
        default_factory=lambda: [".aiff", ".au", ".avr", ".caf", ".flac", ".htk", ".svx", ".mat4", ".mat5", ".mpc2k", ".m4a", ".ogg", ".paf", ".pvf", ".raw", ".rf64", ".sd2", ".sds", ".ircam", ".voc", ".w64", ".wav", ".nist", ".wavex", ".wve", ".xi", ".mp3", ".opus"]
    )
    
@dataclass
class VideoFolderReadOptions(FolderReadOptions):
    file_types: list[str] = field(
        default_factory=lambda: [".mkv", ".mp4", ".avi", ".mpeg", ".mov"]
    )

注意

CSV、JSON、Parquet、Lance 对应的 ReadOptions,实际上对应了 daft.read_csv, daft.read_json,daft.read_parquet, daft.read_lance, 几个方法的参数。其意义可以看 daft 的 API 文档。

读数据示例

导出必要的环境变量:

export ACCESS_KEY=xxx
export SECRET_KEY=xxx
export TOS_ENDPOINT=https://tos-cn-beijing.volces.com  # 如果是内网,volces 改为 ivolces
export TOS_REGION=cn-beijing
  1. 读取 CSV 格式的 LAS 数据集
import daft
import ray
from daft.io import CsvReadOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # 如果在分布式 ray 环境中

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
dataset_name = "my_dataset"
read_options = CsvReadOptions(io_config=io_config)

df = daft.read_las_dataset(name=dataset_name, read_options=read_options)
df.show()

# 输出
╭─────────────────┬───────╮                                                                                                                                                                                                                                               
│ file_name       ┆ size  │
│ ---             ┆ ---   │
│ Utf8            ┆ Int64 │
╞═════════════════╪═══════╡
│ subfolder/1.png ┆ 234   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2.png           ┆ 256   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3.jpg           ┆ 300   │
╰─────────────────┴───────╯

(Showing first 3 of 3 rows)
  1. 读取 Image Folder 类型的数据集(Audio/Video 类似)。
  • 有 metadata 示例

假设用户的目录结构如下:

image_dataset
    /subfolder/1.png
    /2.png
    /3.jpg
    /4.txt   # 注意这里有一个非图片的文件
    /metadata.jsonl

其中 metadata.jsonl 的内容如下。

{"file_name":"subfolder/1.png","size":1705}
{"file_name":"2.png","size":1899}
{"file_name":"3.jgp","size":929}

示例代码

import daft
import ray
from daft.io._las_dataset import FolderReadOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # 如果在分布式 ray 环境中

dataset_name = "my_dataset"

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
read_options = FolderReadOptions(io_config=io_config)

df = daft.read_las_dataset(name=dataset_name, read_options=read_options)
df.show()

# 输出
╭─────────────────┬───────╮                                                                                                                                                                                                                                               
│ file_name       ┆ size  │
│ ---             ┆ ---   │
│ Utf8            ┆ Int64 │
╞═════════════════╪═══════╡
│ subfolder/1.png ┆ 1705  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2.png           ┆ 1899  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3.jpg           ┆ 929   │
╰─────────────────┴───────╯

(Showing first 3 of 3 rows)
  • 无 metadata

示例代码

import daft
import ray
from daft.io import ImageFolderReadOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # 如果在分布式 ray 环境中

dataset_name = "my_dataset"

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
read_options = ImageFolderReadOptions(io_config=io_config, read_type="url") # Audio/Video 类似

df = daft.read_las_dataset(name=dataset_name, read_options=read_options)
df.show()

# 输出
╭────────────────────────────────┬───────┬──────────╮                                                                                                                                                                                                                     
│ image                          ┆ size  ┆ num_rows │                                                                                                                                                                                                                     
│ ---                            ┆ ---   ┆ ---      │
│ Utf8                           ┆ Int64 ┆ Int64    │
╞════════════════════════════════╪═══════╪══════════╡
│ s3://las-ci/daft/dataset/imag… ┆ 1899  ┆ None     │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ s3://las-ci/daft/dataset/imag… ┆ 929   ┆ None     │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
│ s3://las-ci/daft/dataset/imag… ┆ 1705  ┆ None     │
╰────────────────────────────────┴───────┴──────────╯

(Showing first 3 of 3 rows)

写操作

方法签名

LAS 的写数据集 API 签名如下:

def write_las_dataset(  # type: ignore[no-untyped-def]
    self,
    name: str,
    format: str | None = None,
    write_options: WriteOptions | None = None,
    create_ds_options: CreateLasDatasetOptions | None = None,
) -> DataFrame:
    """Writes the DataFrame to LAS dataset, returning a new DataFrame with paths to the files that were written.

    The dataset will be created if it was not exist.
    Files may be written to `<root_dir>/*` with randomly generated UUIDs as the file names.

    Args:
        name: Name of the dataset to be created.
        format: Format of the dataset (CSV, Parquet, Lance, Jsonl or Iceberg).
        write_options: The format specified write options. These options will be passed to correspond writing functions.
        create_ds_options: The options used to create the dataset.

    Returns:
        DataFrame: A new DataFrame containing paths to the written files.

    Raises:
        ValueError: If the format is unsupported or if required arguments are missing.
        Exception: If dataset creation fails in the LAS service.
    """

其中,WriteOptions 对应了写各种格式的配置,它因不同的数据类型而有所区别。

  • CSV
@dataclass
class CsvWriteOptions(WriteOptions):
    root_dir: str | pathlib.Path | None = None
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    partition_cols: list[ColumnInputType] | None = None
  • JSON
@dataclass
class JsonWriteOptions(WriteOptions):
    root_dir: str | pathlib.Path | None = None
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    partition_cols: list[ColumnInputType] | None = None
  • Parquet
@dataclass
class ParquetWriteOptions(WriteOptions):
    root_dir: str | pathlib.Path | None = None
    compression: str = "snappy"
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    partition_cols: list[ColumnInputType] | None = None
  • Lance
@dataclass
class LanceWriteOptions(WriteOptions):
    uri: str | pathlib.Path | None = None
    mode: Literal["create", "append", "overwrite"] = "create"
    schema: Schema | None = None
  • Image/Audio/Video
@dataclass
class FolderWriteOptions(WriteOptions):
    root_dir: str | None = None
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append"
    metadata_format: Literal["csv", "jsonl", "parquet"] = "jsonl"

注意

  • CSV、JSON、Parquet、Lance 对应的 WriteOptions,实际上对应了 dataframe.write_csv, dataframe.write_json,dataframe.write_parquet, dataframe.write_lance, 几个方法的参数。其意义可以看 daft 的 API 文档。
  • 关于 Image/Audio/Video Folder,它可能会存在 metadata 文件。metadata_options 对应了 metadata 文件(可以是 CSV、JSONL、Parquet 格式)的 WriteOptions

写模式

目前,几种格式的写模式与默认写模式支持情况如下:

格式

模式

默认模式

说明

CSV

["append", "overwrite", "overwrite-partitions"]

"append"

目前仅支持 "append"

JSONL

["append", "overwrite", "overwrite-partitions"]

"append"

目前仅支持 "append"

Parquet

["append", "overwrite", "overwrite-partitions"]

"append"

目前仅支持 "append"

Lance

["create", "append", "overwrite"]

"create"

注意首次写使用 "create",后续写使用 "append" 或者 "overwrite"

Folder(Image/Audio/Video)

["append", "overwrite", "overwrite-partitions"]

"append"

  • 这里指写 metadata
  • 目前仅支持 "append"

写数据集示例

同上,先导出必要的环境变量:

export ACCESS_KEY=xxx
export SECRET_KEY=xxx
export TOS_ENDPOINT=https://tos-cn-beijing.volces.com  # 如果是内网,volces 改为 ivolces
export TOS_REGION=cn-beijing
  1. 写 CSV 格式数据并创建新的 LAS 数据集
import daft
import ray
from daft.io import CreateLasDatasetOptions, CsvWriteOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # 如果在分布式 ray 环境中

dataset_name = "my_dataset"
format = "csv"
root_dir = "tos://bucket/path/to/my/dataset"

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
write_options = CsvWriteOptions(io_config=io_config, root_dir=root_dir)
create_ds_options = CreateLasDatasetOptions(
    nick_name="daft_test_csv_write",
    privacy="public",
    description="This is my dataset"
)

df = daft.from_pydict({
    "file_name": ["subfolder/1.png", "2.png", "3.jpg"],
    "size": [1705, 1899, 929]
})

df.write_las_dataset(name=dataset_name, format=format, write_options=write_options, create_ds_options=create_ds_options)

写完之后,可以到 LAS 平台查看创建的数据集。亦可以按照读数据集的方式使用 ve-daft 读取数据集查看。

  1. 追加数据

默认情况下,CSV/JSONL/Parquet 以及 Image/Audio/Video 类型的 metadata,格式的默认行为即为 "append",它的实际意义是 "create" and "append"。对于 Lance,需要显式地设置 "append":

write_options = LanceWriteReadOptions(io_config=io_config, uri=root_dir, mode="append")
  1. 覆盖写

目前,除了 Lance,其他格式均不支持覆盖写。这种情况下,用户需要先删除数据集或者直接删除 TOS 数据,再去写新的数据。

  1. 写 Image Folder 类型的数据集
import daft
import ray
from daft.io import CreateLasDatasetOptions, ImageFolderWriteOptions, IOConfig
from daft.las.io import TOSConfig

ray.init()  # 如果在分布式 ray 环境中

dataset_name = "my_dataset"
format = "image"
root_dir = "tos://bucket/path/to/my/dataset"

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
write_options = ImageFolderWriteOptions(io_config=io_config, root_dir=root_dir)
create_ds_options = CreateLasDatasetOptions(
    nick_name="my_dataset",
    privacy="private",
    description="This is my dataset"
)

df = daft.from_pydict({
    "file_name": ["subfolder/1.png", "2.png", "3.jpg"],
    "size": [1705, 1899, 929]
})

df.write_las_dataset(name=dataset_name, format=format, write_options=write_options, create_ds_options=create_ds_options)

写完之后,可以利用上文介绍的读方法将数据集读取出来。