You need to enable JavaScript to run this app.
导航
技术原理
最近更新时间:2025.08.27 11:36:06首次发布时间:2025.08.27 11:36:06
复制全文
我的收藏
有用
有用
无用
无用

Daft 是一款面向 DATA + AI 多模态数据处理与分析场景的计算引擎,支持单机和分布式两种运行模式,内核采用 Rust 语言编写,并提供 SQL 和 Python DataFrame 两种交互方式。

整体架构

考虑 DATA + AI 场景多模态数据处理的特点,Daft 选择采用 Pipeline 执行模型进行整体架构设计。用户通过 Daft DataFrame 或 SQL API 编写的应用程序会在 Daft 内部首先被表示成一棵抽象语法树(AST: Abstract Syntax Tree),在经过一系列的优化和转化规则处理后,会将这棵抽象语法树转换成最终可被底层执行引擎执行的 Pipeline 树,并最终提交给单机或分布式执行引擎执行。

架构分层

Image
如上图所示,在整体架构设计上可将 Daft 分为 API 层、Plan 层,以及执行层 3 层:

  • API 层:提供 SQL 和 Python DataFrame 两种接入 API,通过对分布式数据进行表格化抽象,并按照行维度切分成多个 Partition 分散到集群的各个节点进行分布式处理,以实现对数据的并行计算。Daft 默认按照启发式策略进行分区(例如沿用文件的默认分区策略),也支持手动 repartition。
  • Plan 层:基于用户输入的 SQL 或构造的 DataFrame 对象构建逻辑执行计划 LogicalPlan,并运用内置 LogicalPlan 优化器对其进行优化(例如消除 CrossJoin/Subquery/Repartition、谓词下推、列裁剪、Join Reorder 等)得到优化后的 LogicalPlan。经过优化处理后的 LogicalPlan 将被进一步转换为 PhysicalPlan,并使用 PhysicalPlan 优化器做进一步优化(例如 Repartition 消除)。
  • 执行层:Daft 支持单机和分布式两种运行模式,经过 Plan 层构造的 PhysicalPlan 需要依据具体的运行模式进一步拆分成一系列可被执行的 Task,并由 Scheduler 将 Task 调度给单机 Swordfish 引擎或分布式 Flotilla 引擎执行,同时管理任务的执行状态。

Plan 示例

我们以下面的 DataFrame 为例,从整体执行链路层面阐述 Daft 从该 DataFrame 出发,如何通过优化和转换最终生成 PhysicalPlan 的执行流程。

df = daft.read_parquet("/opt/workspace/daft/data")

df = (
    df
    .select("id", "name")
    .sort(by="id", desc=True)
    .filter(col("id") < 127)
    .offset(100)
    .limit(10)
)

df.to_pydict()

这是一个比较简单的 DataFrame 示例,用于读取 parquet 数据集,并按要求从中筛选出 10 条记录。Daft 在内部会按照 DataFrame 的构造顺序将其转换成 LogicalPlan 进行表示:

* Limit: 10
|
* Offset: 100
|
* Filter: col(id) < lit(127)
|
* Sort: Sort by = (col(id), descending, nulls first)
|
* Project: col(id), col(name)
|
* GlobScanOperator
|   Glob paths = [/opt/workspace/daft/data]
|   Coerce int96 timestamp unit = Nanoseconds
|   IO config = ...
|   Use multithreading = true
|   File schema = id#Int64, name#Utf8, email#Utf8
|   Partitioning keys = []
|   Output schema = id#Int64, name#Utf8, email#Utf8

考虑用户编写的 DataFrame 或 SQL 不一定是最优的,为了追求极致的执行效率,Daft 内部的 LogicalPlan 优化器会遍历应用一系列优化规则对上述 LogicalPlan 进行优化,并最终得到如下优化后的 LogicalPlan:

