启用Checkpoint时Flink JobManager直接内存耗尽排查求助
Flink Checkpoint引发Direct Buffer内存耗尽问题
问题现象
- Flink应用运行2-3天后抛出异常:Thread 'jobmanager-io-thread-25' 出现未捕获异常
java.lang.OutOfMemoryError: Direct buffer memory,随后终止。 - 无论如何增大直接内存(最高已试16GB),内存仍会逐渐耗尽,只是延长运行时间,最终仍会终止。
- 禁用Checkpoint时,内存无增长,应用运行正常。
- 尝试过文档推荐的增量Checkpoint、状态变更日志、增大直接内存等调优手段,均无效。
堆栈跟踪
2025-11-09 17:06:56,442 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'jobmanager-io-thread-25' produced an uncaught exception. Stopping the process... java.lang.OutOfMemoryError: Direct buffer memory at java.base/java.nio.Bits.reserveMemory(Bits.java:175) at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:71) at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:280) at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74) at java.base/java.nio.channels.Channels.writeFully(Channels.java:97) at java.base/java.nio.channels.Channels$1.write(Channels.java:172) at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48) at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54) at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88) at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112) at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78) at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107) at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108) at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:758) at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264) at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109) at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:165) at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83) at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56) at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:101) at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88) at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:339) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1624) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1518) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1410) at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109) at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139)
Checkpoint配置

应用架构(概览)
- 部署在AWS EMR上的Flink应用,使用Flink 1.20、EMR 7.10.0
- 包含2个S3文件源,读取Parquet文件,均为无界流,以1分钟为间隔发现新文件,新文件每分钟新增至
yyyy/mm/dd/mi/files路径 - 处理两个流并生成对象
- 使用带30分钟TTL的
KeyedCoProcessFunction进行流Join - 对Join后的流执行map、filter操作
- 将结果流以Parquet格式写入S3
疑问
- 为何Checkpoint会逐渐消耗大量直接内存且无法释放?
- Checkpoint协调器在直接内存中存储的具体内容是什么?
- 如何处理或调优以解决该问题?
- 下一步可采取哪些排查和解决措施?
直接内存与Checkpoint大小对比

原生内存泄漏堆栈跟踪

问题解答
1. 内存泄漏原因分析
从堆栈和现象来看,内存泄漏发生在Checkpoint元数据序列化并写入S3的阶段。结合AWS S3文件系统实现,核心问题大概率是S3RecoverableFsDataOutputStream的直接缓冲区未被正确回收。Flink的S3客户端在处理Checkpoint元数据写入时,可能因引用持有或JVM直接内存回收机制延迟,导致临时直接缓冲区持续累积,最终耗尽内存。
另外,场景中每分钟生成大量小文件,Checkpoint会持续跟踪这些文件的状态句柄,随着时间推移,Checkpoint元数据中的状态句柄数量不断增长,序列化这些句柄时需要创建更多直接缓冲区,旧缓冲区未及时释放,形成泄漏。
2. Checkpoint协调器的直接内存存储内容
Checkpoint协调器在直接内存中主要存储三类内容:
- 正在进行的Checkpoint的元数据临时缓冲区:包含算子状态句柄、任务状态快照信息、Checkpoint ID/时间戳等
- 与TaskManager通信的IO缓冲区:接收Checkpoint确认信息、发送指令的临时缓冲
- S3文件系统客户端的写入缓冲:向S3写入Checkpoint元数据时的临时直接缓冲区
3. 解决方案与调优手段
(1)升级Flink版本
Flink 1.20存在S3文件系统客户端的已知内存泄漏问题,1.21+版本已修复相关Direct Buffer泄漏。建议升级到Flink 1.21或更高版本,配合EMR对应兼容版本。
(2)调整S3存储配置
- 设置
fs.s3a.fast.upload.buffer为disk:强制使用磁盘缓冲而非直接内存,避免直接内存堆积 - 指定
fs.s3a.buffer.dir配置:将S3写入的临时缓存目录指向本地磁盘,转移直接内存压力
(3)优化Checkpoint策略
- 延长Checkpoint间隔:从当前1分钟调整为5-10分钟,减少元数据生成频率
- 降低保留的Checkpoint数量:减小
state.checkpoints.num-retained的值,避免旧Checkpoint相关资源未释放 - 确保启用异步快照:
execution.checkpointing.async设为true,优化内存回收时机
(4)JVM参数调优
- 添加
-XX:+PrintDirectMemoryStatistics:监控直接内存使用情况,定位增长节点 - 合理设置
-XX:MaxDirectMemorySize:基于Checkpoint元数据大小的2-3倍配置,无需盲目调大
4. 下一步排查措施
- 启用Flink内存监控:通过Prometheus监控JobManager的
JobManager.Memory.Direct.Total指标,追踪内存增长阶段 - 生成内存快照:在内存耗尽前手动触发Heap Dump和Native Memory Dump,用
jmap、jcmd分析直接内存持有者 - 替换S3客户端:尝试使用Flink原生
fs.s3客户端而非EMR默认的fs.s3a,验证是否为客户端实现导致泄漏 - 简化应用测试:移除流Join或文件源,逐步缩小范围,确认是否为特定算子的状态处理导致元数据膨胀
内容的提问来源于stack exchange,提问作者Strange




