Spark作业自动删除HDFS目标文件夹并报空类型错误排查
问题分析与解决方案
是的,你的脚本确实会导致目标文件夹被删除,而且这个现象和你遇到的UnsupportedOperationException错误直接相关,下面具体拆解原因和解决办法:
一、为什么文件夹会被自动删除?
核心原因是你使用了SaveMode.Overwrite模式:
- Spark执行
save()方法时,第一步就会删除目标目录,不管后续写入数据是否成功。 - 当写入过程中因为CSV不支持Null类型抛出异常时,目标目录已经被提前删除了,这就出现了你看到的“文件夹被删+报错”的情况。
二、CSV写入失败的根本原因
你的DataFrame sigma中存在NullType类型的列,而Spark的CSV数据源默认不支持这种数据类型,所以写入时直接抛出java.lang.UnsupportedOperationException: CSV data source does not support null data type异常,中断了写入流程。
三、完整解决方案
1. 先处理NullType列
你需要先清理或转换DataFrame中的NullType列,二选一即可:
方案A:删除无业务价值的NullType列
如果这些列不需要保留,直接删除:
// 先找出所有NullType的列名 val nullTypeCols = sigma.schema.fields.filter(_.dataType == NullType).map(_.name) // 删除这些列 val cleanedSigma = sigma.drop(nullTypeCols: _*)
方案B:转换NullType列为字符串类型
如果需要保留列,将其转为StringType并替换null值为占位符(比如空字符串):
import org.apache.spark.sql.types.StringType import org.apache.spark.sql.functions.col val cleanedSigma = sigma.schema.fields.foldLeft(sigma) { (df, field) => if (field.dataType == NullType) { df.withColumn(field.name, col(field.name).cast(StringType).otherwise("")) } else { df } }
2. 改用临时目录写入,避免目标目录提前被删
为了防止写入失败时目标目录被删除,建议先写入临时目录,确认写入成功后再移动到目标目录:
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.SaveMode // 定义临时目录路径 val tempTargetPath = s"${Parametre_vigiliste.cible}_temp" val finalTargetPath = Parametre_vigiliste.cible // 写入临时目录 cleanedSigma.repartition(1) .write .mode(SaveMode.Overwrite) .format("csv") .option("header", true) .option("delimiter", "|") .save(tempTargetPath) // 初始化文件系统 val conf = new Configuration() val fs = FileSystem.get(conf) // 先删除原目标目录(如果存在) if (fs.exists(new Path(finalTargetPath))) { fs.delete(new Path(finalTargetPath), true) } // 将临时目录移动到目标目录 fs.rename(new Path(tempTargetPath), new Path(finalTargetPath)) // 重命名part文件(增加异常判断,避免数组越界) val partFiles = fs.globStatus(new Path(s"$finalTargetPath/part*")) if (partFiles != null && partFiles.length > 0) { val partFileName = partFiles(0).getPath.getName fs.rename( new Path(s"$finalTargetPath/$partFileName"), new Path(s"$finalTargetPath/FIC_PER_DATALAKE_.txt") ) } else { throw new RuntimeException("写入CSV后未找到part文件,请检查写入流程") } // 释放缓存 cleanedSigma.unpersist()
3. 额外注意事项
- 确保HDFS权限足够:你的执行用户需要有目标目录和临时目录的读写权限。
- 避免单点失败:如果是生产环境,建议增加日志记录和异常捕获,方便排查问题。
内容的提问来源于stack exchange,提问作者Haha