* TopN: Sort by = (col(id), descending, nulls first), Num Rows = 10, Offset = 100
|   Stats = { Approx num rows = 110, Approx size bytes = 3.01 KiB, Accumulated
|     selectivity = 0.01 }
|
* Num Scan Tasks = 112
|   File schema = id#Int64, name#Utf8, email#Utf8
|   Partitioning keys = []
|   Projection pushdown = [id, name]
|   Filter pushdown = col(id) < lit(127)
|   Output schema = id#Int64, name#Utf8
|   Stats = { Approx num rows = 3,279, Approx size bytes = 92.09 KiB, Accumulated
|     selectivity = 0.30 }

优化后的 LogicalPlan 相对优化前精简了许多,随之带来的是执行效率的提升。从上述优化后的 LogicalPlan 我们也能够猜到 Daft 大致应用了如下优化规则:

  • 谓词下推:将 .filter(col("id") < 127).select("id", "name") 这类操作下推到数据扫描阶段,从而极大减少数据的扫描量。
  • Limit Offset 改写:通过将 .offset(100).limit(10) 改写成 global_limit(10, 100)local_limit(0, 110) 来实现 offset...limit 语义。
  • Sort Limit 改写:通过将 .sort(by="id", desc=True).limit(10, 100) 改写成 TopN 算子实现排序和截断语义。

经过优化后的 LogicalPlan 会被进一步转换成 PhysicalPlan,并进一步应用针对 PhysicalPlan 的优化规则。例如上述优化后的 LogicalPlan 会被转换成如下 PhysicalPlan:

