You need to enable JavaScript to run this app.
文档中心
AI 数据湖服务

AI 数据湖服务

复制全文
下载 pdf
Daft
概述
复制全文
下载 pdf
概述

Daft 简介

DAFT 是全球首个原生支持多模态数据处理的分布式计算引擎,突破传统 ETL 范式,提供:
统一多模态数据抽象 + Python/Rust 原生 API + 分布式执行引擎三位一体解决方案,实现从原始数据到 AI 特征的无缝衔接。
Daft 的核心定位是为企业级多模态数据湖提供端到端的处理能力。它打破了传统工具对单一数据类型的依赖,能够无缝对接文本、图像、视频、音频、JSON、Parquet 、HuggingFace Dataset 等数十种数据格式,实现从数据摄入、清洗、转换到分析、建模的全流程自动化。无论是电商平台的商品图片与用户评论,还是智能驾驶的视频与点云数据,Daft 都能将分散的多模态信息汇聚成结构化的 “数据资产”,为后续的 AI 训练、业务分析提供一致的数据处理底座。

Daft 技术优势

Daft 源于面向多模态数据处理的设计理念,支持原生多模态数据类型,可助力用户处理 PB 级多模态数据,并能与现有机器学习系统实现无缝对接,提供一流的数据处理性能。

  1. 原生多模式处理:在单一统一的框架中,通过支持image、embedding 和 tensor 等原生多模态数据类型,提供内置的图像解码、向量相似度计算等操作,避免传统方案中的冗长流程。
  2. 异构资源调度:Daft 可依据任务的计算特性自动分配最优异构资源,在同一工作流中实现 CPU 与 GPU 计算任务的无缝协调,从而最大化资源利用率并显著提升整体计算性能。
  3. Rust-Powered 的性能:借助 Rust 内核,提供向量化执行与异步 I/O 能力,以更少的内存处理相同的查询,支持 PB 级数据处理。
  4. 无缝 ML 生态系统集成:无缝集成现有的机器学习(ML)工作负载,提供与 PyTorch、NumPy、Pandas、HuggingFace 等工作负载的兼容接口,便于与现有系统对接。
  5. 深度数据湖协同:支持多种云存储介质(如 S3、火山引擎 TOS 等),无缝对接现代数据湖格式(如 Apache® Iceberg、Hudi、Delta Lake 等),深度优化的 AI 数据湖 Lance 格式对接,支持企业级 catalogs(如 Unity、AWS Glue 等)。

    Apache Iceberg, the names of other Apache projects, and the ASF logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.

  6. Python UDF 数据处理能力:​利用由 Apache Arrow 提供支持的零拷贝(zero-copy)能力,可在 UDF 中直接使用处理多模态数据,进而消除数据移动开销并提升处理速度。
@daft.udf(return_dtype=daft.DataType.python())
def crop_images(images, crops, padding=0):
    cropped = []
    for img, crop in zip(images, crops):
        x1, x2, y1, y2 = crop
        cropped_img = img[x1:x2 + padding, y1:y2 + padding]
        cropped.append(cropped_img)
    return cropped

df = df.with_column(
    "cropped",
    crop_images(df["image"], df["crop"], padding=1),
)
df.show(2)
  1. 开箱即用的可靠性:智能内存管理可防止 OOM 错误,合理的默认值避免繁杂配置,使用户能够专注于业务,提供开箱即用的使用体验。

Daft 适用场景

文本生成

传统引擎(如Pandas)在处理大规模文本数据与结构化数据融合时效率欠佳,这使RAG等应用的数据准备工作更为复杂。Spark 无法进行 CPU/GPU 资源的异构调度,致使文本处理任务的资源利用效率参差不齐。Daft 原生支持多模态数据处理,可借助 Daft 的内置 embedding 类型及 embedding 函数,将 CPU 算子与 GPU 算子高效调度至相应资源,灵活配置不同类型算子所需资源,实现 GPU 的高效利用。其内置的 Rust 内核带来高效的数据处理效能,让业务人员专注于业务流程开发,无需在繁琐的数据处理链条上耗费大量精力。

