DAFT 是全球首个原生支持多模态数据处理的分布式计算引擎,突破传统 ETL 范式,提供:
统一多模态数据抽象 + Python/Rust 原生 API + 分布式执行引擎三位一体解决方案,实现从原始数据到 AI 特征的无缝衔接。
Daft 的核心定位是为企业级多模态数据湖提供端到端的处理能力。它打破了传统工具对单一数据类型的依赖,能够无缝对接文本、图像、视频、音频、JSON、Parquet 、HuggingFace Dataset 等数十种数据格式,实现从数据摄入、清洗、转换到分析、建模的全流程自动化。无论是电商平台的商品图片与用户评论,还是智能驾驶的视频与点云数据,Daft 都能将分散的多模态信息汇聚成结构化的 “数据资产”,为后续的 AI 训练、业务分析提供一致的数据处理底座。
Daft 源于面向多模态数据处理的设计理念,支持原生多模态数据类型,可助力用户处理 PB 级多模态数据,并能与现有机器学习系统实现无缝对接,提供一流的数据处理性能。
@daft.udf(return_dtype=daft.DataType.python()) def crop_images(images, crops, padding=0): cropped = [] for img, crop in zip(images, crops): x1, x2, y1, y2 = crop cropped_img = img[x1:x2 + padding, y1:y2 + padding] cropped.append(cropped_img) return cropped df = df.with_column( "cropped", crop_images(df["image"], df["crop"], padding=1), ) df.show(2)
传统引擎(如Pandas)在处理大规模文本数据与结构化数据融合时效率欠佳,这使RAG等应用的数据准备工作更为复杂。Spark 无法进行 CPU/GPU 资源的异构调度,致使文本处理任务的资源利用效率参差不齐。Daft 原生支持多模态数据处理,可借助 Daft 的内置 embedding 类型及 embedding 函数,将 CPU 算子与 GPU 算子高效调度至相应资源,灵活配置不同类型算子所需资源,实现 GPU 的高效利用。其内置的 Rust 内核带来高效的数据处理效能,让业务人员专注于业务流程开发,无需在繁琐的数据处理链条上耗费大量精力。
在图像分类、风格迁移或 AIGC(如 Stable Diffusion)场景中,Daft 可对分布式预处理与模型推理链路进行管理。Daft 的异构调度能力,能够将分辨率调整、特征提取和标签生成等不同类型的 CPU/GPU 算子调度至相应的资源上,借助流式计算能力,提高 GPU 利用率,显著提升图像处理与生成吞吐量。内置的图像类型和图像处理函数可实现高效的图像处理。
常规视频处理需将视频拆分为帧进行单独处理,流程繁琐且难以保持时序一致性。Daft 基于 native 的多模态 AI 数据湖架构,直接支持音视频等多模态数据处理,无需额外转换步骤。 作为由 Rust 为内核的分布式引擎,Daft 能高效处理视频数据的并行计算需求,在保证处理速度的同时维持数据完整性,特别适合大规模视频数据集的AI训练前处理。
智能驾驶系统面临多源异构数据(如点云、图像、传感器数据)融合难题,传统引擎如 Spark 难以进行统一处理。Daft 支持图文、音视频、点云等多模态数据的无缝集成,专为解决此类复杂数据处理场景而设计。该引擎源自 Lyft 在数据处理方面的实际需求,能够高效处理智能驾驶中的实时数据流,实现多模态数据的弹性扩展与工程化实施,同时兼顾成本控制与高效能输出。Daft 支持与 PyTorch 无缝融合,处理后的数据可直接融入智能驾驶模型训练,提升智能驾驶模型训练效率。
具身智能需处理视频、触觉、动作等多模态数据流,传统数据处理工具难以满足多模态数据的复杂要求。Daft 作为 AI 原生的多模态数据处理平台,能够对接 HDF5、LeRobot、mcap 等多种具身场景数据源,为具身智能提供统一的数据处理中间层。其分布式架构支持复杂的数据预处理与特征工程,可使机器人系统更高效地整合视觉、触觉、位置等多种感知数据,提高环境交互的准确性与响应速度。
以下为使用 Daft 对一个 TOS 路径中的图像文件进行查询,并过滤掉不符合要求的图像,最终使用 UDF 生成一个包含红色遮挡图片的红色标记的样例。
import daft import numpy as np import PIL.Image from PIL import ImageFilter IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True)) # Use anonymous S3 access # 加载图像数据 df = daft.from_glob_path( "s3://daft-public-data/open-images/validation-images/*", io_config=IO_CONFIG, ) # 过滤掉过大图像 df = df.where(df["size"] < 300000) df = df.select("path") df = df.with_column("image", df["path"].url.download(io_config=IO_CONFIG)) df = df.with_column("image", df["image"].image.decode()) def magic_red_detector(img: np.ndarray) -> PIL.Image.Image: """Gets a new image which is a mask covering all 'red' areas in the image.""" img = PIL.Image.fromarray(img) lower = np.array([245, 100, 100]) upper = np.array([10, 255, 255]) lower_hue, upper_hue = lower[0, np.newaxis, np.newaxis], upper[0, np.newaxis, np.newaxis] lower_saturation_intensity, upper_saturation_intensity = ( lower[1:, np.newaxis, np.newaxis], upper[1:, np.newaxis, np.newaxis], ) hsv = img.convert("HSV") hsv = np.asarray(hsv).T mask = np.all( (hsv[1:, ...] >= lower_saturation_intensity) & (hsv[1:, ...] <= upper_saturation_intensity), axis=0 ) & ((hsv[0, ...] >= lower_hue) | (hsv[0, ...] <= upper_hue)) img = PIL.Image.fromarray(mask.T) img = img.filter(ImageFilter.ModeFilter(size=5)) return img df = df.with_column( "red_mask", df["image"].apply(magic_red_detector, return_dtype=daft.DataType.python()), ) df.show()