You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark单Action的DAG中是否复用RDD?两类场景疑问及优化咨询

Spark SQL 查询执行次数问题解答

Case 1:两次独立调用spark.sql的场景

先看你的第一段代码:

val df1 = spark.sql("select id, value from table")
val df2 = spark.sql("select id, value from table")
df1.join(df2, "id").show()

关于这个场景下查询是否仅执行一次:

  • 默认情况下,Spark SQL的优化器会启用公共子表达式消除(CommonSubexpressionElimination,这是默认开启的优化规则),它能识别到两次完全相同的表扫描操作,生成物理执行计划时会把它们合并成一次。也就是说,最终只会对table执行一次扫描,再将结果分别提供给df1和df2完成join操作。
  • 不过这个优化有个前提:你的table是支持这类优化的常规数据源(比如Parquet、ORC、Hive表等)。如果是JDBC连接的关系型数据库这类特殊数据源,可能受限于数据源特性无法合并扫描,这时候就会执行两次查询。

Case 2:基于已有DataFrame做转换的场景

再看第二段代码:

val df1 = spark.sql("select id, value from table")
val df2 = df1.filter($"value" > 0)
df1.join(df2, "id").show()

执行次数问题

这个场景下,查询只会执行一次。因为df2是基于df1的转换操作(filter),整个数据处理的 lineage 是连贯的:Spark会先扫描一次table生成df1对应的数据集,接着在这个数据集上执行filter得到df2的数据集,最后完成join操作。整个Job的物理计划只会包含一次对table的扫描,不会重复执行原始查询。

确保单次执行的优化方案

如果遇到特殊情况(比如复杂lineage导致优化器未合并扫描,或数据源特性限制),想要稳妥保证仅执行一次查询,最直接的方式是显式缓存df1的结果:

val df1 = spark.sql("select id, value from table").cache() // 也可用persist指定存储级别,比如MEMORY_AND_DISK
val df2 = df1.filter($"value" > 0)
df1.join(df2, "id").show()
// 后续不需要缓存时可释放资源
df1.unpersist()

使用cache()persist()会把df1的结果存储在内存或磁盘中,后续基于df1的所有转换操作(比如df2的filter、join)都会直接调用缓存数据,不会重新扫描原始表。


内容的提问来源于stack exchange,提问作者Frank Cheng

火山引擎 最新活动