AWS EMR环境下处理数十亿级Parquet分区数据时PySpark任务因磁盘空间不足失败的解决咨询
问题描述
我正在尝试用Spark SQL的两个WITH子句读取Hive表的两个分区,通过左外连接(left outer join)获取增量数据。这两个分区规模相当大:各包含270亿条记录,单分区数据量达900GB,每个分区由10个90GB的Snappy压缩Parquet文件组成。
我在拥有28个节点的AWS EMR r4.16xlarge集群上运行PySpark任务,已经试过多种Spark配置,但每次任务都以失败告终,报错信息如下:
Job aborted due to stage failure: most recent failure: Lost task java.io.IOException: No space left on device
我推测问题出在Worker节点的临时磁盘空间不足,尝试设置spark.sql.shuffle.partitions=3000后仍未解决,希望能得到可行的解决思路。
已尝试的Spark配置
- 尝试1:
--executor-cores 5 --num-executors 335 --executor-memory 37G --driver-memory 366G - 尝试2:
--driver-memory 200G --deploy-mode client --executor-memory 40G --executor-cores 4 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.executor.memoryOverhead=30g --conf spark.rpc.message.maxSize=1024 --conf spark.sql.shuffle.partitions=3000 --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.driver.maxResultSize=4G --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 - 尝试3:
--driver-memory 200G --deploy-mode client --executor-memory 100G --executor-cores 4 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true
可行解决思路
给你几个针对性的方向,你可以逐一尝试:
1. 优化临时磁盘空间利用
- AWS EMR的r4.16xlarge实例自带的临时磁盘(
/mnt)空间有限,你可以检查是否挂载了额外的EBS卷来扩展临时空间。同时,通过配置spark.local.dir指定多个磁盘路径(用逗号分隔),让Spark分散存储临时文件,避免单盘被占满。 - 确认
spark.shuffle.spill.compress和spark.shuffle.compress都设置为true(默认开启,但建议手动指定),对Shuffle溢出和输出的数据进行压缩,大幅减少磁盘占用量。 - 定期清理Spark的临时目录,或者在任务结束后自动清理,避免旧数据占用空间。
2. 调整Shuffle相关配置
- 你当前设置的3000个Shuffle分区可能不够合理。建议根据数据量调整:目标是让每个Shuffle分区的大小控制在100-200MB左右。按900GB的数据量计算,
spark.sql.shuffle.partitions可以设置为4608(900*1024/200),这样每个分区的大小更适中,减少单个Task的磁盘压力。 - 启用
spark.shuffle.sort.bypassMergeThreshold并设置为200左右,当Shuffle分区数小于这个阈值时,Spark会跳过排序直接合并文件,减少磁盘IO和空间占用。 - 调整
spark.executor.memoryOverhead:r4.16xlarge有256G内存,若executor内存设为40G,overhead建议设为8-10G即可(不需要30G这么大),留足够内存给JVM和系统进程,减少因内存不足导致的频繁磁盘spill。
3. 优化数据处理流程
- 先过滤再Join:在WITH子句里先对两个分区的数据进行字段裁剪和条件过滤,只保留Join需要的字段和增量相关的数据,减少后续处理的数据量。
- 合并大文件:每个分区的10个90GB文件太大,会导致单个Task处理的数据量过载。可以先运行一个预处理任务,把每个分区的大文件合并成多个10-20GB的Parquet文件,再执行Join操作,提升Task的并行度和稳定性。
- 分阶段处理:如果左表是全量数据、右表是增量数据,可以先对右表按Join键进行预聚合,再和左表Join;或者将左表按Join键分区存储,减少Shuffle的数据传输量。
4. 集群资源与部署调优
- 改用Cluster部署模式:你之前用的是client模式,driver运行在本地,不仅占用本地资源,还可能出现数据传输瓶颈。改用cluster模式让driver运行在集群节点上,资源利用更均衡,也能避免本地磁盘的限制。
- 合理规划Executor数量:r4.16xlarge每个节点有64vCPU,建议每个executor分配4核,每个节点预留2核给系统进程,这样每个节点可以运行15个executor((64-2)/4=15)。28个节点总共可以运行420个executor,内存方面每个executor分配14G,overhead设为2G,刚好控制在节点256G内存的范围内(15*(14+2)=240G,剩余16G给系统)。
5. 监控与排查
- 任务运行时,通过EMR的YARN管理页面查看每个Worker节点的磁盘使用情况,确认是否是某个节点的临时磁盘被Shuffle数据占满。也可以登录Worker节点执行
df -h命令,查看/mnt或你指定的临时目录的空间占用详情。 - 开启Spark的详细日志模式,查看失败Task的具体堆栈信息,定位是Shuffle阶段、数据读取阶段还是其他环节导致的磁盘空间不足。
内容的提问来源于stack exchange,提问作者user7343922




