You need to enable JavaScript to run this app.
导航
概述
最近更新时间:2025.08.25 16:56:24首次发布时间:2025.08.25 16:56:24
复制全文
我的收藏
有用
有用
无用
无用

Daft 简介

DAFT 是全球首个原生支持多模态数据处理的分布式计算引擎,突破传统 ETL 范式,提供:
统一多模态数据抽象 + Python/Rust 原生 API + 分布式执行引擎三位一体解决方案,实现从原始数据到 AI 特征的无缝衔接。
Daft 的核心定位是为企业级多模态数据湖提供端到端的处理能力。它打破了传统工具对单一数据类型的依赖,能够无缝对接文本、图像、视频、音频、JSON、Parquet 、HuggingFace Dataset 等数十种数据格式,实现从数据摄入、清洗、转换到分析、建模的全流程自动化。无论是电商平台的商品图片与用户评论,还是智能驾驶的视频与点云数据,Daft 都能将分散的多模态信息汇聚成结构化的 “数据资产”,为后续的 AI 训练、业务分析提供一致的数据处理底座。

Daft 技术优势

Daft 源于面向多模态数据处理的设计理念,支持原生多模态数据类型,可助力用户处理 PB 级多模态数据,并能与现有机器学习系统实现无缝对接,提供一流的数据处理性能。

  1. 原生多模式处理:在单一统一的框架中,通过支持image、embedding 和 tensor 等原生多模态数据类型,提供内置的图像解码、向量相似度计算等操作,避免传统方案中的冗长流程。
  2. 异构资源调度:Daft 可依据任务的计算特性自动分配最优异构资源,在同一工作流中实现 CPU 与 GPU 计算任务的无缝协调,从而最大化资源利用率并显著提升整体计算性能。
  3. Rust-Powered 的性能:借助 Rust 内核,提供向量化执行与异步 I/O 能力,以更少的内存处理相同的查询,支持 PB 级数据处理。
  4. 无缝 ML 生态系统集成:无缝集成现有的机器学习(ML)工作负载,提供与 PyTorch、NumPy、Pandas、HuggingFace 等工作负载的兼容接口,便于与现有系统对接。
  5. 深度数据湖协同:支持多种云存储介质(如 S3、火山引擎 TOS 等),无缝对接现代数据湖格式(如 Apache Iceberg、Hudi、Delta Lake 等),深度优化的 AI 数据湖 Lance 格式对接,支持企业级 catalogs(如 Unity、AWS Glue 等)。
  6. Python UDF 数据处理能力:​利用由 Apache Arrow 提供支持的零拷贝(zero-copy)能力,可在 UDF 中直接使用处理多模态数据,进而消除数据移动开销并提升处理速度。
@daft.udf(return_dtype=daft.DataType.python())
def crop_images(images, crops, padding=0):
    cropped = []
    for img, crop in zip(images, crops):
        x1, x2, y1, y2 = crop
        cropped_img = img[x1:x2 + padding, y1:y2 + padding]
        cropped.append(cropped_img)
    return cropped

df = df.with_column(
    "cropped",
    crop_images(df["image"], df["crop"], padding=1),
)
df.show(2)
  1. 开箱即用的可靠性:智能内存管理可防止 OOM 错误,合理的默认值避免繁杂配置,使用户能够专注于业务,提供开箱即用的使用体验。

Daft 适用场景

文本生成

传统引擎(如Pandas)在处理大规模文本数据与结构化数据融合时效率欠佳,这使RAG等应用的数据准备工作更为复杂。Spark 无法进行 CPU/GPU 资源的异构调度,致使文本处理任务的资源利用效率参差不齐。Daft 原生支持多模态数据处理,可借助 Daft 的内置 embedding 类型及 embedding 函数,将 CPU 算子与 GPU 算子高效调度至相应资源,灵活配置不同类型算子所需资源,实现 GPU 的高效利用。其内置的 Rust 内核带来高效的数据处理效能,让业务人员专注于业务流程开发,无需在繁琐的数据处理链条上耗费大量精力。

