Daft 是一款面向 DATA + AI 多模态数据处理与分析场景的计算引擎,支持单机和分布式两种运行模式,内核采用 Rust 语言编写,并提供 SQL 和 Python DataFrame 两种交互方式。
考虑 DATA + AI 场景多模态数据处理的特点,Daft 选择采用 Pipeline 执行模型进行整体架构设计。用户通过 Daft DataFrame 或 SQL API 编写的应用程序会在 Daft 内部首先被表示成一棵抽象语法树(AST: Abstract Syntax Tree),在经过一系列的优化和转化规则处理后,会将这棵抽象语法树转换成最终可被底层执行引擎执行的 Pipeline 树,并最终提交给单机或分布式执行引擎执行。
如上图所示,在整体架构设计上可将 Daft 分为 API 层、Plan 层,以及执行层 3 层:
我们以下面的 DataFrame 为例,从整体执行链路层面阐述 Daft 从该 DataFrame 出发,如何通过优化和转换最终生成 PhysicalPlan 的执行流程。
df = daft.read_parquet("/opt/workspace/daft/data") df = ( df .select("id", "name") .sort(by="id", desc=True) .filter(col("id") < 127) .offset(100) .limit(10) ) df.to_pydict()
这是一个比较简单的 DataFrame 示例,用于读取 parquet 数据集,并按要求从中筛选出 10 条记录。Daft 在内部会按照 DataFrame 的构造顺序将其转换成 LogicalPlan 进行表示:
* Limit: 10 | * Offset: 100 | * Filter: col(id) < lit(127) | * Sort: Sort by = (col(id), descending, nulls first) | * Project: col(id), col(name) | * GlobScanOperator | Glob paths = [/opt/workspace/daft/data] | Coerce int96 timestamp unit = Nanoseconds | IO config = ... | Use multithreading = true | File schema = id#Int64, name#Utf8, email#Utf8 | Partitioning keys = [] | Output schema = id#Int64, name#Utf8, email#Utf8
考虑用户编写的 DataFrame 或 SQL 不一定是最优的,为了追求极致的执行效率,Daft 内部的 LogicalPlan 优化器会遍历应用一系列优化规则对上述 LogicalPlan 进行优化,并最终得到如下优化后的 LogicalPlan:
* TopN: Sort by = (col(id), descending, nulls first), Num Rows = 10, Offset = 100 | Stats = { Approx num rows = 110, Approx size bytes = 3.01 KiB, Accumulated | selectivity = 0.01 } | * Num Scan Tasks = 112 | File schema = id#Int64, name#Utf8, email#Utf8 | Partitioning keys = [] | Projection pushdown = [id, name] | Filter pushdown = col(id) < lit(127) | Output schema = id#Int64, name#Utf8 | Stats = { Approx num rows = 3,279, Approx size bytes = 92.09 KiB, Accumulated | selectivity = 0.30 }
优化后的 LogicalPlan 相对优化前精简了许多,随之带来的是执行效率的提升。从上述优化后的 LogicalPlan 我们也能够猜到 Daft 大致应用了如下优化规则:
.filter(col("id") < 127) 和 .select("id", "name") 这类操作下推到数据扫描阶段,从而极大减少数据的扫描量。.offset(100).limit(10) 改写成 global_limit(10, 100) 和 local_limit(0, 110) 来实现 offset...limit 语义。.sort(by="id", desc=True).limit(10, 100) 改写成 TopN 算子实现排序和截断语义。经过优化后的 LogicalPlan 会被进一步转换成 PhysicalPlan,并进一步应用针对 PhysicalPlan 的优化规则。例如上述优化后的 LogicalPlan 会被转换成如下 PhysicalPlan:
* TopN: Sort by = (col(0: id), descending, nulls first), Num Rows = 10, Offset | = 100 | Stats = { Approx num rows = 110, Approx size bytes = 3.01 KiB, Accumulated | selectivity = 0.01 } | * ScanTaskSource: | Num Scan Tasks = 112 | Estimated Scan Bytes = 180568 | Pushdowns: {projection: [id, name], filter: col(id) < lit(127)} | Schema: {id#Int64, name#Utf8, email#Utf8} | Scan Tasks: [ ... ] | Stats = { Approx num rows = 3,279, Approx size bytes = 92.09 KiB, Accumulated | selectivity = 0.30 }
目前在 DATA + AI 领域主流的数据计算引擎(例如 Pandas、Polars)都默认仅提供单机执行模式,其中 Polars 的分布式版本仅在商业版提供,而 Dask 虽然可以看作是 Pandas 的分布式版本,但执行效率较低。Daft 在开源版本中同时提供了单机和分布式两种运行模式,并且允许在这两种模式之间任意切换。
Daft 默认以单机模式执行用户输入的 SQL 或构造的 DataFrame,其单机执行引擎 Swordfish 采用 Rust 语言编写,该引擎基于 Pipeline 计算模型设计,借助多线程实现对数据的并行计算。
如上图所示,左边的 PhysicalPlan 会被 Swordfish 引擎解析成一棵 Pipeline 树,引擎会以深度优先的方式对数树进行遍历,并逐一启动运行各个节点。Swordfish PipelineNode 包含 4 类节点:
节点类型 | 定义 | 并发数 | Operator 示例 |
|---|---|---|---|
SourceNode | 数据源节点 | 依赖数据分片数,默认上限为 8 | ScanTaskSource |
IntermediateNode | 中间处理节点 | 取决于具体的 Operator 类型,默认为所在节点的 CPU 核数 | Project、Filter、UDF、JoinProbe |
BlockingSinkNode | 阻塞数据接收节点 | 取决于具体的 Operator 类型,默认为所在节点的 CPU 核数 | Aggregate、JoinBuild、TopN、Sort |
StreamingSinkNode | 流式数据接收节点 | 取决于具体的 Operator 类型,默认为所在节点的 CPU 核数 | Limit、MonotonicallyIncreasingId |
节点间的数据传递依赖 Kanal Channel 实现。
Daft 不仅支持以单机模式运行,还支持基于 Ray 以分布式模式运行。用户只需要简单配置 Ray 集群的连接信息即可实现在单机模式和分布式模式之间切换,无需侵入到业务逻辑层面进行适配。
Flotilla v1.5 版本是 Daft 最新的分布式执行引擎,其设计目标是为多模态数据处理提供更好的可扩展性、性能和可观察性,支持:
在设计理念上,Flotilla v1.5 版本通过 Ray 将多个 Swordfish 单机执行引擎分布式化,不同节点之间借助内置的 Flight Shuffle Server 实现数据 Shuffle 传递。
Flotilla 分布式执行引擎在实现层面包含如下几个核心组件:
每个 Ray 的 Worker 节点上会常驻运行一个 RaySwordfishActor,负责接收 SwordfishTask 内部包含的可以在单机 Swordfish 引擎上执行的 LocalPhysicalPlan。当接收到执行请求时,RaySwordfishActor 会将 LocalPhysicalPlan 提交给本地的 Swordfish 单机引擎执行,具体执行流程可以参考上 1 小节的介绍。
Flotilla 分布式执行引擎目前已经演化到 v1.5 版本,性能相对于 v1.0 版本有了显著提升,但功能完备度相对一般,因此当遇到不支持的操作时会 fallback 到 v1.0 版本以保证 job 能够正常运行。区别于 v1.5 版本采用 Actor 模式配合 Swordfish 单机执行引擎执行 Pipeline 算子,v1.0 版本则将每个 Stage 切分成多个 Ray Remote Task 批量提交给 Ray 集群执行。
如上图左图所示,v1.0 版本对于 PhysicalPlan 核心执行流程:
Flotilla 分布式执行引擎之所以要重新设计,主要是考虑 v1.0 相对于 v1.5 存在如下 2 个核心问题: