You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark读取S3压缩CSV在EMR上OOM异常求助:小数据集反而失败

问题分析与解决方案

为什么会出现这种矛盾现象?

错误码137是YARN容器被OOM Killer强制终止的典型标志,本质是Executor内存不足,但你的场景非常特殊——更大的数据集能正常运行、拆分数据集也能成功、本地环境无压力,这说明问题不在总数据量,而是数据分布或Spark分区处理逻辑导致的单Executor负载突增

1. 小文件爆炸引发的分区倾斜

你的小数据集包含大量1-20KB的文件(总数据行数3.7亿),Spark默认会为每个小文件创建独立分区(S3上小文件不会被合并成块)。当合并2017和2018的数据时,分区数刚好触发了某个临界值:

  • 70GB的大数据集虽然总量大,但文件大小更均匀,分区负载能被Executor均衡分摊;
  • 拆分跑时分区数减半,每个Executor的压力降低,不会触发内存阈值;
  • 本地模式下Spark采用单JVM多线程架构,内存管理没有YARN容器的严格隔离限制,16GB内存足以扛过临时峰值。

2. 最后阶段的元数据/合并压力

从stderr的Stage 0进度来看,任务在完成9999/10000时失败,说明问题出在收尾阶段:

  • Spark在完成所有分区读取后,需要做全局元数据汇总(比如schema校验、文件信息统计),大量小文件的元数据会占用额外内存;
  • 可能存在个别文件解压后数据量异常大(压缩后小但实际数据多),刚好被分配到已经内存紧张的Executor,直接触发OOM。

3. EMR内存配置未适配小文件场景

你的集群使用m4.xlarge实例(16GB RAM),EMR默认的Spark内存配置可能未针对大量小文件优化:

  • YARN容器的内存限制(包括堆外内存)设置不足,导致Executor处理小文件元数据时超出阈值;
  • Driver内存如果不足,也会在收集所有文件元数据时出现问题(不过你的错误显示是Executor层面的OOM,但Driver压力也可能间接影响)。

4. Executor异常的连锁反应

你看到的NoSuchElementException: None.get是Spark UI获取Executor线程dump时的错误,这是Executor被OOM杀掉后的结果,而非原因,但它证实了有Executor异常退出。


具体解决办法

1. 合并小文件,从根源减少分区数

大量小文件是Spark的性能天敌,尤其在S3环境下。可以通过两种方式处理:

  • 先用S3 DistCp或Spark预合并小文件:将s3://bucket/data/下的小文件合并为128MB左右的大文件(对应HDFS块大小);
  • 读取时调整参数,让Spark自动合并小文件:
    // 设置单个分区的未压缩字节数,根据gzip压缩比(约10:1)调整,这里对应压缩后128MB
    spark.conf.set("spark.sql.files.maxPartitionBytes", "1342177280")
    

2. 优化EMR Spark内存配置

修改你的CONFIG_FILE中的Spark配置,适配m4.xlarge实例:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executor.memory": "10g",
      "spark.executor.memoryOverhead": "2g",
      "spark.driver.memory": "8g",
      "spark.driver.memoryOverhead": "2g"
    }
  }
]

m4.xlarge有16GB RAM,预留足够的堆外内存(memoryOverhead)用于处理元数据、NIO缓冲区等非堆内存开销。

3. 关闭不必要的统计信息收集

Spark默认会收集表的统计信息,大量小文件会加剧内存消耗,可临时关闭:

spark.conf.set("spark.sql.statistics.histogram.enabled", "false")
spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "false")

4. 排查异常大文件

虽然你说文件大小在1-20KB,但可能存在个别异常文件。用AWS CLI检查:

aws s3 ls s3://bucket/data/ --recursive | sort -k3 -n | tail -20

如果发现异常大的文件,单独处理(比如拆分或剔除)。

5. 强制重分区均衡负载

读取数据后立即执行repartition,重新分配分区避免倾斜:

spark.read.option("delimiter", delimiter)
 .schema(Encoders.product[MyData].schema)
 .csv("s3://bucket/data/*/*.gz")
 .as[MyData]
 .repartition(200) // 根据集群核心数调整,建议每个Executor处理2-4个任务

此操作会触发Shuffle,但能彻底解决分区负载不均衡问题。


总结

核心问题是大量小文件导致的分区负载不均衡,叠加YARN容器的严格内存限制,使得最后阶段某个Executor内存溢出。更大的数据集因文件分布更合理未触发该问题,本地模式因内存管理更灵活能扛过峰值。通过合并小文件、调整内存配置、强制重分区即可解决问题。

内容的提问来源于stack exchange,提问作者Eric

火山引擎 最新活动