在实时日志处理中,重复日志是常见问题,主要由以下原因引起:
去重价值:
核心机制:
PARTITION BY:定义去重维度(如reqid)ROW_NUMBER() 窗口函数:为每个分区内的记录分配唯一序号ORDER BY:决定保留策略(ASC保留第一条,DESC保留最后一条)row_num = 1:筛选条件保留目标记录步骤 | 操作 | 说明 |
|---|---|---|
1.1 | 已开通流式计算 Flink 版项目 | |
1.2 | 已创建 Flink SQL 任务 | 参考 创建 Flink SQL 任务 |
CREATE TABLE raw_logs ( log_time TIMESTAMP(3), reqid VARCHAR(40), -- 请求ID(去重键) service VARCHAR(20), -- 服务名称 level VARCHAR(10), -- 日志级别 message STRING, -- 日志内容 proc_time AS PROCTIME(), -- 处理时间 -- 事件时间配置(可选) WATERMARK FOR log_time AS log_time - INTERVAL '10' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'raw_logs', 'properties.bootstrap.servers' = 'kafka:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'true' );
CREATE TABLE dedup_logs_simple ( reqid VARCHAR(40) PRIMARY KEY NOT ENFORCED, log_time TIMESTAMP(3), service VARCHAR(20), message STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'dedup_logs', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ); INSERT INTO dedup_logs_simple SELECT reqid, log_time, service, message FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY reqid ORDER BY proc_time DESC -- 按处理时间保留最新 ) AS row_num FROM raw_logs ) WHERE row_num = 1;
INSERT INTO dedup_logs_simple SELECT reqid, log_time, service, message FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY reqid, service -- 组合键分区 ORDER BY proc_time DESC -- 按处理时间保留最新 ) AS row_num FROM raw_logs ) WHERE row_num = 1;
INSERT INTO dedup_logs_simple SELECT reqid, log_time, service, message FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY reqid, HOUR(proc_time) -- 按小时分区 ORDER BY proc_time ASC -- 保留每小时第一条 ) AS row_num FROM raw_logs ) WHERE row_num = 1;
不同模式下的 TTL 的行为如下:
模式 | ORDER BY | 触发清理条件 | 输出行为 |
|---|---|---|---|
keepFirst | ASC | 首条记录时间+TTL | 新记录直接输出 |
keepLast | DESC | 最后更新时间+TTL | 先撤回旧值再输出新值 |
特性 | 处理时间 (PROCTIME) | 事件时间 (Event Time) |
|---|---|---|
精确性 | 低(依赖处理速度) | 高(基于实际发生时间) |
乱序处理 | 不支持 | 支持(通过Watermark) |
配置复杂度 | 简单(自动生成) | 复杂(需定义WATERMARK) |
适用场景 | 简单去重,延迟敏感 | 精确去重,需要时间准确性 |
事件时间完整示例:
CREATE TABLE dedup_event_time ( reqid VARCHAR(40), log_time TIMESTAMP(3), service VARCHAR(20), message STRING ) WITH (/* Kafka配置 */); INSERT INTO dedup_event_time SELECT reqid, log_time, service, message FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY reqid ORDER BY log_time DESC -- 按事件时间排序 ) AS row_num FROM raw_logs WHERE WATERMARK(log_time) IS NOT NULL -- 确保事件时间有效 ) WHERE row_num = 1;
-- 添加前缀减少状态大小(适用于高基数键) PARTITION BY SUBSTRING(reqid, 1, 4), reqid
SET 'table.exec.mini-batch.enabled' = 'true'; SET 'table.exec.mini-batch.size' = '5000';
在 Flink 中进行去重之后,建议使用 Flink 的精确一次语义,保证数据的准确性。
问题现象 | 可能原因 | 解决方案 |
|---|---|---|
去重结果仍有重复 | TTL设置过短 | 增加TTL时间 |
状态持续增长 | 分区键基数过高 | 优化分区键或增加并行度 |
处理延迟增加 | 数据倾斜 | 添加随机前缀分散负载 |
Watermark不推进 | 事件时间数据中断 | 添加心跳数据或使用处理时间 |
状态恢复失败 | 检查点不完整 | 检查存储系统连接性 |