LAS 开发机默认已经安装了 ve-daft,用户无需单独安装。如果用户需要在自己的机器上安装 ve-daft,操作如下。
pip install ve_daft-0.5.10.post7-cp39-abi3-manylinux_2_35_x86_64.whl
默认情况下,ve-daft 只提供对 LAS 数据集的读写,如果需要使用 LAS 提供的多模态计算算子,则需要安装如下两个 extra:
pip install ve_daft-0.5.10.post7-cp39-abi3-manylinux_2_35_x86_64.whl[las] pip install ve_daft-0.5.10.post7-cp39-abi3-manylinux_2_35_x86_64.whl[flash-attn]
注意
由于计算算子使用了多种模型/算法库,体积会比较大,请用户预留好空间。
下文介绍 daft dataframe 的基本操作。
我们可以将一个 python dict 实例化成为一个 dataframe 并展示:
import daft df = daft.from_pydict({ "A": [1, 2, 3, 4], "B": [1.5, 2.5, 3.5, 4.5], "C": [True, True, False, False], "D": [None, None, None, None], }).show() ╭───────┬─────────┬─────────┬──────╮ │ A ┆ B ┆ C ┆ D │ │ --- ┆ --- ┆ --- ┆ --- │ │ Int64 ┆ Float64 ┆ Boolean ┆ Null │ ╞═══════╪═════════╪═════════╪══════╡ │ 1 ┆ 1.5 ┆ true ┆ None │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤ │ 2 ┆ 2.5 ┆ true ┆ None │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤ │ 3 ┆ 3.5 ┆ false ┆ None │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤ │ 4 ┆ 4.5 ┆ false ┆ None │ ╰───────┴─────────┴─────────┴──────╯ (Showing first 4 of 4 rows)
同样的,我们也可以从 python list 创建 dataframe
df = daft.from_pylist([ {"A": 1, "B": 1.5, "C", True, "D": None}, {"A": 2, "B": 2.5, "C", True, "D": None}, {"A": 3, "B": 3.5, "C", False, "D": None}, {"A": 4, "B": 4.5, "C", False, "D": None}, ])
我们还可以从以下数据结构创建 DataFrame:
from_pandas(data: Union["pd.DataFrame", list["pd.DataFrame"]]) from_arrow(data: Union["pa.Table", list["pa.Table"], Iterable["pa.Table"]]) from_ray_dataset(ds: "RayDataset") from_dask_dataframe(ddf: "dask.DataFrame")
除此之外,我们还可以从数据源读取数据为 dataframe。详情请见第二节“数据源支持”。
使用 ve-daft 的 dataframe 跟使用其他引擎,比如 Pandas、PySpark 等 dataframe 类似,常见的操作基本是相同的,很容易就会上手,比如:
选择列,
df.select("A", "B").show() ╭───────┬─────────╮ │ A ┆ B │ │ --- ┆ --- │ │ Int64 ┆ Float64 │ ╞═══════╪═════════╡ │ 3 ┆ 3.5 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ │ 4 ┆ 4.5 │ ╰───────┴─────────╯ (Showing first 2 of 2 rows)
或者是过滤
df.filter(df["A"] > 2).show() ╭───────┬─────────┬─────────┬──────┬─────────╮ │ A ┆ B ┆ C ┆ D ┆ E │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ Int64 ┆ Float64 ┆ Boolean ┆ Null ┆ Float64 │ ╞═══════╪═════════╪═════════╪══════╪═════════╡ │ 3 ┆ 3.5 ┆ false ┆ None ┆ 6.5 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ │ 4 ┆ 4.5 ┆ false ┆ None ┆ 8.5 │ ╰───────┴─────────┴─────────┴──────┴─────────╯ (Showing first 2 of 2 rows)
或者是使用普通的 Expression 执行计算:
df = df.with_column("E", df["A"] + df["B"]).show() ╭───────┬─────────┬─────────┬──────┬─────────╮ │ A ┆ B ┆ C ┆ D ┆ E │ │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ Int64 ┆ Float64 ┆ Boolean ┆ Null ┆ Float64 │ ╞═══════╪═════════╪═════════╪══════╪═════════╡ │ 1 ┆ 1.5 ┆ true ┆ None ┆ 2.5 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ │ 2 ┆ 2.5 ┆ true ┆ None ┆ 4.5 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ │ 3 ┆ 3.5 ┆ false ┆ None ┆ 6.5 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤ │ 4 ┆ 4.5 ┆ false ┆ None ┆ 8.5 │ ╰───────┴─────────┴─────────┴──────┴─────────╯ (Showing first 4 of 4 rows)
等等。
用户还可以使用 UDF 对 dataframe 进行操作。daft 的 UDF 与 Pandas、PySpark 等引擎的 UDF 有独到之处,它可以支持 class。这种 UDF 对于 AI 计算有非常重要的作用,详情可以参考下文第三节“用户自定义函数”。
关于更多的操作,可以参考 daft 官方文档。
daft 的一大特点就是针对多模态数据的支持。这种多模态支持体现在两个方面:
download() 方法对文件进行下载,对于 Image 类型,可以使用 decode() 方法对图片进行解码等。下面展示一个例子:df = daft.read_parquet("metadata.parquet") \ .with_column("image_data", col("urls").url.download()) \ .with_column("resized", col("image_data").image.resize(224, 224))
ve-daft 支持多种数据源,除了常见的 parquet、csv、json,还支持 lance 这种新兴数据湖格式,以及 LAS 的数据集读写。完整的列表如下:
其中 clickhouse、mcap、las_dataset 为 ve-daft 专有支持。
目前 ve-daft 尚未适配 tos 协议,因此用户在读写位于 tos 上的数据时仍然需要使用 s3 兼容协议。下面展示了一下如何读取位于 tos 上的 csv 数据,以及如何向 tos 写 csv 数据。
export ACCESS_KEY=xxx export SECRET_KEY=xxx export TOS_ENDPOINT=https://tos-cn-beijing.volces.com export TOS_REGION=cn-beijing
import daft from daft.daft import IOConfig from daft.las.io.tos import TOSConfig s3_config = TOSConfig.from_env().to_s3_config() df = daft.read_csv(path="s3://path/to/csv/dir", io_config=IOConfig(s3=s3_config))
import daft from daft.daft import IOConfig from daft.las.io.tos import TOSConfig df = daft.from_pydict({ "A": [1, 2, 3, 4], "B": [1.5, 2.5, 3.5, 4.5], "C": [True, True, False, False], "D": [None, None, None, None], }) s3_config = TOSConfig.from_env().to_s3_config() df = daft.write_csv(root_dir="s3://path/to/csv/dir", io_config=IOConfig(s3=s3_config))
其他格式的读写可以参考 daft 的官方文档。
ve-daft 的一大特点就是支持对 LAS 数据集的读写。关于对 LAS 数据集的读写可以参考文档 读写 LAS 数据集。
ve-daft 支持 两种类型的 UDF:function 类型以及 class 类型,其中 class 类型是 ve-daft 区别于其他引擎(如 spark、hive)的一个重要特点。function 类型 UDF 进一步可以划分为 lambda function UDF 和普通 function UDF。
这类 UDF 一般用于行内计算,适用于简单的场景,它通过 .apply() 发起调用:
df.with_column( "flattened_image", df["image"].apply(lambda img: img.flatten(), return_dtype=daft.DataType.python()) ).show(2)
这类 UDF 就是我们在其他引擎常见的 UDF,它通过 @daft.udf() 注解进行定义:
@daft.udf(return_dtype=daft.DataType.python()) def crop_images(images, crops, padding=0): cropped = [] for img, crop in zip(images, crops): x1, x2, y1, y2 = crop cropped_img = img[x1:x2 + padding, y1:y2 + padding] cropped.append(cropped_img) return cropped df = df.with_column( "cropped", crop_images(df["image"], df["crop"], padding=1), ) df.show(2)
它也是通过 @daft.udf() 定义,区别与 function 类,它是定义在 class 上的:
@daft.udf( return_dtype=daft.DataType.int64(), concurrency=4, # initialize 4 instances of the UDF to run across your data ) class RunModel: def __init__(self, model_name = "meta-llama/Llama-4-Scout-17B-16E-Instruct"): # Perform expensive initializations self._model = create_model(model_name) def __call__(self, features_col): return self._model(features_col) df = df.with_column("image_classifications", RunModel(df["image"]))
Class UDF 的优点在于它可以维护“状态”,可以重复利用,因此也可以做一些比较重的操作,比如加载模型。如果使用 function 类 UDF,则模型加载需要在每次调用 UDF 时进行,其时间消耗是无法想象的。
“算子”是 ve-daft 提供的,用于多模态数据处理的函数,比如文本特殊字符移除、敏感信息过滤、视频抽取音频、说话人识别等等,覆盖范围广阔。算子的本质是具备特定接口的 class,可以通过 UDF 的方式调用。
如下代码是一个真实可运行的例子:
import os import daft from daft import col from daft.las.functions.audio import AudioDuration from daft.las.functions.udf import las_udf if __name__ == "__main__": sample_path = f"tos://tos_bucket/audio_duration/sample.mp3" df = daft.from_pydict({"audio_path": [sample_path]}) df = df.with_column("duration_result", las_udf(AudioDuration)(col("audio_path"))) df.show() # ╭────────────────────────────────┬─────────────────╮ # │ audio_path ┆ duration_result │ # │ --- ┆ --- │ # │ Utf8 ┆ Float32 │ # ╞════════════════════════════════╪═════════════════╡ # │ tos://tos_bucket/audio_durati… ┆ 49.711 │ # ╰────────────────────────────────┴─────────────────╯
在上面的代码中,AudioDuration 是用于计算音频时长的函数(callable class),las_udf 则是一个封装方法,将 AudioDuration 封装为一个 ve-daft 的 UDF。las_udf() 封装了多个参数:
las_udf( operator: type[Operator], construct_args: dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory_bytes: int | None = None, batch_size: int | None = None, concurrency: int | None = None, )
参数说明:
__init__ 命名参数。用户可以自定义算子,只需要继承如下类并实现相应的接口即可:
class Operator(ABC): """An operator that process the input data.""" def transform(self, *args: Any, **kwargs: Any) -> pa.Array: raise NotImplementedError @staticmethod @abstractmethod def __return_column_type__() -> pa.DataType: ...
说明
transform(self, col: pa.Array) -> pa.Array。__return_column_type__ 定义的返回类型是 pa.Array 内部内容的返回类型,比如,如果 pa.Array 内存放的为 float 数据,那么返回类型应为 pa.float64()。下面是一个简单的例子,返回值本身:
import daft import pyarrow as pa from daft import col from daft.las.functions.types import Operator from daft.las.functions.udf import las_udf # 1. 定义 operator class EchoUDF(Operator): @staticmethod def __return_column_type__() -> pa.DataType: return pa.int64() def transform(self, col_a: pa.Array, col_b: pa.Array | None = None, c: str | None = None) -> pa.Array: return col_a # 2. 使用 oerator df = daft.from_pydict({ "A": [1, 2, 3, 4], "B": [1.5, 2.5, 3.5, 4.5] }) df = df.with_column("dup_a", las_udf(EchoUDF)(col("A"))) df.show()
输出:
╭───────┬─────────┬───────╮ │ A ┆ B ┆ dup_a │ │ --- ┆ --- ┆ --- │ │ Int64 ┆ Float64 ┆ Int64 │ ╞═══════╪═════════╪═══════╡ │ 1 ┆ 1.5 ┆ 1 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ │ 2 ┆ 2.5 ┆ 2 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ │ 3 ┆ 3.5 ┆ 3 │ ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤ │ 4 ┆ 4.5 ┆ 4 │ ╰───────┴─────────┴───────╯ (Showing first 4 of 4 rows)