You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Flink SQL 实时计算
实时聚合场景
复制全文
下载 pdf
实时聚合场景

实时聚合是 Flink SQL 最核心和广泛的应用场景,它允许用户使用熟悉的 SQL 语法(如 GROUP BY、窗口函数 TUMBLE/HOP/SESSION 以及聚合函数 SUMCOUNTAVGMAXMIN 等),直接在连续不断、无界的数据流上进行动态计算。
不同于传统批处理需要等待数据收集完毕,Flink SQL 能够基于事件时间或处理时间,以极低的延迟(毫秒到秒级)持续地计算并输出聚合结果(如每分钟的网站访问量、实时的交易总额、每秒钟的异常请求数、设备状态的平均值等)。
这种能力使得构建实时监控大屏、实时风控指标、实时报表、实时数据质量监控等应用变得异常简洁高效,极大地降低了开发复杂流处理逻辑的门槛,同时得益于 Flink 强大的状态管理和 Exactly-Once 语义,保证了计算的准确性和可靠性。

1. 实时聚合场景描述

本文我们以广告核心指标,包括曝光指标、点击指标、消费金额等指标的实时运算场景作为例子,介绍 Flink SQL 实时聚合的基本概念和使用技巧。

1.1 广告核心指标

指标类型

关键指标

业务意义

曝光指标

广告展示次数

衡量广告覆盖范围

互动指标

点击次数

评估用户参与度

财务指标

消费金额

计算广告成本与ROI

效果指标

点击率(CTR)

衡量广告效果

1.2 实时聚合的业务痛点

  • 数据延迟:离线T+1分析无法满足实时竞价决策
  • 状态管理:处理跨天数据时状态无限增长风险
  • 数据倾斜:热门广告活动导致聚合节点负载不均
  • 乱序处理:网络延迟导致事件乱序到达

有下图可以了解到,Flink SQL 从 Kafka 实时日志 Topic 消费,经过根据广告、竞价、日期等维度进行分组,对于展现、点击、消费等事件进行聚合。将计算结果导出到下游 OLAP 分析引擎或者 RDS 关系型数据库。
Image

2. 广告实时聚合实战

2.1 数据源表定义(Kafka)

