You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Paimon 实时数据湖开发
Paimon 写入性能优化
复制全文
下载 pdf
Paimon 写入性能优化

本文档旨在帮助用户系统性理解 Paimon 的写入性能调优技巧,并提供常见的优化建议,以提升写入吞吐量和稳定性。

1. 写入性能优化

Paimon 的写入性能与 Flink 的检查点机制密切相关。以下是一些常见的优化手段:

  • Checkpoint 间隔:增加 Checkpoint 间隔,减少 Checkpoint 频率,从而降低写入开销。

Image

1.2 缓冲区优化

  • 增大写入缓冲区:通过增加 write-buffer-size 来提升单次写入的数据量。
  • 启用可溢写缓冲区:启用 write-buffer-spillable,允许缓冲区在内存不足时将数据溢写到磁盘,避免 OOM。

1.3 桶数量调整

  • 如果使用固定桶模式(Fixed-Bucket Mode),建议根据数据量和集群资源调整桶的数量,避免桶数量过少导致写入热点。建议可以控制每个桶文件大小在 1GB 左右。

1.4 Changelog 生产者配置

  • Changelog 生产者:选项 changelog-producer 可以设置为 lookupfull-compaction,但这两个选项会对写入性能产生较大影响。
    • 快照/全量同步阶段,建议取消这些选项以提升写入性能。
    • 增量同步阶段,可以重新启用这些选项以支持增量数据处理。

1.5 异步 Compaction

  • 如果发现作业输入在背压情况下呈现锯齿状波动,可能是工作节点负载不均衡。此时可以启用异步压实(Asynchronous Compaction),观察吞吐量是否提升。
    • Compaction 操作本质上是异步的,但如果希望它完全异步且不阻塞写入操作,以期实现最大写入吞吐量,那么可以让异步更加平缓的进行操作。可以在表 WITH 参数中设置以下配置:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH (
    -- 开启异步 Compaction
    'num-sorted-run.stop-trigger' = '2147483647', 
    'sort-spill-threshold' = '10', 
    'changelog-producer.lookup-wait' = 'false', 
    ... 
);
  • 这种完全异步的 Compaction 配置会在写入高峰期生成更多小文件,并在写入低谷期逐渐合并这些文件,以实现最优的读取性能。

2. 并行度配置

Paimon 的写入并行度与桶数量密切相关,建议遵循以下原则:

  • Sink 并行度:建议将 Sink 的并行度设置为小于或等于桶的数量,最好与桶数量相等。
  • 配置方式:通过表属性 sink.parallelism 来调整 Sink 的并行度。例如:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('sink.parallelism' = '16', ...);

3. 本地合并(Local Merging)

如果作业存在 主键数据倾斜 问题(例如 AGG 表统计网站页面浏览量时,某些热门页面的访问量远高于其他页面),可以通过以下方式优化:

  • 启用本地合并:设置 local-merge-buffer-size,在数据按桶分区之前对输入记录进行缓冲和合并。
  • 推荐值:建议从 64 MB 开始调整,逐步优化。

注意:本地合并目前不支持 CDC 数据同步。

4. 文件格式选择

Paimon 支持多种文件格式,不同格式在写入性能和查询性能之间存在权衡。

4.1 行存储格式(AVRO)

  • 优点:写入吞吐量高,压缩性能优异。
  • 缺点:查询性能较差,尤其是当表中有大量列但只查询少数列时,IO 开销较大。此外,压缩效率较低,存储成本较高。
  • 适用场景:适合写入密集型场景,不适用于频繁查询的场景。
  • 配置方式
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('file.format' = 'avro', 'metadata.stats-mode' = 'none', ...);

4.2 分层文件格式

  • 如果不想将所有文件改为 AVRO 格式,可以仅将前几层文件设置为 AVRO 格式。例如:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('file.format.per.level' = '0:avro,1:avro', ...);

5. 文件压缩优化

Paimon 默认使用 zstd 压缩算法(级别为 1),可以通过以下方式优化压缩性能:

  • 调整压缩级别:增加 file.compression.zstd-level 以提高压缩率,但会降低读写速度。例如:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('file.compression.zstd-level' = '3', ...);

6. 稳定性优化

6.1 检查点超时

  • 如果桶数量或资源不足,全量压缩(Full-Compaction)可能导致检查点超时。Flink 默认检查点超时时间为 10 分钟,建议根据业务需求适当增加超时时间。例如:

Image

6.2 写入初始化加速

  • 在写入初始化时,桶的写入器需要读取所有历史文件。如果初始化时间过长,可以启用 write-manifest-cache 来缓存读取的清单数据,从而加速初始化。

7. 内存优化

Paimon 写入过程中主要占用内存的地方包括:

  1. 写入缓冲区:通过 write-buffer-size 调整。
  2. 压缩内存:通过 num-sorted-run.compaction-trigger 调整合并的排序运行数量。
  3. 大行数据读取:通过 read.batch-size 减少单次读取的数据量。
  4. ORC 写入内存:通过 orc.write.batch-size 调整 ORC 写入的批次大小。

7.1 字典编码优化

  • Parquet 格式:禁用字典编码以减少内存消耗:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('parquet.enable.dictionary' = 'false', ...);
  • ORC 格式:禁用字典编码或指定特定列禁用:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('orc.dictionary.key.threshold' = '0', 'orc.column.encoding.direct' = 'field1,field2');

7.2 托管内存优化

  • 如果 Flink 作业不依赖状态,可以禁用托管内存以减少内存开销:

Image

  • 或者,使用 Flink 托管内存来管理写入缓冲区:
CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('sink.use-managed-memory-allocator' = 'true', ...);

8. 提交内存优化

如果写入数据量特别大,提交节点(Committer)可能会消耗大量内存。可以通过以下方式优化:

  • 细粒度资源管理:启用 Flink 的细粒度资源管理功能,并单独增加提交器的堆内存。例如:

Image

CREATE TABLE IF NOT EXISTS catalog_test.db_test.table_test (...)
WITH ('sink.committer-memory' = '300m');

9. 推荐配置优化项

通过合理调整 Flink 配置、并行度、文件格式、压缩算法和内存参数,可以显著提升 Paimon 的写入性能和稳定性。建议根据实际业务场景和数据特点,逐步尝试上述优化手段,以达到最佳性能。

最后给出以下在大规模写入场景下,常见的写入性能优化建议配置项:

CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` (
    word varchar, -- 示例字段
    cnt bigint,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'bucket' = 'N',  -- 建议使用固定分层桶,可适当调大分桶数量,单个 bucket 推荐存储 1GB 左右数据
    'sink.parallelism' = 'N', -- 在追数阶段,建议 sink 并行度保持和 bucket 数量一致
    -- 开启异步 Compaction
    'num-sorted-run.stop-trigger' = '2147483647', 
    'sort-spill-threshold' = '10', 
    'changelog-producer.lookup-wait' = 'false',  -- changelog-producer 为 none 的时候这个参数可以忽略
    -- 开启写入缓存 spill 到磁盘,避免 OOM 失败
    'write-buffer-spillable' = 'true',
    'changelog-producer' = 'none' -- 如果下游不依赖 changelog,或者在全量同步阶段,则可以设置为 none
);
最近更新时间:2025.02.10 10:49:01
这个页面对您有帮助吗?
有用
有用
无用
无用