* TopN: Sort by = (col(0: id), descending, nulls first), Num Rows = 10, Offset
|     = 100
|   Stats = { Approx num rows = 110, Approx size bytes = 3.01 KiB, Accumulated
|     selectivity = 0.01 }
|
* ScanTaskSource:
|   Num Scan Tasks = 112
|   Estimated Scan Bytes = 180568
|   Pushdowns: {projection: [id, name], filter: col(id) < lit(127)}
|   Schema: {id#Int64, name#Utf8, email#Utf8}
|   Scan Tasks: [ ... ]
|   Stats = { Approx num rows = 3,279, Approx size bytes = 92.09 KiB, Accumulated
|     selectivity = 0.30 }

执行引擎

目前在 DATA + AI 领域主流的数据计算引擎(例如 Pandas、Polars)都默认仅提供单机执行模式,其中 Polars 的分布式版本仅在商业版提供,而 Dask 虽然可以看作是 Pandas 的分布式版本,但执行效率较低。Daft 在开源版本中同时提供了单机和分布式两种运行模式,并且允许在这两种模式之间任意切换。

单机执行引擎

Daft 默认以单机模式执行用户输入的 SQL 或构造的 DataFrame,其单机执行引擎 Swordfish 采用 Rust 语言编写,该引擎基于 Pipeline 计算模型设计,借助多线程实现对数据的并行计算。
Image

如上图所示,左边的 PhysicalPlan 会被 Swordfish 引擎解析成一棵 Pipeline 树,引擎会以深度优先的方式对数树进行遍历,并逐一启动运行各个节点。Swordfish PipelineNode 包含 4 类节点:

节点类型

定义

并发数

Operator 示例

SourceNode

数据源节点

依赖数据分片数,默认上限为 8

ScanTaskSource

IntermediateNode

中间处理节点

取决于具体的 Operator 类型,默认为所在节点的 CPU 核数

Project、Filter、UDF、JoinProbe

BlockingSinkNode

阻塞数据接收节点

取决于具体的 Operator 类型,默认为所在节点的 CPU 核数

Aggregate、JoinBuild、TopN、Sort

StreamingSinkNode

流式数据接收节点

取决于具体的 Operator 类型,默认为所在节点的 CPU 核数

Limit、MonotonicallyIncreasingId

节点间的数据传递依赖 Kanal Channel 实现。

分布式执行引擎

Daft 不仅支持以单机模式运行,还支持基于 Ray 以分布式模式运行。用户只需要简单配置 Ray 集群的连接信息即可实现在单机模式和分布式模式之间切换,无需侵入到业务逻辑层面进行适配。
Flotilla v1.5 版本是 Daft 最新的分布式执行引擎,其设计目标是为多模态数据处理提供更好的可扩展性、性能和可观察性,支持:

  1. 稳健的处理高膨胀的多模态纯映射管道(读取 URL、下载 UDF 和写入模式)。
  2. 可靠地重排 10TB 数据以进行交换操作(groupby、join、sort)。
  3. 提供易于理解且可操作的可观察性工具。
  4. 允许 Daft 开发人员快速进行调试和开发。

在设计理念上,Flotilla v1.5 版本通过 Ray 将多个 Swordfish 单机执行引擎分布式化,不同节点之间借助内置的 Flight Shuffle Server 实现数据 Shuffle 传递
Image
Flotilla 分布式执行引擎在实现层面包含如下几个核心组件:

  • Ray Worker Manager:用于管理 Ray 集群上的 Swordfish Worker 节点,确保每个 Ray Worker 节点上运行一个 RaySwordfishActor,此外还负责按需触发Ray 集群的 Autoscale,以及提交 Swordfish Task 到目标 Worker 节点执行等。
  • Plan Runner:负责执行 PhysicalPlan,通过将 PhysicalPlan 对应的多个 StagePlan 拆分成一系列 SwordfishTask,并由 Scheduler 调度给 Ray 集群执行。
  • Scheduler:负责循环监听任务队列中是否有待执行的 SwordfishTask,并由 Dispatcher 将任务分发给 Ray 集群执行。此外,Scheduler 还负责在接收到 Autoscale 请求时触发 Ray 集群执行 Autoscale。
  • Dispatcher:负责将 SwordfishTask 按照 Ray Worker 节点进行组织,并批量将任务提交给 Ray 集群目标 Worker 节点执行。

每个 Ray 的 Worker 节点上会常驻运行一个 RaySwordfishActor,负责接收 SwordfishTask 内部包含的可以在单机 Swordfish 引擎上执行的 LocalPhysicalPlan。当接收到执行请求时,RaySwordfishActor 会将 LocalPhysicalPlan 提交给本地的 Swordfish 单机引擎执行,具体执行流程可以参考上 1 小节的介绍。
Flotilla 分布式执行引擎目前已经演化到 v1.5 版本,性能相对于 v1.0 版本有了显著提升,但功能完备度相对一般,因此当遇到不支持的操作时会 fallback 到 v1.0 版本以保证 job 能够正常运行。​区别于 v1.5 版本采用 Actor 模式配合 Swordfish 单机执行引擎执行 Pipeline 算子,v1.0 版本则将每个 Stage 切分成多个 Ray Remote Task 批量提交给 Ray 集群执行。
Image
如上图左图所示,v1.0 版本对于 PhysicalPlan 核心执行流程:

  1. 基于 PhysicalPlanScheduler 将 PhysicalPlan 拆分成一系列 Task,并按照依赖关系切分 Stage。一个 Task 主要包含 3 个属性:输入、指令集合,以及资源描述。
  2. 依据 Task 之间的依赖关系和当前 Ray 集群能够执行的任务数(按空闲 CPU 进行度量)进行任务分批,并以 Ray Remote Task 模式批量提交执行。
  1. Scheduler 内部维护一个 Queue 记录任务的最终的执行结果,并提供迭代器方式供上游访问。

Flotilla 分布式执行引擎之所以要重新设计,主要是考虑 v1.0 相对于 v1.5 存在如下 2 个核心问题:

  1. 执行性能低:对于稍复杂的任务通常会切分成多个 Stage 执行,每个 Stage 包含大量的 Task,Daft 在调度时会优先将子节点 Stage 的 Task 调度提交给 Ray 集群执行,当 Task 数目较多时通常会导致执行模式由 Pipeline 退化成 Stage by Stage。对于多模态数据处理这类异构计算场景,Stage by Stage 执行会导致资源得不到充分利用,进而导致处理性能低效。
  2. 内存压力大:Ray 集群通过 Object Memory Store 实现 Task 之间的数据交换,而 Stage by Stage 的执行模式需要将上游 Stage Task 全部处理完成之后再开始执行下游 Stage Task,中间的数据需要全部由 Object Memory Store 承载,从而给 Ray 造成了极大的内存压力,极易导致 OOM 错误。