You need to enable JavaScript to run this app.
导航
实时聚合场景
最近更新时间:2025.08.05 23:32:39首次发布时间:2025.08.05 23:32:39
复制全文
我的收藏
有用
有用
无用
无用

实时聚合是 Flink SQL 最核心和广泛的应用场景,它允许用户使用熟悉的 SQL 语法(如 GROUP BY、窗口函数 TUMBLE/HOP/SESSION 以及聚合函数 SUMCOUNTAVGMAXMIN 等),直接在连续不断、无界的数据流上进行动态计算。
不同于传统批处理需要等待数据收集完毕,Flink SQL 能够基于事件时间或处理时间,以极低的延迟(毫秒到秒级)持续地计算并输出聚合结果(如每分钟的网站访问量、实时的交易总额、每秒钟的异常请求数、设备状态的平均值等)。
这种能力使得构建实时监控大屏、实时风控指标、实时报表、实时数据质量监控等应用变得异常简洁高效,极大地降低了开发复杂流处理逻辑的门槛,同时得益于 Flink 强大的状态管理和 Exactly-Once 语义,保证了计算的准确性和可靠性。

1. 实时聚合场景描述

本文我们以广告核心指标,包括曝光指标、点击指标、消费金额等指标的实时运算场景作为例子,介绍 Flink SQL 实时聚合的基本概念和使用技巧。

1.1 广告核心指标

指标类型

关键指标

业务意义

曝光指标

广告展示次数

衡量广告覆盖范围

互动指标

点击次数

评估用户参与度

财务指标

消费金额

计算广告成本与ROI

效果指标

点击率(CTR)

衡量广告效果

1.2 实时聚合的业务痛点

  • 数据延迟:离线T+1分析无法满足实时竞价决策
  • 状态管理:处理跨天数据时状态无限增长风险
  • 数据倾斜:热门广告活动导致聚合节点负载不均
  • 乱序处理:网络延迟导致事件乱序到达

有下图可以了解到,Flink SQL 从 Kafka 实时日志 Topic 消费,经过根据广告、竞价、日期等维度进行分组,对于展现、点击、消费等事件进行聚合。将计算结果导出到下游 OLAP 分析引擎或者 RDS 关系型数据库。
Image

2. 广告实时聚合实战

2.1 数据源表定义(Kafka)

