Spark读取S3压缩CSV在EMR上OOM异常求助:小数据集反而失败
为什么会出现这种矛盾现象?
错误码137是YARN容器被OOM Killer强制终止的典型标志,本质是Executor内存不足,但你的场景非常特殊——更大的数据集能正常运行、拆分数据集也能成功、本地环境无压力,这说明问题不在总数据量,而是数据分布或Spark分区处理逻辑导致的单Executor负载突增。
1. 小文件爆炸引发的分区倾斜
你的小数据集包含大量1-20KB的文件(总数据行数3.7亿),Spark默认会为每个小文件创建独立分区(S3上小文件不会被合并成块)。当合并2017和2018的数据时,分区数刚好触发了某个临界值:
- 70GB的大数据集虽然总量大,但文件大小更均匀,分区负载能被Executor均衡分摊;
- 拆分跑时分区数减半,每个Executor的压力降低,不会触发内存阈值;
- 本地模式下Spark采用单JVM多线程架构,内存管理没有YARN容器的严格隔离限制,16GB内存足以扛过临时峰值。
2. 最后阶段的元数据/合并压力
从stderr的Stage 0进度来看,任务在完成9999/10000时失败,说明问题出在收尾阶段:
- Spark在完成所有分区读取后,需要做全局元数据汇总(比如schema校验、文件信息统计),大量小文件的元数据会占用额外内存;
- 可能存在个别文件解压后数据量异常大(压缩后小但实际数据多),刚好被分配到已经内存紧张的Executor,直接触发OOM。
3. EMR内存配置未适配小文件场景
你的集群使用m4.xlarge实例(16GB RAM),EMR默认的Spark内存配置可能未针对大量小文件优化:
- YARN容器的内存限制(包括堆外内存)设置不足,导致Executor处理小文件元数据时超出阈值;
- Driver内存如果不足,也会在收集所有文件元数据时出现问题(不过你的错误显示是Executor层面的OOM,但Driver压力也可能间接影响)。
4. Executor异常的连锁反应
你看到的NoSuchElementException: None.get是Spark UI获取Executor线程dump时的错误,这是Executor被OOM杀掉后的结果,而非原因,但它证实了有Executor异常退出。
具体解决办法
1. 合并小文件,从根源减少分区数
大量小文件是Spark的性能天敌,尤其在S3环境下。可以通过两种方式处理:
- 先用S3 DistCp或Spark预合并小文件:将
s3://bucket/data/下的小文件合并为128MB左右的大文件(对应HDFS块大小); - 读取时调整参数,让Spark自动合并小文件:
// 设置单个分区的未压缩字节数,根据gzip压缩比(约10:1)调整,这里对应压缩后128MB spark.conf.set("spark.sql.files.maxPartitionBytes", "1342177280")
2. 优化EMR Spark内存配置
修改你的CONFIG_FILE中的Spark配置,适配m4.xlarge实例:
[ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "10g", "spark.executor.memoryOverhead": "2g", "spark.driver.memory": "8g", "spark.driver.memoryOverhead": "2g" } } ]
m4.xlarge有16GB RAM,预留足够的堆外内存(memoryOverhead)用于处理元数据、NIO缓冲区等非堆内存开销。
3. 关闭不必要的统计信息收集
Spark默认会收集表的统计信息,大量小文件会加剧内存消耗,可临时关闭:
spark.conf.set("spark.sql.statistics.histogram.enabled", "false") spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "false")
4. 排查异常大文件
虽然你说文件大小在1-20KB,但可能存在个别异常文件。用AWS CLI检查:
aws s3 ls s3://bucket/data/ --recursive | sort -k3 -n | tail -20
如果发现异常大的文件,单独处理(比如拆分或剔除)。
5. 强制重分区均衡负载
读取数据后立即执行repartition,重新分配分区避免倾斜:
spark.read.option("delimiter", delimiter) .schema(Encoders.product[MyData].schema) .csv("s3://bucket/data/*/*.gz") .as[MyData] .repartition(200) // 根据集群核心数调整,建议每个Executor处理2-4个任务
此操作会触发Shuffle,但能彻底解决分区负载不均衡问题。
总结
核心问题是大量小文件导致的分区负载不均衡,叠加YARN容器的严格内存限制,使得最后阶段某个Executor内存溢出。更大的数据集因文件分布更合理未触发该问题,本地模式因内存管理更灵活能扛过峰值。通过合并小文件、调整内存配置、强制重分区即可解决问题。
内容的提问来源于stack exchange,提问作者Eric




