如何在Flink内部作业失败时自动终止YARN上的Flink ApplicationMaster?
刚好之前处理过类似的Flink on YARN问题,针对你提到的两个需求,我整理了具体的解决方案,分两部分来说:
一、作业失败时终止Flink ApplicationMaster
要实现作业失败时让AM(ApplicationMaster)终止,核心是让AM感知到作业的失败状态并触发自身退出,主要有两种方式:
1. 通过配置参数直接控制
Flink提供了YARN相关的配置参数,可以直接让AM在作业失败时快速退出:
- 设置
yarn.application-master.failure-handling=FAIL_FAST:这个参数控制AM遇到不可恢复错误时的行为,设为FAIL_FAST后,AM会直接退出而不是尝试恢复,YARN会将整个应用标记为失败。 - 配合关闭作业重启策略:设置
restart-strategy.type=none,确保作业失败后不会自动重启,让AM能及时感知到失败状态。
你可以在提交应用时通过命令行参数指定:
flink run -t yarn-application \ -yD yarn.application-master.failure-handling=FAIL_FAST \ -yD restart-strategy.type=none \ your-app.jar
2. 代码层面添加作业失败监听器
如果需要更灵活的控制(比如多作业场景下监控任意一个作业失败),可以在代码中添加作业状态监听器,当作业失败时主动终止AM进程:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 关闭重启策略 env.setRestartStrategy(RestartStrategies.noRestart()); // 提交作业并监听执行结果 JobClient jobClient = env.executeAsync("Your Streaming Job"); jobClient.getJobExecutionResult().whenComplete((result, exception) -> { if (exception != null) { System.err.println("作业执行失败,终止ApplicationMaster"); // 终止当前AM进程,YARN会将应用标记为失败 System.exit(1); } });
如果是多作业场景,可以用CompletableFuture聚合所有作业的执行结果,只要有一个作业失败就触发退出:
List<CompletableFuture<JobExecutionResult>> jobFutures = new ArrayList<>(); // 提交第一个作业 JobClient job1 = env.executeAsync("Job 1"); jobFutures.add(job1.getJobExecutionResult()); // 提交第二个作业 StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment(); env2.setRestartStrategy(RestartStrategies.noRestart()); JobClient job2 = env2.executeAsync("Job 2"); jobFutures.add(job2.getJobExecutionResult()); // 等待所有作业完成,任意一个失败就终止进程 CompletableFuture.allOf(jobFutures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> { if (e != null) { System.err.println("存在作业失败,终止ApplicationMaster"); System.exit(1); } });
二、让Flink应用在内部作业失败时自行终止或失败
这个需求和AM终止强关联——只要AM终止,YARN上的整个应用就会被标记为失败状态。除了上面的方法,还可以补充以下配置确保整个应用的终止逻辑生效:
- 设置
jobmanager.execution.fail-on-failure=true:这个参数控制JobManager在作业失败时是否终止,默认值是true,但如果之前被修改过,需要确保设回true,这样JobManager会在作业失败后退出,进而触发AM的终止(配合FAIL_FAST配置)。 - 禁用YARN的AM重试:设置
yarn.application-attempts=1,避免YARN尝试重启失败的AM,确保应用直接进入失败状态。
完整的命令行提交示例:
flink run -t yarn-application \ -yD yarn.application-master.failure-handling=FAIL_FAST \ -yD restart-strategy.type=none \ -yD jobmanager.execution.fail-on-failure=true \ -yD yarn.application-attempts=1 \ your-app.jar
另外要注意,如果你的Flink版本较旧(比如1.12之前),部分参数名称可能有差异,比如yarn.appmaster.fail-fast代替yarn.application-master.failure-handling,需要根据你的版本调整。
内容的提问来源于stack exchange,提问作者lzh




