You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

启用Checkpoint时Flink JobManager直接内存耗尽排查求助

问题现象

  • Flink应用运行2-3天后抛出异常:Thread 'jobmanager-io-thread-25' 出现未捕获异常 java.lang.OutOfMemoryError: Direct buffer memory,随后终止。
  • 无论如何增大直接内存(最高已试16GB),内存仍会逐渐耗尽,只是延长运行时间,最终仍会终止。
  • 禁用Checkpoint时,内存无增长,应用运行正常。
  • 尝试过文档推荐的增量Checkpoint、状态变更日志、增大直接内存等调优手段,均无效。

堆栈跟踪

2025-11-09 17:06:56,442 ERROR org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 'jobmanager-io-thread-25' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Direct buffer memory
    at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
    at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
    at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
    at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:71)
    at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:280)
    at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74)
    at java.base/java.nio.channels.Channels.writeFully(Channels.java:97)
    at java.base/java.nio.channels.Channels$1.write(Channels.java:172)
    at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48)
    at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54)
    at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78)
    at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:758)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:165)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56)
    at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:101)
    at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88)
    at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:339)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1624)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1518)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1410)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139)

Checkpoint配置

Checkpoint配置

应用架构(概览)

  • 部署在AWS EMR上的Flink应用,使用Flink 1.20、EMR 7.10.0
  • 包含2个S3文件源,读取Parquet文件,均为无界流,以1分钟为间隔发现新文件,新文件每分钟新增至yyyy/mm/dd/mi/files路径
  • 处理两个流并生成对象
  • 使用带30分钟TTL的KeyedCoProcessFunction进行流Join
  • 对Join后的流执行map、filter操作
  • 将结果流以Parquet格式写入S3

疑问

  1. 为何Checkpoint会逐渐消耗大量直接内存且无法释放?
  2. Checkpoint协调器在直接内存中存储的具体内容是什么?
  3. 如何处理或调优以解决该问题?
  4. 下一步可采取哪些排查和解决措施?

直接内存与Checkpoint大小对比

直接内存与Checkpoint大小对比

原生内存泄漏堆栈跟踪

原生内存泄漏堆栈跟踪


问题解答

1. 内存泄漏原因分析

从堆栈和现象来看,内存泄漏发生在Checkpoint元数据序列化并写入S3的阶段。结合AWS S3文件系统实现,核心问题大概率是S3RecoverableFsDataOutputStream的直接缓冲区未被正确回收。Flink的S3客户端在处理Checkpoint元数据写入时,可能因引用持有或JVM直接内存回收机制延迟,导致临时直接缓冲区持续累积,最终耗尽内存。

另外,场景中每分钟生成大量小文件,Checkpoint会持续跟踪这些文件的状态句柄,随着时间推移,Checkpoint元数据中的状态句柄数量不断增长,序列化这些句柄时需要创建更多直接缓冲区,旧缓冲区未及时释放,形成泄漏。

2. Checkpoint协调器的直接内存存储内容

Checkpoint协调器在直接内存中主要存储三类内容:

  • 正在进行的Checkpoint的元数据临时缓冲区:包含算子状态句柄、任务状态快照信息、Checkpoint ID/时间戳等
  • 与TaskManager通信的IO缓冲区:接收Checkpoint确认信息、发送指令的临时缓冲
  • S3文件系统客户端的写入缓冲:向S3写入Checkpoint元数据时的临时直接缓冲区

3. 解决方案与调优手段

(1)升级Flink版本

Flink 1.20存在S3文件系统客户端的已知内存泄漏问题,1.21+版本已修复相关Direct Buffer泄漏。建议升级到Flink 1.21或更高版本,配合EMR对应兼容版本。

(2)调整S3存储配置

  • 设置fs.s3a.fast.upload.bufferdisk:强制使用磁盘缓冲而非直接内存,避免直接内存堆积
  • 指定fs.s3a.buffer.dir配置:将S3写入的临时缓存目录指向本地磁盘,转移直接内存压力

(3)优化Checkpoint策略

  • 延长Checkpoint间隔:从当前1分钟调整为5-10分钟,减少元数据生成频率
  • 降低保留的Checkpoint数量:减小state.checkpoints.num-retained的值,避免旧Checkpoint相关资源未释放
  • 确保启用异步快照:execution.checkpointing.async设为true,优化内存回收时机

(4)JVM参数调优

  • 添加-XX:+PrintDirectMemoryStatistics:监控直接内存使用情况,定位增长节点
  • 合理设置-XX:MaxDirectMemorySize:基于Checkpoint元数据大小的2-3倍配置,无需盲目调大

4. 下一步排查措施

  • 启用Flink内存监控:通过Prometheus监控JobManager的JobManager.Memory.Direct.Total指标,追踪内存增长阶段
  • 生成内存快照:在内存耗尽前手动触发Heap Dump和Native Memory Dump,用jmapjcmd分析直接内存持有者
  • 替换S3客户端:尝试使用Flink原生fs.s3客户端而非EMR默认的fs.s3a,验证是否为客户端实现导致泄漏
  • 简化应用测试:移除流Join或文件源,逐步缩小范围,确认是否为特定算子的状态处理导致元数据膨胀

内容的提问来源于stack exchange,提问作者Strange

火山引擎 最新活动