使用spark_apply时Spark出现GC overhead limit exceeded错误求助
解决Spark spark_apply GC Overhead Limit Exceeded的思路
首先,你调Driver内存没用是因为这个问题大概率出在Worker/Executor端,而不是Driver。咱们一步步来排查优化:
调整Executor内存与资源分配
你每个Worker有29G内存、8核,但Spark默认的Executor配置可能没充分利用这些资源,或者分配不合理导致内存浪费/不足。建议:- 设置
spark.executor.memory=6g:每个Executor分配6G堆内存(留足空间给非堆内存和系统进程) - 设置
spark.executor.memoryOverhead=2g:Executor的非堆内存(比如JNI、缓存等)单独分配,避免挤占堆内存 - 调整Executor数量:每个Worker8核,可设置
spark.executor.cores=2,这样每个Worker能跑4个Executor(8/2),总Executor数为4*4=16,让资源更均匀分配,避免单个Executor负载过重
- 设置
重构闭包:从逐行拉取改为批量查询
你现在用get_dates逐行从Cassandra拉数据,这会产生20万次单独的查询,不仅慢,还会创建大量临时对象,给GC带来极大压力。优化方案:- 在
spark_apply中处理整个分区,而不是单一行:spark_apply支持传递分区数据,你可以把一个分区内的所有需要查询的ID收集起来,用Cassandra的批量查询API一次性拉取数据 - 示例伪代码:
spark_apply(df, function(partition_df) { # 收集分区内所有需要查询的ID ids <- partition_df$id_column # 批量查询Cassandra batch_data <- cassandra_batch_query(ids) # 关联原数据并返回 merge(partition_df, batch_data, by = "id_column") })
这种方式能大幅减少网络请求和对象创建,降低GC频率。
- 在
排查数据倾斜
20万行总量不大,但如果某个分区的数据量远大于其他分区,会导致单个Executor内存过载。你可以通过Spark UI的Storage或Executors页面查看分区大小:- 如果存在倾斜,用
spark_repartition()将数据分成更多小分区(比如分成100个分区,对应你16个Executor的话,每个Executor处理6-7个分区) - 或者针对倾斜的键做加盐处理,分散到多个分区
- 如果存在倾斜,用
优化GC参数
默认的Parallel GC在处理大量临时对象时效率不高,建议切换到G1GC,并调整相关参数:conf <- spark_config() conf$spark.executor.extraJavaOptions <- "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=40" sc <- spark_connect(master = "yarn", config = conf)-XX:+UseG1GC:使用G1垃圾收集器,更适合大内存、多线程场景-XX:MaxGCPauseMillis=200:限制GC最大暂停时间,避免长时间停顿-XX:InitiatingHeapOccupancyPercent=40:当堆内存占用达到40%时触发GC,避免内存占用过高导致OOM
升级依赖版本
你用的sparklyr v0.6.2和Spark v2.1.0都是比较老旧的版本(2017年左右),存在不少已知的内存泄漏和性能问题:- 升级sparklyr到1.x以上版本(比如最新的1.8.x),新版本对闭包序列化、内存管理有大幅优化
- 升级Spark到2.4.x(长期支持版)或更高版本,Spark 2.4+对Executor内存管理、GC优化都有改进
检查闭包中的大对象
如果你的get_dates闭包引用了全局的大对象(比如大字典、数据集),这些对象会被序列化到每个Executor,占用大量内存。解决方法:- 用
spark_broadcast()将大对象广播到所有Executor,避免重复序列化和内存占用 - 或者在Executor端本地加载这些对象(比如从本地文件或Cassandra缓存中读取),而不是通过闭包传递
- 用
内容的提问来源于stack exchange,提问作者Zafar




