Spark 2.4.5中使用eqNullSafe进行DataFrame连接返回错误结果(疑似无条件全连接)
问题原因分析及解决方案
这确实是Spark 2.4.5版本中的一个已知bug,也是老版本Spark处理NULL值连接时的典型问题之一。
为什么会出现全连接的结果?
你使用eqNullSafe()的本意是让NULL值与NULL值匹配、非NULL值按相等规则匹配,从而得到2行预期结果,但在Spark 2.4.x的部分版本中,当eqNullSafe()(或等价的<=>操作符)作为join的连接条件时,若列中包含NULL值,连接条件会被错误地忽略。加上你提前开启了spark.sql.crossJoin.enabled = True,Spark就默认执行了交叉连接(Cross Join)——也就是把两个DataFrame的所有行进行笛卡尔积组合,最终得到了4行(2*2)的结果。
这是Spark的Bug吗?
是的,这个问题属于Spark的版本缺陷,在Spark 3.0及以上的版本中已经被修复。你可以尝试升级Spark版本,升级后再运行原代码,就能得到预期的匹配结果。
2.4.5版本下的临时解决方案
如果暂时无法升级Spark,可以通过以下两种方式绕过这个bug:
方案1:手动替换NULL值后连接
用一个不会出现在业务数据中的特殊占位符替换NULL值,基于处理后的列完成连接,最后再去掉占位符列:
# 定义一个唯一的NULL占位符,确保和现有数据无冲突 null_placeholder = "__SPECIAL_NULL_MARKER__" # 处理两个DataFrame的value列 df2_processed = df2.withColumn("value_temp", F.coalesce(F.col("value"), F.lit(null_placeholder))) df3_processed = df3.withColumn("value_temp", F.coalesce(F.col("value"), F.lit(null_placeholder))) # 基于临时列连接,然后删除临时列 result = df2_processed.join(df3_processed, on="value_temp").drop("value_temp") result.show()
方案2:先交叉连接再过滤
利用crossJoin生成笛卡尔积,再用eqNullSafe()作为过滤条件,这种方式能规避join条件失效的问题:
result = df2.crossJoin(df3).where(df2.value.eqNullSafe(df3.value)) result.show()
两种方案都能得到你预期的结果:
+-----+--------+-----+------+ |value|small_id|value|sum_id| +-----+--------+-----+------+ | null| 2| null| 2| | foo| 1| foo| 1| +-----+--------+-----+------+
内容的提问来源于stack exchange,提问作者zac




