You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

Scala Spark:移除copyMerge生成TSV文件中空格与null的双引号

解决FileUtil.copyMerge合并后空格/Null被双引号包裹的问题

我之前也碰到过一模一样的情况,核心原因是FileUtil.copyMerge的默认输出逻辑和你Spark写入分文件时的配置不匹配:分文件是按照你设置的无引号规则生成的,但合并时工具默认把空格、null这类特殊值当成需要转义的字段,自动加上了双引号。下面给你几个靠谱的解决办法,按推荐程度排序:

方法1:用Spark原生方式生成单文件(最推荐)

其实完全不需要事后合并,Spark本身支持直接生成单个文件,还能精确控制输出格式,从根源避免问题。

比如你可以先把DataFrame合并为一个分区,然后写入时明确禁用引号、指定null值的显示方式:

// Scala 示例代码
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// 先将DataFrame合并为1个分区,写入临时目录
df.coalesce(1)
  .write
  .option("header", "false") // 根据你的需求决定是否保留表头
  .option("quote", "") // 关键:禁用所有引号
  .option("nullValue", "") // 把null转为空字符串,也可以设为你需要的其他值
  .option("escape", "") // 禁用转义字符,避免额外的格式问题
  .mode(SaveMode.Overwrite)
  .csv("/tmp/temp_single_file") // 临时目录

// 接下来把生成的part文件移动到目标路径
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// 匹配临时目录下的part文件(Spark生成的文件名类似part-00000-xxxx.csv)
val srcPartPath = fs.globStatus(new Path("/tmp/temp_single_file/part-00000*"))(0).getPath
val dstPath = new Path("/destination.txt")

// 移动并覆盖目标文件
if (fs.exists(dstPath)) fs.delete(dstPath, false)
fs.rename(srcPartPath, dstPath)

// 删除临时目录
fs.delete(new Path("/tmp/temp_single_file"), true)

为什么推荐这个?因为Spark的写入配置完全由你掌控,不会出现合并工具“画蛇添足”加引号的情况,而且性能也更稳定。

方法2:修改FileUtil.copyMerge的配置(若必须用合并工具)

如果你因为某些原因一定要用FileUtil.copyMerge,那可以通过Hadoop配置来禁用引号逻辑,让合并时的输出规则和分文件保持一致:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

val conf = spark.sparkContext.hadoopConfiguration
// 根据你的文件格式设置对应的配置:
// 如果是CSV格式
conf.set("mapreduce.csv.quote", "")
conf.set("mapreduce.csv.nullValue", "")
// 如果是纯文本格式,确保分隔符正确,同时禁用引号
conf.set("mapreduce.output.textoutputformat.separator", ",") // 替换成你的字段分隔符

val srcDir = new Path("/your_part_files_directory") // 分文件所在的目录
val dstFile = new Path("/destination.txt")
val fs = FileSystem.get(conf)

// 调用copyMerge时传入配置好的conf
org.apache.hadoop.fs.FileUtil.copyMerge(
  fs,
  srcDir,
  fs,
  dstFile,
  false, // 是否删除原目录
  conf,
  null // 自定义输出内容的话可以传这里,一般用null即可
)

注意:不同版本的Hadoop/Spark,配置参数的前缀可能略有不同,比如有的版本用spark.sql.csv.quote,如果上面的参数不生效,可以查一下你对应版本的官方文档调整。

方法3:合并后用HDFS命令批量移除引号(应急方案)

如果上面两种方法都来不及调整,你可以用HDFS的命令行工具快速清理双引号——但这个方法只适合你的数据中没有需要保留的双引号的情况:

# 读取原文件,移除所有双引号,写入临时文件
hdfs dfs -cat /destination.txt | sed 's/"//g' | hdfs dfs -put - /destination_cleaned.txt

# 替换原文件
hdfs dfs -mv /destination_cleaned.txt /destination.txt

这个方法简单粗暴,但如果你的业务数据本身包含合法的双引号(比如字符串字段里的引号),就会误删,所以只适合应急。


内容的提问来源于stack exchange,提问作者Sat

火山引擎 最新活动