You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

AWS EMR环境下处理数十亿级Parquet分区数据时PySpark任务因磁盘空间不足失败的解决咨询

问题描述

我正在尝试用Spark SQL的两个WITH子句读取Hive表的两个分区,通过左外连接(left outer join)获取增量数据。这两个分区规模相当大:各包含270亿条记录,单分区数据量达900GB,每个分区由10个90GB的Snappy压缩Parquet文件组成。

我在拥有28个节点的AWS EMR r4.16xlarge集群上运行PySpark任务,已经试过多种Spark配置,但每次任务都以失败告终,报错信息如下:

Job aborted due to stage failure: most recent failure: Lost task java.io.IOException: No space left on device

我推测问题出在Worker节点的临时磁盘空间不足,尝试设置spark.sql.shuffle.partitions=3000后仍未解决,希望能得到可行的解决思路。

已尝试的Spark配置
  • 尝试1:
    --executor-cores 5 --num-executors 335 --executor-memory 37G --driver-memory 366G
    
  • 尝试2:
    --driver-memory 200G --deploy-mode client --executor-memory 40G --executor-cores 4 
    --conf spark.dynamicAllocation.enabled=true 
    --conf spark.shuffle.service.enabled=true 
    --conf spark.executor.memoryOverhead=30g 
    --conf spark.rpc.message.maxSize=1024 
    --conf spark.sql.shuffle.partitions=3000 
    --conf spark.sql.autoBroadcastJoinThreshold=-1 
    --conf spark.driver.maxResultSize=4G 
    --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
    
  • 尝试3:
    --driver-memory 200G --deploy-mode client --executor-memory 100G --executor-cores 4 
    --conf spark.dynamicAllocation.enabled=true 
    --conf spark.shuffle.service.enabled=true
    
可行解决思路

给你几个针对性的方向,你可以逐一尝试:

1. 优化临时磁盘空间利用

  • AWS EMR的r4.16xlarge实例自带的临时磁盘(/mnt)空间有限,你可以检查是否挂载了额外的EBS卷来扩展临时空间。同时,通过配置spark.local.dir指定多个磁盘路径(用逗号分隔),让Spark分散存储临时文件,避免单盘被占满。
  • 确认spark.shuffle.spill.compressspark.shuffle.compress都设置为true(默认开启,但建议手动指定),对Shuffle溢出和输出的数据进行压缩,大幅减少磁盘占用量。
  • 定期清理Spark的临时目录,或者在任务结束后自动清理,避免旧数据占用空间。

2. 调整Shuffle相关配置

  • 你当前设置的3000个Shuffle分区可能不够合理。建议根据数据量调整:目标是让每个Shuffle分区的大小控制在100-200MB左右。按900GB的数据量计算,spark.sql.shuffle.partitions可以设置为4608(900*1024/200),这样每个分区的大小更适中,减少单个Task的磁盘压力。
  • 启用spark.shuffle.sort.bypassMergeThreshold并设置为200左右,当Shuffle分区数小于这个阈值时,Spark会跳过排序直接合并文件,减少磁盘IO和空间占用。
  • 调整spark.executor.memoryOverhead:r4.16xlarge有256G内存,若executor内存设为40G,overhead建议设为8-10G即可(不需要30G这么大),留足够内存给JVM和系统进程,减少因内存不足导致的频繁磁盘spill。

3. 优化数据处理流程

  • 先过滤再Join:在WITH子句里先对两个分区的数据进行字段裁剪和条件过滤,只保留Join需要的字段和增量相关的数据,减少后续处理的数据量。
  • 合并大文件:每个分区的10个90GB文件太大,会导致单个Task处理的数据量过载。可以先运行一个预处理任务,把每个分区的大文件合并成多个10-20GB的Parquet文件,再执行Join操作,提升Task的并行度和稳定性。
  • 分阶段处理:如果左表是全量数据、右表是增量数据,可以先对右表按Join键进行预聚合,再和左表Join;或者将左表按Join键分区存储,减少Shuffle的数据传输量。

4. 集群资源与部署调优

  • 改用Cluster部署模式:你之前用的是client模式,driver运行在本地,不仅占用本地资源,还可能出现数据传输瓶颈。改用cluster模式让driver运行在集群节点上,资源利用更均衡,也能避免本地磁盘的限制。
  • 合理规划Executor数量:r4.16xlarge每个节点有64vCPU,建议每个executor分配4核,每个节点预留2核给系统进程,这样每个节点可以运行15个executor((64-2)/4=15)。28个节点总共可以运行420个executor,内存方面每个executor分配14G,overhead设为2G,刚好控制在节点256G内存的范围内(15*(14+2)=240G,剩余16G给系统)。

5. 监控与排查

  • 任务运行时,通过EMR的YARN管理页面查看每个Worker节点的磁盘使用情况,确认是否是某个节点的临时磁盘被Shuffle数据占满。也可以登录Worker节点执行df -h命令,查看/mnt或你指定的临时目录的空间占用详情。
  • 开启Spark的详细日志模式,查看失败Task的具体堆栈信息,定位是Shuffle阶段、数据读取阶段还是其他环节导致的磁盘空间不足。

内容的提问来源于stack exchange,提问作者user7343922

火山引擎 最新活动