DAFT 是全球首个原生支持多模态数据处理的分布式计算引擎,突破传统 ETL 范式,提供:
统一多模态数据抽象 + Python/Rust 原生 API + 分布式执行引擎三位一体解决方案,实现从原始数据到 AI 特征的无缝衔接。
Daft 的核心定位是为企业级多模态数据湖提供端到端的处理能力。它打破了传统工具对单一数据类型的依赖,能够无缝对接文本、图像、视频、音频、JSON、Parquet 、HuggingFace Dataset 等数十种数据格式,实现从数据摄入、清洗、转换到分析、建模的全流程自动化。无论是电商平台的商品图片与用户评论,还是智能驾驶的视频与点云数据,Daft 都能将分散的多模态信息汇聚成结构化的 “数据资产”,为后续的 AI 训练、业务分析提供一致的数据处理底座。
Daft 源于面向多模态数据处理的设计理念,支持原生多模态数据类型,可助力用户处理 PB 级多模态数据,并能与现有机器学习系统实现无缝对接,提供一流的数据处理性能。
Apache Iceberg, the names of other Apache projects, and the ASF logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
@daft.udf(return_dtype=daft.DataType.python()) def crop_images(images, crops, padding=0): cropped = [] for img, crop in zip(images, crops): x1, x2, y1, y2 = crop cropped_img = img[x1:x2 + padding, y1:y2 + padding] cropped.append(cropped_img) return cropped df = df.with_column( "cropped", crop_images(df["image"], df["crop"], padding=1), ) df.show(2)
传统引擎(如Pandas)在处理大规模文本数据与结构化数据融合时效率欠佳,这使RAG等应用的数据准备工作更为复杂。Spark 无法进行 CPU/GPU 资源的异构调度,致使文本处理任务的资源利用效率参差不齐。Daft 原生支持多模态数据处理,可借助 Daft 的内置 embedding 类型及 embedding 函数,将 CPU 算子与 GPU 算子高效调度至相应资源,灵活配置不同类型算子所需资源,实现 GPU 的高效利用。其内置的 Rust 内核带来高效的数据处理效能,让业务人员专注于业务流程开发,无需在繁琐的数据处理链条上耗费大量精力。
在图像分类、风格迁移或 AIGC(如 Stable Diffusion)场景中,Daft 可对分布式预处理与模型推理链路进行管理。Daft 的异构调度能力,能够将分辨率调整、特征提取和标签生成等不同类型的 CPU/GPU 算子调度至相应的资源上,借助流式计算能力,提高 GPU 利用率,显著提升图像处理与生成吞吐量。内置的图像类型和图像处理函数可实现高效的图像处理。
常规视频处理需将视频拆分为帧进行单独处理,流程繁琐且难以保持时序一致性。Daft 基于 native 的多模态 AI 数据湖架构,直接支持音视频等多模态数据处理,无需额外转换步骤。 作为由 Rust 为内核的分布式引擎,Daft 能高效处理视频数据的并行计算需求,在保证处理速度的同时维持数据完整性,特别适合大规模视频数据集的AI训练前处理。
智能驾驶系统面临多源异构数据(如点云、图像、传感器数据)融合难题,传统引擎如 Spark 难以进行统一处理。Daft 支持图文、音视频、点云等多模态数据的无缝集成,专为解决此类复杂数据处理场景而设计。该引擎源自 Lyft 在数据处理方面的实际需求,能够高效处理智能驾驶中的实时数据流,实现多模态数据的弹性扩展与工程化实施,同时兼顾成本控制与高效能输出。Daft 支持与 PyTorch 无缝融合,处理后的数据可直接融入智能驾驶模型训练,提升智能驾驶模型训练效率。
具身智能需处理视频、触觉、动作等多模态数据流,传统数据处理工具难以满足多模态数据的复杂要求。Daft 作为 AI 原生的多模态数据处理平台,能够对接 HDF5、LeRobot、mcap 等多种具身场景数据源,为具身智能提供统一的数据处理中间层。其分布式架构支持复杂的数据预处理与特征工程,可使机器人系统更高效地整合视觉、触觉、位置等多种感知数据,提高环境交互的准确性与响应速度。
以下为使用 Daft 对一个 TOS 路径中的图像文件进行查询,并过滤掉不符合要求的图像,最终使用 UDF 生成一个包含红色遮挡图片的红色标记的样例。
import daft import numpy as np import PIL.Image from PIL import ImageFilter IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True)) # Use anonymous S3 access # 加载图像数据 df = daft.from_glob_path( "s3://daft-public-data/open-images/validation-images/*", io_config=IO_CONFIG, ) # 过滤掉过大图像 df = df.where(df["size"] < 300000) df = df.select("path") df = df.with_column("image", df["path"].url.download(io_config=IO_CONFIG)) df = df.with_column("image", df["image"].image.decode()) def magic_red_detector(img: np.ndarray) -> PIL.Image.Image: """Gets a new image which is a mask covering all 'red' areas in the image.""" img = PIL.Image.fromarray(img) lower = np.array([245, 100, 100]) upper = np.array([10, 255, 255]) lower_hue, upper_hue = lower[0, np.newaxis, np.newaxis], upper[0, np.newaxis, np.newaxis] lower_saturation_intensity, upper_saturation_intensity = ( lower[1:, np.newaxis, np.newaxis], upper[1:, np.newaxis, np.newaxis], ) hsv = img.convert("HSV") hsv = np.asarray(hsv).T mask = np.all( (hsv[1:, ...] >= lower_saturation_intensity) & (hsv[1:, ...] <= upper_saturation_intensity), axis=0 ) & ((hsv[0, ...] >= lower_hue) | (hsv[0, ...] <= upper_hue)) img = PIL.Image.fromarray(mask.T) img = img.filter(ImageFilter.ModeFilter(size=5)) return img df = df.with_column( "red_mask", df["image"].apply(magic_red_detector, return_dtype=daft.DataType.python()), ) df.show()