You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Yarn集群Flink作业Checkpoint路径配置及重启恢复问题求助

解决Flink作业Checkpoint路径不生效&重启复用Checkpoint的问题

一、为什么代码设置的Checkpoint路径没生效?

你当前的配置和代码存在两个关键混淆点:

1. 配置项的用途混淆

你在flink-conf.yaml中配置的state.savepoints.dirSavepoint的默认存储路径(手动触发Savepoint时使用),而自动Checkpoint的存储路径应该用配置项state.checkpoints.dir来指定。如果没有配置state.checkpoints.dir,Flink会根据你设置的StateBackend逻辑决定存储位置,这就导致了你的代码路径没有按预期生效。

2. StateBackend的设置方式(针对新版本Flink)

如果你的Flink版本是1.11及以上,FsStateBackend已经被标记为Deprecated,推荐使用HashMapStateBackend(内存中保存状态元数据,状态数据落地到文件系统)配合setCheckpointStorage来指定Checkpoint路径,而不是直接通过FsStateBackend的构造参数传递路径。

正确的代码写法应该是:

// 设置状态后端(内存存储状态元数据)
streamExecutionEnvironment.setStateBackend(new HashMapStateBackend());
// 指定Checkpoint的存储路径
streamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage("hdfs://localhost:9000/checkpoint/uuid-job-1");

如果坚持使用旧版的FsStateBackend,需要注意:如果flink-conf.yaml中配置了state.checkpoints.dir,这个全局配置会覆盖代码中FsStateBackend指定的路径。所以你需要二选一:

  • 删除state.checkpoints.dir配置(不需要全局默认Checkpoint路径时)
  • 确保代码中的路径设置在enableCheckpointing方法之前执行

二、重启Yarn上的Flink作业(新application_id)时复用原Checkpoint

要让重启后的作业使用原作业的Checkpoint,按以下步骤操作即可:

1. 确保原Checkpoint被保留

首先在flink-conf.yaml中配置Checkpoint的保留数量,避免作业故障后Checkpoint被自动删除:

# 保留最近5个Checkpoint,可根据需求调整
state.checkpoints.num-retained: 5

2. 找到原作业的Checkpoint路径

原作业的Checkpoint会存储在你指定的路径下(比如hdfs://localhost:9000/checkpoint/uuid-job-1),里面会有多个以时间戳命名的子目录,每个子目录对应一个Checkpoint,其中包含_metadata文件(这是恢复作业的核心文件)。

你可以通过两种方式获取路径:

  • 从Flink Web UI:进入原作业详情页 → 点击"Checkpoints"标签 → 找到需要恢复的Checkpoint,复制其"Path"
  • 直接扫描HDFS目录:用hdfs dfs -ls hdfs://localhost:9000/checkpoint/uuid-job-1查看,按时间戳排序取最新的子目录

3. 重启作业时指定从Checkpoint恢复

在提交重启的作业时,使用Flink CLI的-s参数指定Checkpoint的路径(可以是目录路径,也可以是_metadata文件的路径)。

如果是Per-Job模式提交到Yarn:

flink run -m yarn-cluster \
  -s hdfs://localhost:9000/checkpoint/uuid-job-1/[具体Checkpoint目录]/_metadata \
  your-job.jar

如果是Yarn Session模式,先启动Session再提交:

# 启动Yarn Session后台运行
yarn-session.sh -d
# 提交作业并指定恢复路径
flink run -s hdfs://localhost:9000/checkpoint/uuid-job-1/[具体Checkpoint目录]/_metadata your-job.jar

4. 轮询重启脚本的适配

你的故障检测轮询脚本需要自动获取原作业的最新Checkpoint路径,才能实现无人值守的重启。可以通过两种方式实现:

  • 调用Flink REST API:发送GET http://<Flink-JM主机>:<端口>/jobs/<原Job-ID>/checkpoints请求,解析返回的JSON数据拿到最新的Checkpoint路径
  • 定期扫描HDFS目录:编写脚本按时间戳排序Checkpoint子目录,自动选取最新的路径

这样即使Yarn生成了新的application_id,重启后的作业也能复用原作业的Checkpoint状态。

额外注意事项

  • 重启后的作业必须和原作业的拓扑结构(算子逻辑、状态描述)完全一致,否则无法从Checkpoint恢复;如果有拓扑变更,建议使用Savepoint做兼容。
  • 确保Flink的Yarn任务角色对HDFS的Checkpoint路径有读写权限,避免恢复时出现权限错误。

内容的提问来源于stack exchange,提问作者user3107673

火山引擎 最新活动