图像处理

在图像分类、风格迁移或 AIGC(如 Stable Diffusion)场景中,Daft 可对分布式预处理与模型推理链路进行管理。Daft 的异构调度能力,能够将分辨率调整、特征提取和标签生成等不同类型的 CPU/GPU 算子调度至相应的资源上,借助流式计算能力,提高 GPU 利用率,显著提升图像处理与生成吞吐量。内置的图像类型和图像处理函数可实现高效的图像处理。

视频处理

常规视频处理需将视频拆分为帧进行单独处理,流程繁琐且难以保持时序一致性。Daft 基于 native 的多模态 AI 数据湖架构,直接支持音视频等多模态数据处理,无需额外转换步骤。 作为由 Rust 为内核的分布式引擎,Daft 能高效处理视频数据的并行计算需求,在保证处理速度的同时维持数据完整性,特别适合大规模视频数据集的AI训练前处理。

智能驾驶

智能驾驶系统面临多源异构数据(如点云、图像、传感器数据)融合难题,传统引擎如 Spark 难以进行统一处理。Daft 支持图文、音视频、点云等多模态数据的无缝集成,专为解决此类复杂数据处理场景而设计。该引擎源自 Lyft 在数据处理方面的实际需求,能够高效处理智能驾驶中的实时数据流,实现多模态数据的弹性扩展与工程化实施,同时兼顾成本控制与高效能输出。Daft 支持与 PyTorch 无缝融合,处理后的数据可直接融入智能驾驶模型训练,提升智能驾驶模型训练效率。

具身智能

具身智能需处理视频、触觉、动作等多模态数据流,传统数据处理工具难以满足多模态数据的复杂要求。Daft 作为 AI 原生的多模态数据处理平台,能够对接 HDF5、LeRobot、mcap 等多种具身场景数据源,为具身智能提供统一的数据处理中间层。其分布式架构支持复杂的数据预处理与特征工程,可使机器人系统更高效地整合视觉、触觉、位置等多种感知数据,提高环境交互的准确性与响应速度。

Daft快速上手

以下为使用 Daft 对一个 TOS 路径中的图像文件进行查询,并过滤掉不符合要求的图像,最终使用 UDF 生成一个包含红色遮挡图片的红色标记的样例。

import daft
import numpy as np
import PIL.Image
from PIL import ImageFilter 

IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))  # Use anonymous S3 access
# 加载图像数据  
df = daft.from_glob_path(
    "s3://daft-public-data/open-images/validation-images/*",
    io_config=IO_CONFIG,
)

# 过滤掉过大图像
df = df.where(df["size"] < 300000)

df = df.select("path")
df = df.with_column("image", df["path"].url.download(io_config=IO_CONFIG))
df = df.with_column("image", df["image"].image.decode())

def magic_red_detector(img: np.ndarray) -> PIL.Image.Image:
    """Gets a new image which is a mask covering all 'red' areas in the image."""
    img = PIL.Image.fromarray(img)
    lower = np.array([245, 100, 100])
    upper = np.array([10, 255, 255])
    lower_hue, upper_hue = lower[0, np.newaxis, np.newaxis], upper[0, np.newaxis, np.newaxis]
    lower_saturation_intensity, upper_saturation_intensity = (
        lower[1:, np.newaxis, np.newaxis],
        upper[1:, np.newaxis, np.newaxis],
    )
    hsv = img.convert("HSV")
    hsv = np.asarray(hsv).T
    mask = np.all(
        (hsv[1:, ...] >= lower_saturation_intensity) & (hsv[1:, ...] <= upper_saturation_intensity), axis=0
    ) & ((hsv[0, ...] >= lower_hue) | (hsv[0, ...] <= upper_hue))
    img = PIL.Image.fromarray(mask.T)
    img = img.filter(ImageFilter.ModeFilter(size=5))
    return img

df = df.with_column(
    "red_mask",
    df["image"].apply(magic_red_detector, return_dtype=daft.DataType.python()),
)
df.show()
最近更新时间:2026.04.13 11:47:40
这个页面对您有帮助吗?
有用
有用
无用
无用