You need to enable JavaScript to run this app.
导航
Flink SQL 数据倾斜处理最佳实践
最近更新时间:2025.09.04 11:30:15首次发布时间:2025.08.26 00:02:53
复制全文
我的收藏
有用
有用
无用
无用

1. 文档背景

本文档旨在为使用流式计算 Flink 版的用户提供指导,帮助其解决在 Flink SQL 作业开发过程中,尤其是在进行 GROUP BY 聚合时,遇到的数据倾斜问题。数据倾斜是分布式计算中的典型问题,会导致作业性能瓶颈、资源利用不均、甚至作业失败。本文档将解释其现象、根因,并提供在平台上可落地的最佳实践与解决方案。

2. 场景解释

2.1 什么是数据倾斜

数据倾斜是指在分布式计算中,由于数据分布不均匀,导致绝大部分数据集中分布到某一个或某几个任务(TaskManager)上,而其他任务只处理了少量数据。这些处理了大量数据的任务会成为整个作业的瓶颈,引发以下问题:

  • 处理延迟: 单个任务处理时间远长于其他任务,拖慢整个作业的吞吐量与产出速度。
  • 资源浪费: 其他任务早已执行完毕,资源空闲,而热点任务仍在高负荷运行。
  • 背压(Backpressure): 热点任务处理不及,导致数据堆积,向上游传递背压,可能引起数据摄入延迟或故障。
  • 内存溢出(OOM): 热点任务在聚合时可能需要维护非常大的状态(如 Keyed State),极易导致 TaskManager 内存溢出,造成作业重启或失败。

在 Flink SQL 中,数据倾斜最常发生在 GROUP BYJOIN 等需要根据 Key 进行分区的操作中。

2.2 如何识别是否出现数据倾斜问题

在流式计算 Flink 版的控制台,可以通过以下方式快速识别数据倾斜:

  1. 作业 Flink UI(核心手段):
    • 反压监控: 在作业的 「运维」「监控」 页面,查看反压情况。如果拓扑图中某个节点持续显示为 「High」 反压,而其下游节点正常,则该节点很可能发生了数据倾斜。
    • 算子吞吐量(Records Sent/Received)与繁忙度: 对比同一个算子(如 groupBy 算子)的不同并行子任务(Subtask)。如果某个 Subtask 的记录处理数(Records Processed) 或状态大小远高于其他并行度相同的 Subtask,即可判定发生了数据倾斜。

具体数据倾斜状态可以参考如下 Flink UI 状态,算子持续属于 Busy 状态,同时发现仅有一个 SubTask 数据量远超其他 SubTask
Image

  1. SQL 分析:
    • 如果从 Flink UI 只能简单看出任务确实存在倾斜,但需要确认是哪些具体的 Key 出现了热点,则需要对数据源进行抽样查询,直接分析分组 Key 的分布:
SELECT `key`, COUNT(1) AS cnt 
FROM source_table 
GROUP BY `key` 
ORDER BY cnt DESC 
LIMIT 10;
如果排名前几的 Key 的数据量(`cnt`)比其他 Key 高出数个数量级,则该 Key 即为热点 Key。

3. 常见处理方案

对于轻度倾斜或作为其他方案的辅助手段,可以在控制台通过调整作业参数进行缓解。

  • 增加并行度: 在作业 「配置」 中,适当调大作业的并行度。这为处理热点 Key 提供了更多的任务槽(Slot),可能将热点 Key 的压力分散到更多线程中(但无法解决单个 Key 的倾斜问题,通常需要结合其他方法)。

Image

  • 开启 Mini-Batch 优化(强烈推荐): Mini-Batch 通过微批处理减少对状态的访问频率,能显著提升吞吐量并缓解倾斜带来的背压。可在 「高级参数」 中配置:
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s  # 攒批的间隔时间
table.exec.mini-batch.size: 5000         # 攒批的最大记录数

Image

优点: 配置简单,能有效提升吞吐。
缺点: 无法从根本上解决单个 Key 数据量过大的问题。

