Yarn集群Flink作业Checkpoint路径配置及重启恢复问题求助
一、为什么代码设置的Checkpoint路径没生效?
你当前的配置和代码存在两个关键混淆点:
1. 配置项的用途混淆
你在flink-conf.yaml中配置的state.savepoints.dir是Savepoint的默认存储路径(手动触发Savepoint时使用),而自动Checkpoint的存储路径应该用配置项state.checkpoints.dir来指定。如果没有配置state.checkpoints.dir,Flink会根据你设置的StateBackend逻辑决定存储位置,这就导致了你的代码路径没有按预期生效。
2. StateBackend的设置方式(针对新版本Flink)
如果你的Flink版本是1.11及以上,FsStateBackend已经被标记为Deprecated,推荐使用HashMapStateBackend(内存中保存状态元数据,状态数据落地到文件系统)配合setCheckpointStorage来指定Checkpoint路径,而不是直接通过FsStateBackend的构造参数传递路径。
正确的代码写法应该是:
// 设置状态后端(内存存储状态元数据) streamExecutionEnvironment.setStateBackend(new HashMapStateBackend()); // 指定Checkpoint的存储路径 streamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage("hdfs://localhost:9000/checkpoint/uuid-job-1");
如果坚持使用旧版的FsStateBackend,需要注意:如果flink-conf.yaml中配置了state.checkpoints.dir,这个全局配置会覆盖代码中FsStateBackend指定的路径。所以你需要二选一:
- 删除
state.checkpoints.dir配置(不需要全局默认Checkpoint路径时) - 确保代码中的路径设置在
enableCheckpointing方法之前执行
二、重启Yarn上的Flink作业(新application_id)时复用原Checkpoint
要让重启后的作业使用原作业的Checkpoint,按以下步骤操作即可:
1. 确保原Checkpoint被保留
首先在flink-conf.yaml中配置Checkpoint的保留数量,避免作业故障后Checkpoint被自动删除:
# 保留最近5个Checkpoint,可根据需求调整 state.checkpoints.num-retained: 5
2. 找到原作业的Checkpoint路径
原作业的Checkpoint会存储在你指定的路径下(比如hdfs://localhost:9000/checkpoint/uuid-job-1),里面会有多个以时间戳命名的子目录,每个子目录对应一个Checkpoint,其中包含_metadata文件(这是恢复作业的核心文件)。
你可以通过两种方式获取路径:
- 从Flink Web UI:进入原作业详情页 → 点击"Checkpoints"标签 → 找到需要恢复的Checkpoint,复制其"Path"
- 直接扫描HDFS目录:用
hdfs dfs -ls hdfs://localhost:9000/checkpoint/uuid-job-1查看,按时间戳排序取最新的子目录
3. 重启作业时指定从Checkpoint恢复
在提交重启的作业时,使用Flink CLI的-s参数指定Checkpoint的路径(可以是目录路径,也可以是_metadata文件的路径)。
如果是Per-Job模式提交到Yarn:
flink run -m yarn-cluster \ -s hdfs://localhost:9000/checkpoint/uuid-job-1/[具体Checkpoint目录]/_metadata \ your-job.jar
如果是Yarn Session模式,先启动Session再提交:
# 启动Yarn Session后台运行 yarn-session.sh -d # 提交作业并指定恢复路径 flink run -s hdfs://localhost:9000/checkpoint/uuid-job-1/[具体Checkpoint目录]/_metadata your-job.jar
4. 轮询重启脚本的适配
你的故障检测轮询脚本需要自动获取原作业的最新Checkpoint路径,才能实现无人值守的重启。可以通过两种方式实现:
- 调用Flink REST API:发送
GET http://<Flink-JM主机>:<端口>/jobs/<原Job-ID>/checkpoints请求,解析返回的JSON数据拿到最新的Checkpoint路径 - 定期扫描HDFS目录:编写脚本按时间戳排序Checkpoint子目录,自动选取最新的路径
这样即使Yarn生成了新的application_id,重启后的作业也能复用原作业的Checkpoint状态。
额外注意事项
- 重启后的作业必须和原作业的拓扑结构(算子逻辑、状态描述)完全一致,否则无法从Checkpoint恢复;如果有拓扑变更,建议使用Savepoint做兼容。
- 确保Flink的Yarn任务角色对HDFS的Checkpoint路径有读写权限,避免恢复时出现权限错误。
内容的提问来源于stack exchange,提问作者user3107673




