You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Paimon 实时数据湖开发
Paimon 维表打宽功能
复制全文
下载 pdf
Paimon 维表打宽功能

1. 概述

1.1 维表查询

Flink SQL 任务中的实时维表打宽是一种非常常见的流式计算场景,通过实时数据流与外部维表进行实时关联,无需额外状态管理,语义简单且支持高 QPS 吞吐。常见的维表查询场景包含但不限于:

  1. 用户行为分析:用户行为日志关联用户的用户名、基础信息、历史行为等。
  2. 实时推荐系统:用户的实时操作与推荐模型中的特征数据进行关联,生成个性化推荐结果。
  3. 金融风控:交易数据与风险控制规则库进行关联,实时检测和预警异常交易行为。
  4. 物联网监控:设备数据与设备配置或状态信息进行关联,实时监控设备运行情况并及时发现故障。

1.2 Paimon 作为维表优势

维表打宽一般需要引入高 QPS 点查的远端存储(如 MySQL、HBase、Redis 等),增加了运维复杂性。在某些业务场景中,维表数据还需支持下游流批读写,可能额外引入 Dump Hive、Dump MQ 等链路,进一步扩展了其应用范围和复杂性。Paimon 作为基于 LSM-Tree 的存储系统,支持流批读写的同时,能非常好的支撑维表查询的场景。
Image
使用 Paimon 表直接作为维表的优势在于:

  • 性能良好:基于 LSM-Tree 的存储结构,提供高效的数据查找效率
  • 场景丰富:支持流批读写,简化维表维护链路
  • 成本较低:无需独占的计算资源,直接基于远端存储(HDFS、TOS)、本地文件、本地内存构建多级缓存,优化读取性能

2. 基础示例

以下示例主要使用 Flink SQL 的开发方式,因为篇幅原因本文不再赘述具体操作步骤,可以参考 Paimon 使用 Fileystem Catalog 开发

2.1 创建维表

在如下 SQL 中,我们构建了一张 dim_orders 表作为订单信息的维表。其中订单表的主键和分桶键都是 order_id ,即订单 id。我们后续在实时维表打宽作业的过程中会基于订单 id 进行关联,获取订单的关联信息。

CREATE CATALOG paimon_dim
WITH
  (
    'type' = 'paimon',
    'warehouse' = 'tos://flink-cwz-paimon/paimon_dim'
  );
  
CREATE DATABASE IF NOT EXISTS paimon_dim.dim_test;

-- 创建维表数据表
CREATE TABLE IF NOT EXISTS
  `paimon_dim`.`dim_test`.`dim_orders` (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING,
    `create_date` TIMESTAMP,
    `create_ts` INT,
    PRIMARY key (`order_id`) NOT ENFORCED
  )
WITH
  (
    'bucket' = '20',
    'bucket-key' = 'order_id',
    -- 为了控制数据量,可以将创建日期两天前数据进行过期
    'record-level.expire-time' = '2 d',
    'record-level.time-field' = 'create_ts'
  );

2.2 实时更新维表

当创建好维表之后,我们可以持续向维表中更新数据,代表正常的订单变更业务:

-- 使用 datagen 模拟数据写入维表
CREATE TABLE
  datagen_dim_source (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING
  )
WITH
  (
    'connector' = 'datagen'
  );

-- 生成数据实时写入维表
INSERT INTO
  `paimon_dim`.`dim_test`.`dim_orders`
SELECT
  order_id, order_name, order_product_id, order_customer_id, order_status, CURRENT_TIMESTAMP AS create_date, CAST(UNIX_TIMESTAMP () AS INT) AS create_ts
FROM
  datagen_dim_source;

2.3 模拟实时数据流

首先我们使用 datagen模拟实时数据流。注意这里采用 create_date作为处理时间。

-- 模拟数据流
CREATE TABLE
  datagen_source (
    `product_id` INT,
    `product_name` STRING,
    `product_category_id` INT,
    `product_order_id` INT,
    `product_status` STRING,
    `create_date` AS proctime ()
  )
WITH
  ('connector' = 'datagen');
  
-- 实时数据打宽后的下游宽表,测试中使用 print 打印到控制台
CREATE TABLE
  print_sink (
    `product_id` INT,
    `product_name` STRING,
    `product_category_id` INT,
    `product_order_id` INT,
    `product_status` STRING,
    `create_date` TIMESTAMP,
    `order_name` STRING,
    `orders_customer_id` INT,
    `order_status` STRING
  )
WITH
  ('connector' = 'print');

2.4 实时 lookup join

以下 SQL 语句定义了一个实时打宽的视图 trade_orde_view,通过将流式数据源 datagen_source 与 Paimon 维表 dim_orders 进行关联,实现数据的实时打宽。

-- 实时打宽后过滤 order_name 不为空的数据写入下游
INSERT INTO
  print_sink
SELECT
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` 
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id
WHERE
  orders.order_name IS NOT NULL;  

其中维表关联的具体逻辑解释如下:

  • 维表paimon_dim.dim_test.dim_orders 是上文中定义的 Paimon 维表。
  • 时间语义FOR SYSTEM_TIME AS OF gen.create_date 表示根据流数据中的 create_date 字段,查找维表在 create_date 时刻的快照数据。
  • 关联条件gen.product_order_id = orders.order_id 表示将流数据中的 product_order_id 与维表中的 order_id 进行关联。

3. 优化技巧

3.1 Lookup 延迟重试

如果 datagen_source 表(主表)的记录由于 orders 表(维表)的对应数据未准备好而无法完成 Join 操作,可以考虑使用 Flink 的 Lookup 延迟重试策略。该功能仅适用于 Flink-1.16-volcano 及以上版本。

-- 使用 hint 进行 Lookup 重试
SELECT /*+ LOOKUP('table'='orders', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` 
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