3.2 两阶段聚合(根治方案 - 推荐)

这是解决聚合倾斜最有效和通用的方法。核心思想是将热点 Key 打散,先进行局部聚合,再进行全局聚合。

3.2.1 普通聚合场景

适用场景: SUM, COUNT, MAX, MIN 等可进行两阶段计算的聚合操作。这些操作通常是对数据进行汇总统计,产生一个单独的结果或一组聚合结果。在普通聚合中,对重复的数据不作特殊处理,相同的数据会被视为同一条记录参与聚合操作。

实施步骤:

  1. 第一阶段 - 打散并局部聚合: 为原始数据添加一个随机前缀/后缀,将原热点 Key 变为多个新 Key,进行第一次聚合。
  2. 第二阶段 - 合并并全局聚合: 去除随机后缀,对第一阶段的局部结果按原 Key 进行第二次聚合,得到最终结果。

Local-Merge 示例:
在聚合中,shuffle 和访问 state 是比较大的消耗。我们可以通过打开两阶段优化来加入 local aggregation,如下图所示:
Image
可以看到,Local Aggregation 会在上游算子的末尾直接做一次预聚合,且不访问状态数据,最后传递到 Global Aggregation 的数据量会大量减少,能够有效缓解数据倾斜。需要在任务开发 - 自定义参数设置如下:

# 聚合策略配置,默认是AUTO,选择TWO_PHASE和ONE_PHASE取决于优化过程,可以强制使用TWO_PHASE
"table.optimizer.agg-phase-strategy": "TWO_PHASE" 

# 以下三个配置是minibatch的必选配置,使用local-global aggregate优化必须打开minibatch
"table.exec.mini-batch.enabled": "true"  
"table.exec.mini-batch.allow-latency": "5 s"
"table.exec.mini-batch.size": "5000"

注意:当所有的聚合函数允许 merge 结果时(UDAF 需要实现merge函数),两阶段优化才能生效。

Flink SQL 示例
若打开两阶段优化后,仍然未能缓解,常见于上游已经出现严重的数据倾斜,则可以考虑手动将 groupby key 打散。可以参考如下 SQL 示例:

假设需要计算每个 user_id 的订单总额 SUM(amount),且 user_idabc 的是热点 Key。

-- 第一步:创建临时视图,进行打散局部聚合 (假设打散成10份)
CREATE TEMPORARY VIEW partial_agg AS
SELECT
  user_id,
  suffix,
  SUM(amount) AS partial_sum
FROM (
  SELECT
    user_id,
    amount,
    -- 使用RANDOM_INT或HASH_CODE生成随机后缀(0-9)
    CAST(FLOOR(RAND() * 10) AS INT) AS suffix -- 在Flink SQL中可用RANDOM_INT(10)替代
  FROM orders
)
GROUP BY user_id, suffix;

-- 第二步:进行全局聚合
SELECT
  user_id,
  SUM(partial_sum) AS total_amount
FROM partial_agg
GROUP BY user_id;

操作提示: 可将上述逻辑在一个 SQL 作业中通过临时视图拆分编写。

3.2.2 去重聚合场景

在 Flink SQL 中,处理去重聚合(比如求 UV 等场景),会有 Map 类的结构来存储所有去重 Key 值。如果去重 Key 值分布比较稀疏,则两阶段优化并不能帮助减少单个并发的处理数据量。
Image
如上图左侧所示,local aggregation 后,distinct 聚合相关的数据依然很多,下游 global aggregation 仍可能产生热点。因此我们需要开启如下优化:

# 打开split优化的开关,默认关闭,需要手动开启
"table.optimizer.distinct-agg.split.enabled": "true"
# 当打开split优化时,可以配置分桶个数,默认1024个
"table.optimizer.distinct-agg.split.bucket-num": "1024"

# 以下三个配置是minibatch的必选配置,使用split优化必须打开minibatch
"table.exec.mini-batch.enabled": "true" 
"table.exec.mini-batch.allow-latency": "5 s"
"table.exec.mini-batch.size": "5000"

