You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark作业自动删除HDFS目标文件夹并报空类型错误排查

问题分析与解决方案

是的,你的脚本确实会导致目标文件夹被删除,而且这个现象和你遇到的UnsupportedOperationException错误直接相关,下面具体拆解原因和解决办法:

一、为什么文件夹会被自动删除?

核心原因是你使用了SaveMode.Overwrite模式:

  • Spark执行save()方法时,第一步就会删除目标目录,不管后续写入数据是否成功。
  • 当写入过程中因为CSV不支持Null类型抛出异常时,目标目录已经被提前删除了,这就出现了你看到的“文件夹被删+报错”的情况。

二、CSV写入失败的根本原因

你的DataFrame sigma中存在NullType类型的列,而Spark的CSV数据源默认不支持这种数据类型,所以写入时直接抛出java.lang.UnsupportedOperationException: CSV data source does not support null data type异常,中断了写入流程。

三、完整解决方案

1. 先处理NullType列

你需要先清理或转换DataFrame中的NullType列,二选一即可:

方案A:删除无业务价值的NullType列

如果这些列不需要保留,直接删除:

// 先找出所有NullType的列名
val nullTypeCols = sigma.schema.fields.filter(_.dataType == NullType).map(_.name)
// 删除这些列
val cleanedSigma = sigma.drop(nullTypeCols: _*)

方案B:转换NullType列为字符串类型

如果需要保留列,将其转为StringType并替换null值为占位符(比如空字符串):

import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions.col

val cleanedSigma = sigma.schema.fields.foldLeft(sigma) { (df, field) =>
  if (field.dataType == NullType) {
    df.withColumn(field.name, col(field.name).cast(StringType).otherwise(""))
  } else {
    df
  }
}

2. 改用临时目录写入,避免目标目录提前被删

为了防止写入失败时目标目录被删除,建议先写入临时目录,确认写入成功后再移动到目标目录:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// 定义临时目录路径
val tempTargetPath = s"${Parametre_vigiliste.cible}_temp"
val finalTargetPath = Parametre_vigiliste.cible

// 写入临时目录
cleanedSigma.repartition(1)
  .write
  .mode(SaveMode.Overwrite)
  .format("csv")
  .option("header", true)
  .option("delimiter", "|")
  .save(tempTargetPath)

// 初始化文件系统
val conf = new Configuration()
val fs = FileSystem.get(conf)

// 先删除原目标目录(如果存在)
if (fs.exists(new Path(finalTargetPath))) {
  fs.delete(new Path(finalTargetPath), true)
}

// 将临时目录移动到目标目录
fs.rename(new Path(tempTargetPath), new Path(finalTargetPath))

// 重命名part文件(增加异常判断,避免数组越界)
val partFiles = fs.globStatus(new Path(s"$finalTargetPath/part*"))
if (partFiles != null && partFiles.length > 0) {
  val partFileName = partFiles(0).getPath.getName
  fs.rename(
    new Path(s"$finalTargetPath/$partFileName"),
    new Path(s"$finalTargetPath/FIC_PER_DATALAKE_.txt")
  )
} else {
  throw new RuntimeException("写入CSV后未找到part文件,请检查写入流程")
}

// 释放缓存
cleanedSigma.unpersist()

3. 额外注意事项

  • 确保HDFS权限足够:你的执行用户需要有目标目录和临时目录的读写权限。
  • 避免单点失败:如果是生产环境,建议增加日志记录和异常捕获,方便排查问题。

内容的提问来源于stack exchange,提问作者Haha

火山引擎 最新活动