You need to enable JavaScript to run this app.
导航
日志实时去重场景
最近更新时间:2025.08.05 23:32:39首次发布时间:2025.08.05 23:32:39
复制全文
我的收藏
有用
有用
无用
无用

1. 日志去重

1.1 为什么需要日志去重?

在实时日志处理中,重复日志是常见问题,主要由以下原因引起:

  • 客户端重试机制导致重复请求
  • 网络抖动造成消息重复投递
  • 分布式系统幂等性处理不当
  • 日志采集管道重复发送

去重价值

  • 节省存储成本(减少30%-50%存储空间)
  • 提升分析准确性(避免重复计数)
  • 优化下游处理性能

Image

核心机制

  • PARTITION BY:定义去重维度(如reqid)
  • ROW_NUMBER() 窗口函数:为每个分区内的记录分配唯一序号
  • ORDER BY:决定保留策略(ASC保留第一条,DESC保留最后一条)
  • row_num = 1:筛选条件保留目标记录

2. 环境准备与基础配置

2.1 准备条件

步骤

操作

说明

1.1

已开通流式计算 Flink 版项目

参考 开通流式计算 Flink 版

1.2

已创建 Flink SQL 任务

参考 创建 Flink SQL 任务
建议参考去重模板
Image

2.2 日志源表配置(Kafka)

CREATE TABLE raw_logs (
  log_time TIMESTAMP(3),
  reqid    VARCHAR(40),    -- 请求ID(去重键)
  service  VARCHAR(20),    -- 服务名称
  level    VARCHAR(10),    -- 日志级别
  message  STRING,         -- 日志内容
  proc_time AS PROCTIME(), -- 处理时间
  
  -- 事件时间配置(可选)
  WATERMARK FOR log_time AS log_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'raw_logs',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);

3. 去重实战:从简单到复杂场景

3.1 基础去重:按 reqid 去重(保留最新日志)

CREATE TABLE dedup_logs_simple (
  reqid    VARCHAR(40) PRIMARY KEY NOT ENFORCED,
  log_time TIMESTAMP(3),
  service  VARCHAR(20),
  message  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dedup_logs',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

INSERT INTO dedup_logs_simple
SELECT reqid, log_time, service, message
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY reqid
      ORDER BY proc_time DESC    -- 按处理时间保留最新
    ) AS row_num
  FROM raw_logs
) 
WHERE row_num = 1;

3.2 复合键去重:reqid + 服务名

INSERT INTO dedup_logs_simple
SELECT reqid, log_time, service, message
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY reqid, service  -- 组合键分区
      ORDER BY proc_time DESC       -- 按处理时间保留最新
    ) AS row_num
  FROM raw_logs
) 
WHERE row_num = 1;

3.3 时间窗口去重(每小时保留第一条)

INSERT INTO dedup_logs_simple
SELECT reqid, log_time, service, message
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY reqid, 
                   HOUR(proc_time)  -- 按小时分区
      ORDER BY proc_time ASC        -- 保留每小时第一条
    ) AS row_num
  FROM raw_logs
) 
WHERE row_num = 1;

4. 高级配置与性能优化

4.1 状态管理(State TTL 详解)

Image

不同模式下的 TTL 的行为如下:

模式

ORDER BY

触发清理条件

输出行为

keepFirst

ASC

首条记录时间+TTL

新记录直接输出

keepLast

DESC

最后更新时间+TTL

先撤回旧值再输出新值

4.2 处理时间 vs 事件时间对比

特性

处理时间 (PROCTIME)

事件时间 (Event Time)

精确性

低(依赖处理速度)

高(基于实际发生时间)

乱序处理

不支持

支持(通过Watermark)

配置复杂度

简单(自动生成)

复杂(需定义WATERMARK)

适用场景

简单去重,延迟敏感

精确去重,需要时间准确性

事件时间完整示例

CREATE TABLE dedup_event_time (
  reqid    VARCHAR(40),
  log_time TIMESTAMP(3),
  service  VARCHAR(20),
  message  STRING
) WITH (/* Kafka配置 */);

INSERT INTO dedup_event_time
SELECT reqid, log_time, service, message
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY reqid
      ORDER BY log_time DESC    -- 按事件时间排序
    ) AS row_num
  FROM raw_logs
  WHERE WATERMARK(log_time) IS NOT NULL -- 确保事件时间有效
) 
WHERE row_num = 1;

4.3 性能优化技巧

  1. 分区键优化
-- 添加前缀减少状态大小(适用于高基数键)
PARTITION BY SUBSTRING(reqid, 1, 4), reqid
  1. 资源调整

Image

  1. MiniBatch 优化(可以参考 MiniBatch 配置
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';

5. 生产环境最佳实践

5.1 端到端精确一次保证

在 Flink 中进行去重之后,建议使用 Flink 的精确一次语义,保证数据的准确性。

5.2 常见问题排查指南

问题现象

可能原因

解决方案

去重结果仍有重复

TTL设置过短

增加TTL时间

状态持续增长

分区键基数过高

优化分区键或增加并行度

处理延迟增加

数据倾斜

添加随机前缀分散负载

Watermark不推进

事件时间数据中断

添加心跳数据或使用处理时间

状态恢复失败

检查点不完整

检查存储系统连接性