You need to enable JavaScript to run this app.
导航
快速上手
最近更新时间:2025.09.22 10:47:07首次发布时间:2025.08.29 15:46:07
复制全文
我的收藏
有用
有用
无用
无用

安装

LAS 开发机默认已经安装了 ve-daft,用户无需单独安装。如果用户需要在自己的机器上安装 ve-daft,操作如下。

  1. ve-daft 尚未在公网 pypi 仓库发布,用户需要下载 ve-daft 的离线安装包:点击下载
  2. 执行如下命令安装 ve-daft。
pip install ve_daft-0.5.10.post7-cp39-abi3-manylinux_2_35_x86_64.whl

默认情况下,ve-daft 只提供对 LAS 数据集的读写,如果需要使用 LAS 提供的多模态计算算子,则需要安装如下两个 extra:

pip install ve_daft-0.5.10.post7-cp39-abi3-manylinux_2_35_x86_64.whl[las]
pip install ve_daft-0.5.10.post7-cp39-abi3-manylinux_2_35_x86_64.whl[flash-attn]

注意

由于计算算子使用了多种模型/算法库,体积会比较大,请用户预留好空间。

基础使用

下文介绍 daft dataframe 的基本操作。

创建 DataFrame

我们可以将一个 python dict 实例化成为一个 dataframe 并展示:

import daft

df = daft.from_pydict({
    "A": [1, 2, 3, 4],
    "B": [1.5, 2.5, 3.5, 4.5],
    "C": [True, True, False, False],
    "D": [None, None, None, None],
}).show()

╭───────┬─────────┬─────────┬──────╮
│ A     ┆ B       ┆ C       ┆ D    │
│ ---   ┆ ---     ┆ ---     ┆ ---  │
│ Int64 ┆ Float64 ┆ Boolean ┆ Null │
╞═══════╪═════════╪═════════╪══════╡
│ 1     ┆ 1.5     ┆ true    ┆ None │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2     ┆ 2.5     ┆ true    ┆ None │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 3     ┆ 3.5     ┆ false   ┆ None │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 4     ┆ 4.5     ┆ false   ┆ None │
╰───────┴─────────┴─────────┴──────╯

(Showing first 4 of 4 rows)

同样的,我们也可以从 python list 创建 dataframe

df = daft.from_pylist([
    {"A": 1, "B": 1.5, "C", True, "D": None},
    {"A": 2, "B": 2.5, "C", True, "D": None},
    {"A": 3, "B": 3.5, "C", False, "D": None},
    {"A": 4, "B": 4.5, "C", False, "D": None},
])

我们还可以从以下数据结构创建 DataFrame:

from_pandas(data: Union["pd.DataFrame", list["pd.DataFrame"]])
from_arrow(data: Union["pa.Table", list["pa.Table"], Iterable["pa.Table"]])
from_ray_dataset(ds: "RayDataset")
from_dask_dataframe(ddf: "dask.DataFrame")

除此之外,我们还可以从数据源读取数据为 dataframe。详情请见第二节“数据源支持”。

对 DataFrame 进行操作

使用 ve-daft 的 dataframe 跟使用其他引擎,比如 Pandas、PySpark 等 dataframe 类似,常见的操作基本是相同的,很容易就会上手,比如:
选择列,

df.select("A", "B").show()

╭───────┬─────────╮
│ A     ┆ B       │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 3     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 4     ┆ 4.5     │
╰───────┴─────────╯

(Showing first 2 of 2 rows)

或者是过滤

df.filter(df["A"] > 2).show()

╭───────┬─────────┬─────────┬──────┬─────────╮
│ A     ┆ B       ┆ C       ┆ D    ┆ E       │
│ ---   ┆ ---     ┆ ---     ┆ ---  ┆ ---     │
│ Int64 ┆ Float64 ┆ Boolean ┆ Null ┆ Float64 │
╞═══════╪═════════╪═════════╪══════╪═════════╡
│ 3     ┆ 3.5     ┆ false   ┆ None ┆ 6.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 4     ┆ 4.5     ┆ false   ┆ None ┆ 8.5     │
╰───────┴─────────┴─────────┴──────┴─────────╯

(Showing first 2 of 2 rows)

或者是使用普通的 Expression 执行计算:

df = df.with_column("E", df["A"] + df["B"]).show()

