实时聚合是 Flink SQL 最核心和广泛的应用场景,它允许用户使用熟悉的 SQL 语法(如 GROUP BY、窗口函数 TUMBLE/HOP/SESSION 以及聚合函数 SUM、COUNT、AVG、MAX、MIN 等),直接在连续不断、无界的数据流上进行动态计算。
不同于传统批处理需要等待数据收集完毕,Flink SQL 能够基于事件时间或处理时间,以极低的延迟(毫秒到秒级)持续地计算并输出聚合结果(如每分钟的网站访问量、实时的交易总额、每秒钟的异常请求数、设备状态的平均值等)。
这种能力使得构建实时监控大屏、实时风控指标、实时报表、实时数据质量监控等应用变得异常简洁高效,极大地降低了开发复杂流处理逻辑的门槛,同时得益于 Flink 强大的状态管理和 Exactly-Once 语义,保证了计算的准确性和可靠性。
本文我们以广告核心指标,包括曝光指标、点击指标、消费金额等指标的实时运算场景作为例子,介绍 Flink SQL 实时聚合的基本概念和使用技巧。
指标类型 | 关键指标 | 业务意义 |
|---|---|---|
曝光指标 | 广告展示次数 | 衡量广告覆盖范围 |
互动指标 | 点击次数 | 评估用户参与度 |
财务指标 | 消费金额 | 计算广告成本与ROI |
效果指标 | 点击率(CTR) | 衡量广告效果 |
有下图可以了解到,Flink SQL 从 Kafka 实时日志 Topic 消费,经过根据广告、竞价、日期等维度进行分组,对于展现、点击、消费等事件进行聚合。将计算结果导出到下游 OLAP 分析引擎或者 RDS 关系型数据库。
CREATE TABLE ad_events ( requestId STRING, -- 请求ID(包含时间戳) campaignId BIGINT, -- 广告计划ID adId BIGINT, -- 广告单元ID mgEventType STRING, -- 事件类型('1'=曝光,'2'=点击) price DECIMAL(10, 4), -- 单次事件价格 event_time TIMESTAMP(3), -- 事件时间 -- 可选,水位线定义(允许2分钟乱序) -- WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE ) WITH ( 'connector' = 'kafka', 'topic' = 'ad_events', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' );
INSERT INTO daily_agg_results SELECT DATE_FORMAT(event_time, 'yyyy-MM-dd') AS agg_day, -- 按天聚合 campaignId, adId, SUM(CASE WHEN mgEventType = '1' THEN 1 ELSE 0 END) AS show_count, SUM(CASE WHEN mgEventType = '2' THEN 1 ELSE 0 END) AS click_count, SUM(CASE WHEN mgEventType IN ('1', '2') THEN price ELSE 0 END) AS consumption_amount FROM ad_events GROUP BY DATE_FORMAT(event_time, 'yyyy-MM-dd'), campaignId, adId;
-- 在Flink SQL中设置状态TTL SET 'table.exec.state.ttl' = '48h'; -- 48小时(毫秒) -- 创建目标表 CREATE TABLE daily_agg_results ( agg_day STRING, campaignId BIGINT, adId BIGINT, show_count BIGINT, click_count BIGINT, consumption_amount DECIMAL(15, 4), PRIMARY KEY (agg_day, campaignId, adId) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysql:3306/dsp', 'table-name' = 'daily_agg', 'username' = 'flink', 'password' = 'flink_pwd' );
时间类型 | 生成方式 | 特点 | 适用场景 |
|---|---|---|---|
处理时间 |
| 简单高效,无乱序问题 | 延迟敏感型指标 |
事件时间 | 数据自带时间戳 | 精确反映业务时间 | 财务结算,精确分析 |
摄入时间 | 数据进入Flink时间 | 介于处理时间和事件时间之间 | 一般监控场景 |
在 Flink SQL 实时聚合中,状态(State) 扮演着核心记忆库的角色,它持续存储并动态更新每个聚合键(如广告计划ID+日期)的中间计算结果(如当前累计曝光数、消费金额等)。通过状态管理,Flink 实现了三大关键能力:
启用 Flink Checkpoint 能力:
全局聚合算子的状态可能会随着输入数据的增多而快速增长,尤其是含有 distinct 的聚合,因此需要用户主动设置状态 State TTL。若配置了 State TTL(通过参数 table.exec.state.ttl 配置),则每一个 Group by Key 的 group 所对应的状态,在 ttl 时间内没有被更新时,将被清除重置。
任务从状态进行重启,可以保证任务数据从上一次断点进行恢复:
-- 按小时+广告计划聚合 SELECT DATE_FORMAT(event_time, 'yyyy-MM-dd HH') AS agg_hour, campaignId, COUNT(DISTINCT userId) AS uv, -- 去重计数 SUM(show_count) AS total_shows, SUM(click_count) AS total_clicks, SUM(consumption_amount) AS total_consumption FROM daily_agg_results GROUP BY DATE_FORMAT(event_time, 'yyyy-MM-dd HH'), campaignId;
-- 1小时滚动窗口聚合 SELECT campaignId, adId, TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start, SUM(CASE WHEN mgEventType = '1' THEN 1 ELSE 0 END) AS show_count, AVG(price) AS avg_price FROM ad_events GROUP BY campaignId, adId, TUMBLE(event_time, INTERVAL '1' HOUR);
-- 实时点击率计算 SELECT agg_day, campaignId, adId, show_count, click_count, -- 点击率计算(保留4位小数) ROUND(click_count * 1.0 / NULLIF(show_count, 0), 4) AS ctr, consumption_amount FROM daily_agg_results;
越大则有越高的并发度,但会带来更多的内存占用和计算开销
SET 'parallelism.default' = '16';
通过攒批的方式支持更少的状态访问次数和输出结果,能提升更高的吞吐
SET 'table.exec.mini-batch.enabled' = 'true'; SET 'table.exec.mini-batch.size' = '5000';
-- 添加随机前缀解决热点问题 SELECT CONCAT(CAST(RAND()*10 AS INT), '_', campaignId) AS skew_key
-- 设置环境参数 SET 'table.exec.state.ttl' = '48h'; -- 48小时TTL -- 创建 Kafka 数据源 CREATE TABLE ad_events ( requestId STRING, -- 请求ID(包含时间戳) campaignId BIGINT, -- 广告计划ID adId BIGINT, -- 广告单元ID mgEventType STRING, -- 事件类型('1'=曝光,'2'=点击) price DECIMAL(10, 4), -- 单次事件价格 event_time TIMESTAMP(3), -- 事件时间 -- 可选,水位线定义(允许2分钟乱序) -- WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE ) WITH ( 'connector' = 'kafka', 'topic' = 'ad_events', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ); -- 创建目标表(MySQL) CREATE TABLE sink_bh ( `date` STRING, campaignId BIGINT, adId BIGINT, sum_show BIGINT, sum_click BIGINT, consumption_amount DECIMAL(15,4), PRIMARY KEY (`date`, campaignId, adId) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysql:3306/dsp_report', 'table-name' = 'daily_ad_stats', 'username' = 'flink_user', 'password' = 'secure_password', 'sink.buffer-flush.max-rows' = '5000', 'sink.buffer-flush.interval' = '30s' ); -- 执行实时聚合 INSERT INTO sink_bh SELECT DATE_FORMAT(event_time, 'yyyy-MM-dd') AS `date`, campaignId, adId, SUM(IF(mgEventType = '1', 1, 0)) AS sum_show, SUM(IF(mgEventType = '2', 1, 0)) AS sum_click, SUM(IF(mgEventType IN ('1', '2'), price, 0)) AS consumption_amount FROM ad_events GROUP BY DATE_FORMAT(event_time, 'yyyy-MM-dd'), campaignId, adId;
问题现象:状态大小随时间线性增长
解决方案:
SET 'table.exec.state.ttl' = '48h'问题现象:聚合结果不稳定
解决方案:
WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE问题现象:看板数据更新不及时
解决方案:
SET 'table.exec.mini-batch.enabled' = 'true'; SET 'table.exec.mini-batch.size' = '5000';
SET 'parallelism.default' = '16';SET 'execution.checkpointing.interval' = '30s';问题现象:SQL 代码经过修改后,任务无法启动。在日志中报如下类似错误Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@f39af915) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@b714b61c).
解决方案:
该问题的原因有可能是因为 Flink SQL 做出了不兼容的升级(比如增加分组字段、增加聚合指标等),导致 Flink 任务状态不兼容,无法从状态恢复。必须要选择求其状态,从全新恢复。
在聚合场景我们可以选择从 Kafka/BMQ 等上游系统的某个时间位点开始全量追溯的方式进行回溯,比如我们可以从 2023-01-01 00:00:00 开始全量追溯,
where event_time >= '2023-01-01 00:00:00',仅选择回溯起点之后的数据
- 将状态TTL设置为业务需求的最大延迟时间+安全余量(如48小时)
- 使用事件时间而非处理时间进行财务相关计算
- 对高基数维度(如用户ID)使用近似去重函数减少状态压力
- 定期监控状态大小和延迟指标,确保系统稳定运行