PySpark foreachPartition 未执行问题求助
看起来你遇到了PySpark中foreachPartition代码块不执行的典型问题,结合你在Glue Job中的场景,我帮你梳理几个可能的原因和对应的排查方向:
1. 日志输出不在Driver端,你没看到执行记录
Spark的foreachPartition是在Executor节点上执行的,你在代码里用的print()和logger.info()输出的日志,默认不会出现在Driver的日志里(也就是你当前看到的Job主日志)。Glue Job的日志分为Driver日志和Executor日志,你需要去Glue控制台的Job运行详情里,切换到Executor日志标签页查看,大概率能找到Processing partition这类输出,或者报错信息。
2. 分区处理代码中的异常被静默捕获
你的process_partition函数里有一个try-except块,捕获异常后只做了print(f"Error inserting partition: {e}"),但这个输出在Executor端,Driver日志看不到,导致你误以为代码没执行。建议你:
- 把异常信息用更可靠的方式上报,比如使用Spark的
Accumulator来收集错误信息,或者在except块里调用能把日志同步到Driver的工具; - 先注释掉数据库插入的代码,只保留
print("Processing partition")和打印分区长度的逻辑,测试foreachPartition是否真的执行。
3. Postgres客户端初始化失败(Executor环境问题)
你在process_partition里初始化PostgresDbClient,但Glue的Executor节点可能缺少必要的依赖或者配置:
- 比如SSL证书文件
ssl_cert_file=None是否真的符合Postgres的要求?如果Postgres要求SSL,而Executor没有证书,会导致客户端初始化失败,但被try-except捕获; - 检查你的依赖包是否包含了Postgres的驱动,并且在Glue Job中正确上传了这些依赖(比如通过Glue的Job参数或者S3路径引用)。
4. 分区数计算或重分区的问题
你计算num_partitions = max(1, total_count // 100000),如果total_count小于100000,分区数是1,但重分区后的processed_df可能没有被正确触发执行?不过你之前已经调用了df.count()(这是一个Action操作,会触发前面的计算),所以这个可能性相对较小,但可以尝试直接用原df调用foreachPartition,跳过repartition步骤测试。
快速测试建议
先修改process_partition函数,简化逻辑,去掉数据库操作,只保留基础打印:
def process_partition(partition): partition_list = list(partition) print(f"Processing partition, number of rows: {len(partition_list)}")
然后重新运行Job,去Executor日志里查看是否有这个输出。如果有,说明foreachPartition是正常执行的,问题出在数据库操作相关的代码里;如果还是没有,再检查Spark的执行计划或者Glue的Job配置。
备注:内容来源于stack exchange,提问作者ekm0d