╭───────┬─────────┬─────────┬──────┬─────────╮
│ A     ┆ B       ┆ C       ┆ D    ┆ E       │
│ ---   ┆ ---     ┆ ---     ┆ ---  ┆ ---     │
│ Int64 ┆ Float64 ┆ Boolean ┆ Null ┆ Float64 │
╞═══════╪═════════╪═════════╪══════╪═════════╡
│ 1     ┆ 1.5     ┆ true    ┆ None ┆ 2.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 2.5     ┆ true    ┆ None ┆ 4.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 3.5     ┆ false   ┆ None ┆ 6.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 4     ┆ 4.5     ┆ false   ┆ None ┆ 8.5     │
╰───────┴─────────┴─────────┴──────┴─────────╯

(Showing first 4 of 4 rows)

等等。
用户还可以使用 UDF 对 dataframe 进行操作。daft 的 UDF 与 Pandas、PySpark 等引擎的 UDF 有独到之处,它可以支持 class。这种 UDF 对于 AI 计算有非常重要的作用,详情可以参考下文第三节“用户自定义函数”。

关于更多的操作,可以参考 daft 官方文档。

多模态支持

daft 的一大特点就是针对多模态数据的支持。这种多模态支持体现在两个方面:

  1. 多模态的数据类型,这种数据类型拓展了传统的 DataFrame/SQL 的数据类型,新增加了对于 Image、URL、Text、Json 等类型的支持;
  2. 增加了对这些类型的直接操作(或称为“算子”),例如,对于 URL 类型,可以使用 download() 方法对文件进行下载,对于 Image 类型,可以使用 decode() 方法对图片进行解码等。下面展示一个例子:
df = daft.read_parquet("metadata.parquet") \
    .with_column("image_data", col("urls").url.download()) \
    .with_column("resized", col("image_data").image.resize(224, 224))

数据源支持

数据源列表

ve-daft 支持多种数据源,除了常见的 parquet、csv、json,还支持 lance 这种新兴数据湖格式,以及 LAS 的数据集读写。完整的列表如下:

  • csv
  • json
  • parquet
  • iceberg
  • hudi
  • lance
  • deltalake
  • clickhouse
  • mcap
  • las_dataset

其中 clickhouse、mcap、las_dataset 为 ve-daft 专有支持。

读写 tos

目前 ve-daft 尚未适配 tos 协议,因此用户在读写位于 tos 上的数据时仍然需要使用 s3 兼容协议。下面展示了一下如何读取位于 tos 上的 csv 数据,以及如何向 tos 写 csv 数据。

  1. 首先配置环境变量
export ACCESS_KEY=xxx
export SECRET_KEY=xxx
export TOS_ENDPOINT=https://tos-cn-beijing.volces.com
export TOS_REGION=cn-beijing
  1. 读 csv:
import daft
from daft.daft import IOConfig
from daft.las.io.tos import TOSConfig

s3_config = TOSConfig.from_env().to_s3_config()
df = daft.read_csv(path="s3://path/to/csv/dir", io_config=IOConfig(s3=s3_config))
  1. 写 csv:
import daft
from daft.daft import IOConfig
from daft.las.io.tos import TOSConfig


df = daft.from_pydict({
    "A": [1, 2, 3, 4],
    "B": [1.5, 2.5, 3.5, 4.5],
    "C": [True, True, False, False],
    "D": [None, None, None, None],
})

s3_config = TOSConfig.from_env().to_s3_config()
df = daft.write_csv(root_dir="s3://path/to/csv/dir", io_config=IOConfig(s3=s3_config))

其他格式的读写可以参考 daft 的官方文档

高阶使用

读写 LAS 数据集

ve-daft 的一大特点就是支持对 LAS 数据集的读写。关于对 LAS 数据集的读写可以参考文档 读写 LAS 数据集

用户自定义函数(UDF)

ve-daft 支持 两种类型的 UDF:function 类型以及 class 类型,其中 class 类型是 ve-daft 区别于其他引擎(如 spark、hive)的一个重要特点。function 类型 UDF 进一步可以划分为 lambda function UDF 和普通 function UDF。

  1. lambda 类 UDF。

这类 UDF 一般用于行内计算,适用于简单的场景,它通过 .apply() 发起调用:

df.with_column(
    "flattened_image",
    df["image"].apply(lambda img: img.flatten(), return_dtype=daft.DataType.python())
).show(2)
  1. 普通 function 类 UDF。

这类 UDF 就是我们在其他引擎常见的 UDF,它通过 @daft.udf() 注解进行定义:

@daft.udf(return_dtype=daft.DataType.python())
def crop_images(images, crops, padding=0):
    cropped = []
    for img, crop in zip(images, crops):
        x1, x2, y1, y2 = crop
        cropped_img = img[x1:x2 + padding, y1:y2 + padding]
        cropped.append(cropped_img)
    return cropped

df = df.with_column(
    "cropped",
    crop_images(df["image"], df["crop"], padding=1),
)
df.show(2)
  1. class 类 UDF。