图像处理

在图像分类、风格迁移或 AIGC(如 Stable Diffusion)场景中,Daft 可对分布式预处理与模型推理链路进行管理。Daft 的异构调度能力,能够将分辨率调整、特征提取和标签生成等不同类型的 CPU/GPU 算子调度至相应的资源上,借助流式计算能力,提高 GPU 利用率,显著提升图像处理与生成吞吐量。内置的图像类型和图像处理函数可实现高效的图像处理。

视频处理

常规视频处理需将视频拆分为帧进行单独处理,流程繁琐且难以保持时序一致性。Daft 基于 native 的多模态 AI 数据湖架构,直接支持音视频等多模态数据处理,无需额外转换步骤。 作为由 Rust 为内核的分布式引擎,Daft 能高效处理视频数据的并行计算需求,在保证处理速度的同时维持数据完整性,特别适合大规模视频数据集的AI训练前处理。

智能驾驶

智能驾驶系统面临多源异构数据(如点云、图像、传感器数据)融合难题,传统引擎如 Spark 难以进行统一处理。Daft 支持图文、音视频、点云等多模态数据的无缝集成,专为解决此类复杂数据处理场景而设计。该引擎源自 Lyft 在数据处理方面的实际需求,能够高效处理智能驾驶中的实时数据流,实现多模态数据的弹性扩展与工程化实施,同时兼顾成本控制与高效能输出。Daft 支持与 PyTorch 无缝融合,处理后的数据可直接融入智能驾驶模型训练,提升智能驾驶模型训练效率。

具身智能

具身智能需处理视频、触觉、动作等多模态数据流,传统数据处理工具难以满足多模态数据的复杂要求。Daft 作为 AI 原生的多模态数据处理平台,能够对接 HDF5、LeRobot、mcap 等多种具身场景数据源,为具身智能提供统一的数据处理中间层。其分布式架构支持复杂的数据预处理与特征工程,可使机器人系统更高效地整合视觉、触觉、位置等多种感知数据,提高环境交互的准确性与响应速度。

Daft快速上手

以下为使用 Daft 对一个 TOS 路径中的图像文件进行查询,并过滤掉不符合要求的图像,最终使用 UDF 生成一个包含红色遮挡图片的红色标记的样例。

import daft
import numpy as np
import PIL.Image
from PIL import ImageFilter 

IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))  # Use anonymous S3 access
# 加载图像数据  
df = daft.from_glob_path(
    "s3://daft-public-data/open-images/validation-images/*",
    io_config=IO_CONFIG,
)

# 过滤掉过大图像
df = df.where(df["size"] < 300000)

df = df.select("path")
df = df.with_column("image", df["path"].url.download(io_config=IO_CONFIG))
df = df.with_column("image", df["image"].image.decode())

def magic_red_detector(img: np.ndarray) -> PIL.Image.Image:
    """Gets a new image which is a mask covering all 'red' areas in the image."""
    img = PIL.Image.fromarray(img)
    lower = np.array([245, 100, 100])
    upper = np.array([10, 255, 255])
    lower_hue, upper_hue = lower[0, np.newaxis, np.newaxis], upper[0, np.newaxis, np.newaxis]
    lower_saturation_intensity, upper_saturation_intensity = (
        lower[1:, np.newaxis, np.newaxis],
        upper[1:, np.newaxis, np.newaxis],
    )
    hsv = img.convert("HSV")
    hsv = np.asarray(hsv).T
    mask = np.all(
        (hsv[1:, ...] >= lower_saturation_intensity) & (hsv[1:, ...] <= upper_saturation_intensity), axis=0
    ) & ((hsv[0, ...] >= lower_hue) | (hsv[0, ...] <= upper_hue))
    img = PIL.Image.fromarray(mask.T)
    img = img.filter(ImageFilter.ModeFilter(size=5))
    return img

df = df.with_column(
    "red_mask",
    df["image"].apply(magic_red_detector, return_dtype=daft.DataType.python()),
)
df.show()