CREATE TABLE ad_events (
    requestId     STRING,         -- 请求ID(包含时间戳)
    campaignId    BIGINT,         -- 广告计划ID
    adId          BIGINT,         -- 广告单元ID
    mgEventType   STRING,         -- 事件类型('1'=曝光,'2'=点击)
    price         DECIMAL(10, 4), -- 单次事件价格
    event_time    TIMESTAMP(3),   -- 事件时间
    
    -- 可选,水位线定义(允许2分钟乱序)
    -- WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'ad_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

2.2 基础聚合实现

INSERT INTO daily_agg_results
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd') AS agg_day,  -- 按天聚合
    campaignId,
    adId,
    SUM(CASE WHEN mgEventType = '1' THEN 1 ELSE 0 END) AS show_count,
    SUM(CASE WHEN mgEventType = '2' THEN 1 ELSE 0 END) AS click_count,
    SUM(CASE WHEN mgEventType IN ('1', '2') THEN price ELSE 0 END) AS consumption_amount
FROM ad_events
GROUP BY 
    DATE_FORMAT(event_time, 'yyyy-MM-dd'),
    campaignId,
    adId;

2.3 数据写入下游 MySQL 数据表

-- 在Flink SQL中设置状态TTL
SET 'table.exec.state.ttl' = '48h'; -- 48小时(毫秒)

-- 创建目标表
CREATE TABLE daily_agg_results (
    agg_day             STRING,
    campaignId          BIGINT,
    adId                BIGINT,
    show_count          BIGINT,
    click_count         BIGINT,
    consumption_amount  DECIMAL(15, 4),
    PRIMARY KEY (agg_day, campaignId, adId) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/dsp',
    'table-name' = 'daily_agg',
    'username' = 'flink',
    'password' = 'flink_pwd'
);

3.1 时间语义选择

时间类型

生成方式

特点

适用场景

处理时间

PROCTIME()

简单高效,无乱序问题

延迟敏感型指标

事件时间

数据自带时间戳

精确反映业务时间

财务结算,精确分析

摄入时间

数据进入Flink时间

介于处理时间和事件时间之间

一般监控场景

3.2 状态管理机制

在 Flink SQL 实时聚合中,状态(State) 扮演着核心记忆库的角色,它持续存储并动态更新每个聚合键(如广告计划ID+日期)的中间计算结果(如当前累计曝光数、消费金额等)。通过状态管理,Flink 实现了三大关键能力:

  1. 精准连续性:跨事件处理中持久化保存部分聚合结果(如 SUM 的中间累加值),确保任意时间点都能输出精确聚合值
  2. 乱序处理:借助 Watermark 机制,状态暂存迟到数据并重新计算,解决网络延迟导致的数据乱序问题
  3. 资源控制:通过 TTL(48小时)自动清理过期状态(如三天前的广告数据),避免状态无限膨胀

启用 Flink Checkpoint 能力:
Image
全局聚合算子的状态可能会随着输入数据的增多而快速增长,尤其是含有 distinct 的聚合,因此需要用户主动设置状态 State TTL。若配置了 State TTL(通过参数 table.exec.state.ttl 配置),则每一个 Group by Key 的 group 所对应的状态,在 ttl 时间内没有被更新时,将被清除重置。
Image
任务从状态进行重启,可以保证任务数据从上一次断点进行恢复:
Image

4. 高级聚合技巧

4.1 多维交叉分析

-- 按小时+广告计划聚合
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd HH') AS agg_hour,
    campaignId,
    COUNT(DISTINCT userId) AS uv,  -- 去重计数
    SUM(show_count) AS total_shows,
    SUM(click_count) AS total_clicks,
    SUM(consumption_amount) AS total_consumption
FROM daily_agg_results
GROUP BY 
    DATE_FORMAT(event_time, 'yyyy-MM-dd HH'),
    campaignId;

4.2 滚动窗口聚合

-- 1小时滚动窗口聚合
SELECT
    campaignId,
    adId,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    SUM(CASE WHEN mgEventType = '1' THEN 1 ELSE 0 END) AS show_count,
    AVG(price) AS avg_price
FROM ad_events
GROUP BY 
    campaignId,
    adId,
    TUMBLE(event_time, INTERVAL '1' HOUR);

4.3 实时CTR计算

-- 实时点击率计算
SELECT
    agg_day,
    campaignId,
    adId,
    show_count,
    click_count,
    -- 点击率计算(保留4位小数)
    ROUND(click_count * 1.0 / NULLIF(show_count, 0), 4) AS ctr,
    consumption_amount
FROM daily_agg_results;

5. 生产环境最佳实践

5.1 性能优化常见手段

  1. 并行度调优

越大则有越高的并发度,但会带来更多的内存占用和计算开销

SET 'parallelism.default' = '16';
  1. MiniBatch聚合

通过攒批的方式支持更少的状态访问次数和输出结果,能提升更高的吞吐

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';
  1. 数据倾斜处理
-- 添加随机前缀解决热点问题
SELECT 
    CONCAT(CAST(RAND()*10 AS INT), '_', campaignId) AS skew_key

6. 完整代码示例

6.1 完整解决方案

-- 设置环境参数
SET 'table.exec.state.ttl' = '48h'; -- 48小时TTL

-- 创建 Kafka 数据源
CREATE TABLE ad_events (
    requestId     STRING,         -- 请求ID(包含时间戳)
    campaignId    BIGINT,         -- 广告计划ID
    adId          BIGINT,         -- 广告单元ID
    mgEventType   STRING,         -- 事件类型('1'=曝光,'2'=点击)
    price         DECIMAL(10, 4), -- 单次事件价格
    event_time    TIMESTAMP(3),   -- 事件时间
    
    -- 可选,水位线定义(允许2分钟乱序)
    -- WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'ad_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);

-- 创建目标表(MySQL)
CREATE TABLE sink_bh (
    `date`            STRING,
    campaignId        BIGINT,
    adId              BIGINT,
    sum_show          BIGINT,
    sum_click         BIGINT,
    consumption_amount DECIMAL(15,4),
    PRIMARY KEY (`date`, campaignId, adId) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/dsp_report',
    'table-name' = 'daily_ad_stats',
    'username' = 'flink_user',
    'password' = 'secure_password',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '30s'
);

-- 执行实时聚合
INSERT INTO sink_bh
SELECT
    DATE_FORMAT(event_time, 'yyyy-MM-dd') AS `date`,
    campaignId,
    adId,
    SUM(IF(mgEventType = '1', 1, 0)) AS sum_show,
    SUM(IF(mgEventType = '2', 1, 0)) AS sum_click,
    SUM(IF(mgEventType IN ('1', '2'), price, 0)) AS consumption_amount
FROM ad_events
GROUP BY 
    DATE_FORMAT(event_time, 'yyyy-MM-dd'),
    campaignId,
    adId;

7. 常见问题与解决方案

7.1 状态持续增长问题

问题现象:状态大小随时间线性增长
解决方案

  1. 确保TTL设置合理:SET 'table.exec.state.ttl' = '48h'
  2. 定期清理过期状态:启用RocksDB压缩过滤器
  3. 分区键优化:避免使用高基数字段作为分区键

7.2 乱序数据处理

问题现象:聚合结果不稳定
解决方案

  1. 合理设置Watermark:WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
  2. 增加迟到数据处理窗口
  3. 使用事件时间而非处理时间

7.3 聚合结果延迟

问题现象:看板数据更新不及时
解决方案

  1. 优化MiniBatch配置提升吞吐:
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';
  1. 增加并行度:SET 'parallelism.default' = '16';
  2. 调整检查点间隔:SET 'execution.checkpointing.interval' = '30s';

7.4 状态不兼容

问题现象:SQL 代码经过修改后,任务无法启动。在日志中报如下类似错误Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@f39af915) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@b714b61c).
Image
解决方案
该问题的原因有可能是因为 Flink SQL 做出了不兼容的升级(比如增加分组字段、增加聚合指标等),导致 Flink 任务状态不兼容,无法从状态恢复。必须要选择求其状态,从全新恢复。
在聚合场景我们可以选择从 Kafka/BMQ 等上游系统的某个时间位点开始全量追溯的方式进行回溯,比如我们可以从 2023-01-01 00:00:00 开始全量追溯,

  1. 在 Kafka 数据源上,可以增加如下过滤条件:where event_time >= '2023-01-01 00:00:00',仅选择回溯起点之后的数据

Image

  1. 全新启动,选择 Kafka 时间位点为回溯的时间起点:

Image

最佳实践提示

  1. 将状态TTL设置为业务需求的最大延迟时间+安全余量(如48小时)
  2. 使用事件时间而非处理时间进行财务相关计算
  3. 对高基数维度(如用户ID)使用近似去重函数减少状态压力
  4. 定期监控状态大小和延迟指标,确保系统稳定运行
最近更新时间:2025.08.05 23:32:39
这个页面对您有帮助吗?
有用
有用
无用
无用