开启此优化后,原来的聚合会被优化成两层聚合,其中 distinct key 在内层先被打散,效果如上图的右侧所示。例如以下SQL:

SELECT 
    a,
    COUNT(DISTINCT user_id)
FROM
    source
GROUP BY
    a

当 split 优化被打开后,效果类似于改写 SQL 为:

SELECT 
    a,
    SUM(cnt)
FROM
    (
    SELECT 
        a,
        COUNT(DISTINCT user_id) as cnt,
    FROM
        source
    GROUP BY
        a,
        MOD(HASH_CODE(user_id), 1024)
    ) T
GROUP BY
    a

3.3 过滤热点 Key(特定场景方案)

如果倾斜由极少数已知的超热点 Key(如爬虫、测试账号)引起,且业务可接受其单独处理或暂时忽略。

实施步骤:

  1. 使用 WHERE 子句过滤掉热点 Key,先快速计算非热点数据的正确结果。
  2. 对过滤掉的热点 Key 进行单独处理。
  3. 使用 UNION ALL 合并两部分结果。

Flink SQL 示例:

-- 1. 处理非热点数据
SELECT user_id, SUM(amount) AS total_amount
FROM orders
WHERE user_id != 'super_hot_user_abc'
GROUP BY user_id

UNION ALL

-- 2. 单独处理热点数据
SELECT 'super_hot_user_abc' AS user_id, SUM(amount) AS total_amount
FROM orders
WHERE user_id = 'super_hot_user_abc'

优点: 简单直接,效果立竿见影。
缺点: 需要事先知晓热点 Key,且业务逻辑可能变复杂。

3.4 优化 GROUP BY 分组

检查 SQL 逻辑,从业务角度避免不必要的倾斜。

  • 移除不必要的分组字段: 审视 GROUP BY 后的每个字段,移除那些非必须的、高基数的维度字段。
  • 采用更粗粒度的分组: 如果业务允许,使用更高层次的分组维度(如将 user_id 替换为 city,或在时间维度上加大窗口)。

3.5 处理维表 Join 的倾斜

JOIN 维表(如 MySQL, HBase)时,如果流表数据存在热点 Key,会导致大量查询集中打向维表的某个分区。

解决方案:

  • 开启异步IO: 确保维表 Join 使用 ASYNC 模式,避免同步查询阻塞算子。
  • 配置维表缓存: 利用 LRU 等缓存策略减少对外部数据库的重复查询请求。
  • 预处理维表数据: 如果维表本身很大且分布不均,可在数据源端进行分桶或分区设计。
  • 在流表侧打散: 类似于两阶段聚合,先对流表的热点 Key 进行打散,再与维表进行 JOIN,最后再合并结果。此法较为复杂,可根据实际情况评估。

4. 总结数据倾斜的处理流程

在上遇到 Flink SQL 数据倾斜时,建议遵循以下决策流程进行排查和解决:

  1. 确认问题: 通过控制台监控面板确认是否存在反压、算子负载不均等情况,定位发生倾斜的算子。
  2. 分析热点: 编写抽样 SQL,分析数据分布,识别热点 Key。
  3. 选择方案:
    • 通用场景(聚合): 优先采用 「两阶段聚合」 (3.2) ,并同步开启 「Mini-Batch 优化」 (3.1)。
    • 已知热点Key: 如果热点Key极少且可处理,考虑 「过滤热点Key」 (3.3) 方案。
    • SQL优化: 始终检查并 「优化 GROUP BY 分组」 (3.4),从源头避免问题。
    • 关联查询: 如果是维表 JOIN 倾斜,参考 「处理维表 Join 的倾斜」 (3.5) 方案。
  4. 验证效果: 实施优化后,再次回到控制台监控面板,观察反压是否消失、各 Subtask 负载是否均衡、吞吐量是否提升,以验证解决方案的有效性。