它也是通过 @daft.udf() 定义,区别与 function 类,它是定义在 class 上的:

@daft.udf(
    return_dtype=daft.DataType.int64(),
    concurrency=4,  # initialize 4 instances of the UDF to run across your data
)
class RunModel:
    def __init__(self, model_name = "meta-llama/Llama-4-Scout-17B-16E-Instruct"):
        # Perform expensive initializations
        self._model = create_model(model_name)
    def __call__(self, features_col):
        return self._model(features_col)
        
df = df.with_column("image_classifications", RunModel(df["image"]))

Class UDF 的优点在于它可以维护“状态”,可以重复利用,因此也可以做一些比较重的操作,比如加载模型。如果使用 function 类 UDF,则模型加载需要在每次调用 UDF 时进行,其时间消耗是无法想象的。

ve-daft 算子

内置算子

“算子”是 ve-daft 提供的,用于多模态数据处理的函数,比如文本特殊字符移除、敏感信息过滤、视频抽取音频、说话人识别等等,覆盖范围广阔。算子的本质是具备特定接口的 class,可以通过 UDF 的方式调用。
如下代码是一个真实可运行的例子:

import os

import daft
from daft import col
from daft.las.functions.audio import AudioDuration
from daft.las.functions.udf import las_udf

if __name__ == "__main__":
    sample_path = f"tos://tos_bucket/audio_duration/sample.mp3"

    df = daft.from_pydict({"audio_path": [sample_path]})
    df = df.with_column("duration_result", las_udf(AudioDuration)(col("audio_path")))
    df.show()
    # ╭────────────────────────────────┬─────────────────╮
    # │ audio_path                     ┆ duration_result │
    # │ ---                            ┆ ---             │
    # │ Utf8                           ┆ Float32         │
    # ╞════════════════════════════════╪═════════════════╡
    # │ tos://tos_bucket/audio_durati… ┆ 49.711          │
    # ╰────────────────────────────────┴─────────────────╯

在上面的代码中,AudioDuration 是用于计算音频时长的函数(callable class),las_udf 则是一个封装方法,将 AudioDuration 封装为一个 ve-daft 的 UDF。
las_udf() 封装了多个参数:

las_udf(
    operator: type[Operator],
    construct_args: dict[str, Any] | None = None,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory_bytes: int | None = None,
    batch_size: int | None = None,
    concurrency: int | None = None,
)

参数说明

  • operator: 指算子类。
  • construct_args: 算子的初始化参数,对应类的 __init__ 命名参数。
  • num_cpus: ray 模式下单个算子使用的 cpu 个数。
  • num_gpus: ray 模式下单个算子使用的 gpu 个数。
  • memory_bytes: ray 模式下单个算子使用的 memory 大小。
  • batch_size: 每个 batch 处理数据量的条数。
  • concurrency: 算子并发数量。

自定义算子

用户可以自定义算子,只需要继承如下类并实现相应的接口即可:

class Operator(ABC):
    """An operator that process the input data."""

    def transform(self, *args: Any, **kwargs: Any) -> pa.Array:
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def __return_column_type__() -> pa.DataType: ...

说明

  • 如果输入的是 dataframe 的列,它的类型应为 py.Array,比如transform(self, col: pa.Array) -> pa.Array
  • __return_column_type__ 定义的返回类型是 pa.Array 内部内容的返回类型,比如,如果 pa.Array 内存放的为 float 数据,那么返回类型应为 pa.float64()

下面是一个简单的例子,返回值本身:

import daft
import pyarrow as pa

from daft import col
from daft.las.functions.types import Operator
from daft.las.functions.udf import las_udf

# 1. 定义 operator
class EchoUDF(Operator):
    @staticmethod
    def __return_column_type__() -> pa.DataType:
        return pa.int64()

    def transform(self, col_a: pa.Array, col_b: pa.Array | None = None, c: str | None = None) -> pa.Array:
        return col_a

# 2. 使用 oerator
df = daft.from_pydict({
    "A": [1, 2, 3, 4],
    "B": [1.5, 2.5, 3.5, 4.5]
})

df = df.with_column("dup_a", las_udf(EchoUDF)(col("A")))

df.show()

输出:

╭───────┬─────────┬───────╮
│ A     ┆ B       ┆ dup_a │
│ ---   ┆ ---     ┆ ---   │
│ Int64 ┆ Float64 ┆ Int64 │
╞═══════╪═════════╪═══════╡
│ 1     ┆ 1.5     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2.5     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3.5     ┆ 3     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 4     ┆ 4.5     ┆ 4     │
╰───────┴─────────┴───────╯

(Showing first 4 of 4 rows)