3.2 Lookup 异步延迟重试

同步重试的问题是,一条记录会阻塞后续记录,导致整个作业被阻塞。你可以考虑使用 异步(async) + 允许乱序(allow_unordered)来避免阻塞,这样 Join 缺失的记录将不再阻塞其他记录。

-- 使用 hint 进行 Lookup 重试
SELECT /*+ LOOKUP('table'='orders', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

3.3 动态分区

在传统数据仓库中,每个分区通常维护最新的全量数据,因此这种分区表只需要关联最新的分区。Paimon 专门为此场景开发了 max_pt 功能。

-- 如果 orders 表是每天一个快照的全量分区
CREATE TABLE IF NOT EXISTS
  `paimon_dim`.`dim_test`.`dim_orders` (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING,
    `create_date` TIMESTAMP,
    `create_ts` INT,
    `dt` STRING,
    PRIMARY key (`order_id`, `dt`) NOT ENFORCED
  ) PARTITIONED BY (dt);

在这种情况下,要使用 max_pt进行选择最新的分区进行维表关联:

-- 使用 hint 选择最新分区
SELECT 
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

3.5 缓存策略

在不同的 Look Join 场景中有部分缓存和全部缓存两种策略,可以提供开发者更好的性能调优选项。

3.5.1 部分缓存

部分缓存模式是在 Join 的过程中按需缓存命中的数据文件,如图所示:
Image
通过建表语句中 WITH 参数 'lookup.cache'='auto' 来开启,当满足以下两种情况:

  • 关联表为固定分桶模式的主键表
  • 关联表表的主键和 Join Key 一致

此时会自动选择部分缓存(Partial Cache)模式。而不满足这两个条件时会选择全部缓存(Full Cache)模式。

部分缓存能够利用 LSM-Tree 的主键有序性,实现维表缓存数据按需加载,避免全量数据加载,任务初始化更快。

注意:仅支持主键表的主键关联场景,如果关联 Key 不是主键,则无法使用。

3.5.1 全部缓存

全部缓存会批量将 Paimon 表数据全部 Load 到 RocksDB 中,这样能够在关联 Key 非主键的情况下,能够 Lookup 成功:
Image
通过参数 'lookup.cache'='full' 来开启
全部缓存模式,支持主键表的主键关联和非主键关联两种模式,也支持非主键表的关联。但是劣势是初始化加载时间较长,冷启动现象明显。

3.6 Bucket Shuffle

火山引擎的 Paimon 版本为社区提供了 Bucket Shuffle 功能,极大地提升了 Lookup Join 大规模维表时候的性能。Bucket Shuffle 的原理如下:

Image

Image

没有开启 Bucket Shuffle 功能
开启 Bucket Shuffle 功能

可以看出,在开启 Bucket Shuffle 的 Lookup Join 过程中,主数据会根据 Join Key 进行 Hash 分组处理,这样在每个分组中只要缓存对应 Bucket 数据,大大减少了内存用量,减少了缓存淘汰的概率。可以支持更大规模的维表。开启方法如下,在 hint 中设置 'shuffle' = 'true'

-- 使用 hint 选择最新分区
SELECT /*+ LOOKUP('table'='orders', 'shuffle'='true') */ 
  gen.product_id, gen.product_name, gen.product_category_id,
  gen.product_order_id, gen.product_status, gen.create_date,
  orders.order_name, orders.order_customer_id, order_status
FROM
  datagen_source AS gen
  JOIN `paimon_dim`.`dim_test`.`dim_orders` 
  FOR SYSTEM_TIME AS OF gen.create_date AS orders 
  ON gen.product_order_id = orders.order_id;

适用场景:

  • 固定分桶表:主键表和 Append 表(非主键表)都支持
  • Join Key 要包含所有的 Bucket Key

4. 参数详解

以下是常见的维表的建表 WITH 参数设置:

参数

说明

数据类型

必填

默认值

备注

lookup.cache-max-memory-size

Paimon维表的内存缓存大小。

String

256 MB

该参数值会同时影响维表缓存大小和lookup changelog-producer的缓存大小,两个机制的缓存大小都由该参数配置。

lookup.cache

Paimon维表缓存模式

Enum

AUTO

参数支持 AUTO, FULL两个枚举值, 当满足条件时会优先使用Partial Cache模式

lookup.continuous.discovery-interval

维表数据刷新间隔

Duration

10s

指定维表增量数据刷新间隔

lookup.cache-max-disk-size

维表本地缓存最大空间占用

MemorySize

无限制

指定维表和Lookup Compaction本地磁盘空间占用上限, 目前仅适用于Partial Cache场景

lookup.local-file-type

维表文件类型

Enum

HASH

仅针对于Partial Cache场景, 支持 HASH 和 SORT 两种模式. Sort模式初始化更快

最近更新时间:2025.02.18 16:04:10
这个页面对您有帮助吗?
有用
有用
无用
无用