You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Flink内部作业失败时自动终止YARN上的Flink ApplicationMaster?

刚好之前处理过类似的Flink on YARN问题,针对你提到的两个需求,我整理了具体的解决方案,分两部分来说:

要实现作业失败时让AM(ApplicationMaster)终止,核心是让AM感知到作业的失败状态并触发自身退出,主要有两种方式:

1. 通过配置参数直接控制

Flink提供了YARN相关的配置参数,可以直接让AM在作业失败时快速退出:

  • 设置yarn.application-master.failure-handling=FAIL_FAST:这个参数控制AM遇到不可恢复错误时的行为,设为FAIL_FAST后,AM会直接退出而不是尝试恢复,YARN会将整个应用标记为失败。
  • 配合关闭作业重启策略:设置restart-strategy.type=none,确保作业失败后不会自动重启,让AM能及时感知到失败状态。

你可以在提交应用时通过命令行参数指定:

flink run -t yarn-application \
  -yD yarn.application-master.failure-handling=FAIL_FAST \
  -yD restart-strategy.type=none \
  your-app.jar

2. 代码层面添加作业失败监听器

如果需要更灵活的控制(比如多作业场景下监控任意一个作业失败),可以在代码中添加作业状态监听器,当作业失败时主动终止AM进程:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 关闭重启策略
env.setRestartStrategy(RestartStrategies.noRestart());

// 提交作业并监听执行结果
JobClient jobClient = env.executeAsync("Your Streaming Job");
jobClient.getJobExecutionResult().whenComplete((result, exception) -> {
    if (exception != null) {
        System.err.println("作业执行失败,终止ApplicationMaster");
        // 终止当前AM进程,YARN会将应用标记为失败
        System.exit(1);
    }
});

如果是多作业场景,可以用CompletableFuture聚合所有作业的执行结果,只要有一个作业失败就触发退出:

List<CompletableFuture<JobExecutionResult>> jobFutures = new ArrayList<>();

// 提交第一个作业
JobClient job1 = env.executeAsync("Job 1");
jobFutures.add(job1.getJobExecutionResult());

// 提交第二个作业
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
env2.setRestartStrategy(RestartStrategies.noRestart());
JobClient job2 = env2.executeAsync("Job 2");
jobFutures.add(job2.getJobExecutionResult());

// 等待所有作业完成,任意一个失败就终止进程
CompletableFuture.allOf(jobFutures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
    if (e != null) {
        System.err.println("存在作业失败,终止ApplicationMaster");
        System.exit(1);
    }
});
二、让Flink应用在内部作业失败时自行终止或失败

这个需求和AM终止强关联——只要AM终止,YARN上的整个应用就会被标记为失败状态。除了上面的方法,还可以补充以下配置确保整个应用的终止逻辑生效:

  • 设置jobmanager.execution.fail-on-failure=true:这个参数控制JobManager在作业失败时是否终止,默认值是true,但如果之前被修改过,需要确保设回true,这样JobManager会在作业失败后退出,进而触发AM的终止(配合FAIL_FAST配置)。
  • 禁用YARN的AM重试:设置yarn.application-attempts=1,避免YARN尝试重启失败的AM,确保应用直接进入失败状态。

完整的命令行提交示例:

flink run -t yarn-application \
  -yD yarn.application-master.failure-handling=FAIL_FAST \
  -yD restart-strategy.type=none \
  -yD jobmanager.execution.fail-on-failure=true \
  -yD yarn.application-attempts=1 \
  your-app.jar

另外要注意,如果你的Flink版本较旧(比如1.12之前),部分参数名称可能有差异,比如yarn.appmaster.fail-fast代替yarn.application-master.failure-handling,需要根据你的版本调整。

内容的提问来源于stack exchange,提问作者lzh

火山引擎 最新活动