Spark单Action的DAG中是否复用RDD?两类场景疑问及优化咨询
Spark SQL 查询执行次数问题解答
Case 1:两次独立调用spark.sql的场景
先看你的第一段代码:
val df1 = spark.sql("select id, value from table") val df2 = spark.sql("select id, value from table") df1.join(df2, "id").show()
关于这个场景下查询是否仅执行一次:
- 默认情况下,Spark SQL的优化器会启用公共子表达式消除(CommonSubexpressionElimination,这是默认开启的优化规则),它能识别到两次完全相同的表扫描操作,生成物理执行计划时会把它们合并成一次。也就是说,最终只会对
table执行一次扫描,再将结果分别提供给df1和df2完成join操作。 - 不过这个优化有个前提:你的
table是支持这类优化的常规数据源(比如Parquet、ORC、Hive表等)。如果是JDBC连接的关系型数据库这类特殊数据源,可能受限于数据源特性无法合并扫描,这时候就会执行两次查询。
Case 2:基于已有DataFrame做转换的场景
再看第二段代码:
val df1 = spark.sql("select id, value from table") val df2 = df1.filter($"value" > 0) df1.join(df2, "id").show()
执行次数问题
这个场景下,查询只会执行一次。因为df2是基于df1的转换操作(filter),整个数据处理的 lineage 是连贯的:Spark会先扫描一次table生成df1对应的数据集,接着在这个数据集上执行filter得到df2的数据集,最后完成join操作。整个Job的物理计划只会包含一次对table的扫描,不会重复执行原始查询。
确保单次执行的优化方案
如果遇到特殊情况(比如复杂lineage导致优化器未合并扫描,或数据源特性限制),想要稳妥保证仅执行一次查询,最直接的方式是显式缓存df1的结果:
val df1 = spark.sql("select id, value from table").cache() // 也可用persist指定存储级别,比如MEMORY_AND_DISK val df2 = df1.filter($"value" > 0) df1.join(df2, "id").show() // 后续不需要缓存时可释放资源 df1.unpersist()
使用cache()或persist()会把df1的结果存储在内存或磁盘中,后续基于df1的所有转换操作(比如df2的filter、join)都会直接调用缓存数据,不会重新扫描原始表。
内容的提问来源于stack exchange,提问作者Frank Cheng