CREATE TABLE ad_events (
    requestId     STRING,         -- 请求ID(包含时间戳)
    campaignId    BIGINT,         -- 广告计划ID
    adId          BIGINT,         -- 广告单元ID
    mgEventType   STRING,         -- 事件类型('1'=曝光,'2'=点击)
    price         DECIMAL(10, 4), -- 单次事件价格
    event_time    TIMESTAMP(3),   -- 事件时间
    
    -- 可选,水位线定义(允许2分钟乱序)
    -- WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'ad_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

2.2 基础聚合实现

INSERT INTO daily_agg_results
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd') AS agg_day,  -- 按天聚合
    campaignId,
    adId,
    SUM(CASE WHEN mgEventType = '1' THEN 1 ELSE 0 END) AS show_count,
    SUM(CASE WHEN mgEventType = '2' THEN 1 ELSE 0 END) AS click_count,
    SUM(CASE WHEN mgEventType IN ('1', '2') THEN price ELSE 0 END) AS consumption_amount
FROM ad_events
GROUP BY 
    DATE_FORMAT(event_time, 'yyyy-MM-dd'),
    campaignId,
    adId;

2.3 数据写入下游 MySQL 数据表

-- 在Flink SQL中设置状态TTL
SET 'table.exec.state.ttl' = '48h'; -- 48小时(毫秒)

-- 创建目标表
CREATE TABLE daily_agg_results (
    agg_day             STRING,
    campaignId          BIGINT,
    adId                BIGINT,
    show_count          BIGINT,
    click_count         BIGINT,
    consumption_amount  DECIMAL(15, 4),
    PRIMARY KEY (agg_day, campaignId, adId) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/dsp',
    'table-name' = 'daily_agg',
    'username' = 'flink',
    'password' = 'flink_pwd'
);

3.1 时间语义选择

时间类型

生成方式

特点

适用场景

处理时间

PROCTIME()

简单高效,无乱序问题

延迟敏感型指标

事件时间

数据自带时间戳

精确反映业务时间

财务结算,精确分析

摄入时间

数据进入Flink时间

介于处理时间和事件时间之间

一般监控场景

3.2 状态管理机制

在 Flink SQL 实时聚合中,状态(State) 扮演着核心记忆库的角色,它持续存储并动态更新每个聚合键(如广告计划ID+日期)的中间计算结果(如当前累计曝光数、消费金额等)。通过状态管理,Flink 实现了三大关键能力:

  1. 精准连续性:跨事件处理中持久化保存部分聚合结果(如 SUM 的中间累加值),确保任意时间点都能输出精确聚合值
  2. 乱序处理:借助 Watermark 机制,状态暂存迟到数据并重新计算,解决网络延迟导致的数据乱序问题
  3. 资源控制:通过 TTL(48小时)自动清理过期状态(如三天前的广告数据),避免状态无限膨胀

启用 Flink Checkpoint 能力:
Image
全局聚合算子的状态可能会随着输入数据的增多而快速增长,尤其是含有 distinct 的聚合,因此需要用户主动设置状态 State TTL。若配置了 State TTL(通过参数 table.exec.state.ttl 配置),则每一个 Group by Key 的 group 所对应的状态,在 ttl 时间内没有被更新时,将被清除重置。
Image
任务从状态进行重启,可以保证任务数据从上一次断点进行恢复:
Image

4. 高级聚合技巧

4.1 多维交叉分析

-- 按小时+广告计划聚合
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd HH') AS agg_hour,
    campaignId,
    COUNT(DISTINCT userId) AS uv,  -- 去重计数
    SUM(show_count) AS total_shows,
    SUM(click_count) AS total_clicks,
    SUM(consumption_amount) AS total_consumption
FROM daily_agg_results
GROUP BY 
    DATE_FORMAT(event_time, 'yyyy-MM-dd HH'),
    campaignId;

4.2 滚动窗口聚合

-- 1小时滚动窗口聚合
SELECT
    campaignId,
    adId,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    SUM(CASE WHEN mgEventType = '1' THEN 1 ELSE 0 END) AS show_count,
    AVG(price) AS avg_price
FROM ad_events
GROUP BY 
    campaignId,
    adId,
    TUMBLE(event_time, INTERVAL '1' HOUR);

4.3 实时CTR计算

-- 实时点击率计算
SELECT
    agg_day,
    campaignId,
    adId,
    show_count,
    click_count,
    -- 点击率计算(保留4位小数)
    ROUND(click_count * 1.0 / NULLIF(show_count, 0), 4) AS ctr,
    consumption_amount
FROM daily_agg_results;

5. 生产环境最佳实践

5.1 性能优化常见手段

  1. 并行度调优

越大则有越高的并发度,但会带来更多的内存占用和计算开销

SET 'parallelism.default' = '16';
  1. MiniBatch聚合

通过攒批的方式支持更少的状态访问次数和输出结果,能提升更高的吞吐

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';
  1. 数据倾斜处理
-- 添加随机前缀解决热点问题
SELECT 
    CONCAT(CAST(RAND()*10 AS INT), '_', campaignId) AS skew_key

6. 完整代码示例

6.1 完整解决方案

-- 设置环境参数
SET 'table.exec.state.ttl' = '48h'; -- 48小时TTL

-- 创建 Kafka 数据源
CREATE TABLE ad_events (
    requestId     STRING,         -- 请求ID(包含时间戳)
    campaignId    BIGINT,         -- 广告计划ID
    adId          BIGINT,         -- 广告单元ID
    mgEventType   STRING,         -- 事件类型('1'=曝光,'2'=点击)
    price         DECIMAL(10, 4), -- 单次事件价格
    event_time    TIMESTAMP(3),   -- 事件时间
    
    -- 可选,水位线定义(允许2分钟乱序)
    -- WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'ad_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- 创建目标表(MySQL)
CREATE TABLE sink_bh (
    `date`            STRING,
    campaignId        BIGINT,
    adId              BIGINT,
    sum_show          BIGINT,
    sum_click         BIGINT,
    consumption_amount DECIMAL(15,4),
    PRIMARY KEY (`date`, campaignId, adId) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/dsp_report',
    'table-name' = 'daily_ad_stats',
    'username' = 'flink_user',
    'password' = 'secure_password',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '30s'
);

-- 执行实时聚合
INSERT INTO sink_bh
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd') AS `date`,
    campaignId,
    adId,
    SUM(IF(mgEventType = '1', 1, 0)) AS sum_show,
    SUM(IF(mgEventType = '2', 1, 0)) AS sum_click,
    SUM(IF(mgEventType IN ('1', '2'), price, 0)) AS consumption_amount
FROM ad_events
GROUP BY 
    DATE_FORMAT(event_time, 'yyyy-MM-dd'),
    campaignId,
    adId;

7. 常见问题与解决方案

7.1 状态持续增长问题

问题现象:状态大小随时间线性增长
解决方案

  1. 确保TTL设置合理:SET 'table.exec.state.ttl' = '48h'
  2. 定期清理过期状态:启用RocksDB压缩过滤器
  3. 分区键优化:避免使用高基数字段作为分区键

7.2 乱序数据处理

问题现象:聚合结果不稳定
解决方案

  1. 合理设置Watermark:WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
  2. 增加迟到数据处理窗口
  3. 使用事件时间而非处理时间

7.3 聚合结果延迟

问题现象:看板数据更新不及时
解决方案

  1. 优化MiniBatch配置提升吞吐:
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';
  1. 增加并行度:SET 'parallelism.default' = '16';
  2. 调整检查点间隔:SET 'execution.checkpointing.interval' = '30s';

7.4 状态不兼容

问题现象:SQL 代码经过修改后,任务无法启动。在日志中报如下类似错误Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@f39af915) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@b714b61c).
Image
解决方案
该问题的原因有可能是因为 Flink SQL 做出了不兼容的升级(比如增加分组字段、增加聚合指标等),导致 Flink 任务状态不兼容,无法从状态恢复。必须要选择求其状态,从全新恢复。
在聚合场景我们可以选择从 Kafka/BMQ 等上游系统的某个时间位点开始全量追溯的方式进行回溯,比如我们可以从 2023-01-01 00:00:00 开始全量追溯,

  1. 在 Kafka 数据源上,可以增加如下过滤条件:where event_time >= '2023-01-01 00:00:00',仅选择回溯起点之后的数据

Image

  1. 全新启动,选择 Kafka 时间位点为回溯的时间起点:

Image

最佳实践提示

  1. 将状态TTL设置为业务需求的最大延迟时间+安全余量(如48小时)
  2. 使用事件时间而非处理时间进行财务相关计算
  3. 对高基数维度(如用户ID)使用近似去重函数减少状态压力
  4. 定期监控状态大小和延迟指标,确保系统稳定运行