You need to enable JavaScript to run this app.
导航
MiniBatch 配置
最近更新时间:2023.11.07 10:27:22首次发布时间:2023.11.07 10:27:22
复制全文
我的收藏
有用
有用
无用
无用

MiniBatch 的作用是缓存一定的数据后再触发处理,以减少对 State 的访问次数,从而提升吞吐量和减少数据输出量。

适用场景

  1. 降低频繁访问状态导致的 CPU 开销
    如果使用 RocksDB 作为 State Backend。每次访问 State,都需要进行序列化和反序列化,导致 CPU 开销比较大。开启 MiniBatch,可以减少对 State 的访问次数,降低一些 CPU 开销。
  2. 减少聚合的输出量
    在普通聚合中,每来一条数据就直接输出一条当前的聚合结果;但是开启 MiniBatch 后,如果在一个微批处理内有多条数据,只会输出一次,这样可以有效降低输出数据的量。

如需详细了解 MiniBatch 原理,请参见开源文档:MiniBatch Aggregation

如何启用 MiniBatch

MiniBatch 默认关闭。如需启用,您可以在 Flink 参数配置中配置以下参数:

配置

示例值

说明

table.exec.mini-batch.enabled

true

是否启用 MiniBatch。

  • true:启用
  • false:默认值,关闭。

table.exec.mini-batch.allow-latency

5s

MiniBatch 的时间间隔。

说明

启用 MiniBatch,会缓冲一批数据而不是立即处理数据,这会产生一些延迟。您需要根据业务需要配置mini-batch.allow-latency,在任务吞吐和数据时效性之间进行折中配置。

table.exec.mini-batch.size

10000

单个并发 buffer 数据的条数。
建议通过合理配置mini-batch.size以控制异常流量(如数据回溯,lag 过大等情况)下 buffer 的数据条数,防止异常流量下 buffer 数据条数过多直接造成内存 OOM。
您可以通过以下公式粗略计算,然后合理配置mini-batch.size来限制 buffer 的最大数据条数。
图片

  • 分子中的 f 表示系数,Max(QPS) 表示算子最大写入的 QPS,f * Max(QPS) 表示允许的最大流量。
  • 分子中的 latency 取自mini-batch.allow-latency,单位为秒。
  • 分母为作业的并发数,通过 Tm 的数据量乘以 slot 的数量计算得出。

说明

数据条数和时间间隔属于的关系,满足任意条件就组成了一个 MiniBatch。

常见问题

  1. MiniBatch 跟 LocalGlobal 的关系。
    LocalGlobal 将原先的 Aggregate 分成 Local 和 Global 两阶段聚合。第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批处理的增量值(Accumulator);第二阶段再将收到的 Accumulator 合并(Merge)得到最终的结果(GlobalAgg)。
    LocalGlobal 能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg 的热点,提升性能。如需详细了解 LocalGlobal 原理,请参见开源文档:Local-Global Aggregation
    LocalGlobal 的前提是必须要开启 MiniBatch。LocalGlobal 不需要手动开启,默认就是开启的。所以开启 MiniBatch 配置后,LocalGlobal 就会自动优化了。

  2. 开启 MiniBatch 是否可以从已有 Checkpoint 恢复?
    开启 MiniBatch 后,如果想要恢复开启 MiniBatch 前的状态,需要关闭 LocalGlobal。
    请通过以下参数关闭 LocalGlobal。

    // 开启 MiniBatch,LocalGlobal 也默认开启,则默认值为 TWO_PHASE。需要关闭时,设置为 ONE_PHASE。
    table.optimizer.agg-phase-strategy = ONE_PHASE