You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
Paimon 实时数据湖开发
Paimon 部分列更新功能
复制全文
Paimon 部分列更新功能

1. 概述

部分列更新,主要是指直接更新表中某些字段值,而不是全部的字段值。可以采用 Update 语句来进行更新,这种 Update 语句一般采用先将整行数据读出,然后再更新部分字段值,再写回。这种语义在大量数据实时入湖场景非常低效。
Flink Paimon 的 Partial Update 是一种用于流式数据处理的表引擎功能,旨在通过多流并发更新同一张表的部分列,最终实现完整数据的更新。本文是关于这项功能其功能、特点及适用场景的详细介绍:

Image

部分列更新的功能包含如下方面:

  1. 部分列更新:Partial Update 允许多个 Flink 流任务并发更新同一张表的不同列,最终将这些部分更新合并为一行完整的数据。例如,流 A 更新列 A 和列 B,流 B 更新列 C 和列 D,最终生成一行包含所有列的数据。
  2. 多流并发写入:多个 Flink 流任务可以通过 UNION ALL 的方式合并为一个 Job,写入同一张 Paimon 表,避免因多 Job 并发写入导致的 Compaction 任务拆分问题。
  3. 无需全量 Join:传统的宽表构建通常需要通过双流或多流 Join 实现,而 Partial Update 避免了 Join 操作,减少了 Flink 任务的状态压力,简化了任务维护。
  4. 支持 Changelog 订阅:支持与 Lookup 或 Full-Compaction 的 Changelog Producer 结合使用,获取完整的数据变更日志。
  5. 支持序列组(Sequence Group):通过序列组机制,Partial Update 可以解决多流更新时的乱序问题。每个流可以定义自己的序列组,确保只有符合条件的更新才会生效。

2. 适用场景

以下是常见的部分列更新的适用场景:

  1. 多流实时动态更新:适用于需要频繁更新某些字段的场景,例如用户标签表中的行为信息更新,能够支持广告或推荐系统的实时分析和决策。
  2. 大宽表拼接:在将多张源表的数据合并成一张大宽表时,可以通过部分列更新来实现,避免全量 Join 操作带来的性能开销。例如,电商平台中需要将订单表、支付表、商品表等数据合并为一张宽表,供下游分析使用。
  3. 数据修正:适用于需要修正某些数据的场景,部分列更新能够精准定位并更新目标字段,有效减少更新开销。
  4. 高频并发写入:支持高频的并发写入,适用于需要实时更新大量行但仅涉及少数列的场景,例如实时日志处理或监控数据更新。
  5. 性能优化:在更新少数列时,部分列更新可以显著提升性能,尤其是在涉及大量行的情况下,能够减少不必要的计算和存储开销。

3. 使用示例

在使用之前,需要注意 Flink 版本要求:

  • Flink 版本需 >= 1.16,具体支持功能如下:
    • Flink 1.16 内置 Paimon 0.6 版本,不支持 collectmerge_map等聚合函数。如果需使用,建议可以上传 Paimon 高版本社区 Connecotr。
    • Flink 1.17 推荐使用。内置 Paimon 0.8.2 字节加强版功能,另支持如存储过程等语法。

3.1 业务说明

以下是一个使用 Partial Update 的示例,例如我们有构建用户主数据的公共维表逻辑。上游有三张用户相关的信息表:

  1. 用户基础信息:用户注册后的用户名、基本信息等内容,为用户表的主要信息
  2. 用户渠道信息:用户登录的渠道,比如 github / google account / phone ... 等,一个用户可能会绑定多个渠道登录方式
  3. 用户会员信息:用户注册后如果注册会员,则会有会员信息表,一个用户可能会有1个或者0个会员信息

而我们在做业务分层的时候,可能会需要构建以用户为中心的主数据,包含用户的各方面维度信息,以支持各项下游流、批任务的后续处理。

Image

此时上游的三张表的结构,经过简化如下(采用模拟数据源,实际上可能是 Kafka、MySQL-CDC、Paimon 等多种数据上游):

-- 用户基础数据
CREATE TABLE `paimon_pu`.`paimon_pu`.`user_info` (
    uid INT,
    username STRING, -- 用户名
    reg_time TIMESTAMP(3), -- 注册时间
    PRIMARY KEY (uid) NOT ENFORCED
);
-- 用户登录渠道
CREATE TABLE `paimon_pu`.`paimon_pu`.`user_logintype` (
    uid INT,
    logintype STRING, -- 渠道类型,如 github/google ...
    bind_time TIMESTAMP(3),
    PRIMARY KEY (uid, logintype) NOT ENFORCED
);
-- 用户会员信息
CREATE TABLE `paimon_pu`.`paimon_pu`.`user_vip_info` (
    uid INT,
    is_valid BOOLEAN, -- 会员是否有效
    start_time TIMESTAMP(3), -- 会员起始时间
    end_time TIMESTAMP(3), -- 会员结束时间
    PRIMARY KEY (uid) NOT ENFORCED
);

可以用如下的测试脚本,插入测试数据:

-- 插入用户基础数据
INSERT INTO `paimon_pu`.`paimon_pu`.`user_info` VALUES 
(1001, 'Alice', TIMESTAMP '2023-01-15 10:30:00'),
(1002, 'Bob', TIMESTAMP '2023-03-22 14:20:00'),
(1003, 'Charlie', TIMESTAMP '2023-05-18 09:15:00'),
(1004, 'David', TIMESTAMP '2023-07-01 16:45:00'),
(1005, 'Emma', TIMESTAMP '2023-09-30 11:25:00');

-- 插入用户登录渠道数据(一个用户可能有多个登录渠道)
INSERT INTO `paimon_pu`.`paimon_pu`.`user_logintype` VALUES 
(1001, 'github', TIMESTAMP '2023-01-15 10:35:00'),
(1001, 'google', TIMESTAMP '2023-01-16 15:20:00'),
(1002, 'wechat', TIMESTAMP '2023-03-22 14:25:00'),
(1003, 'github', TIMESTAMP '2023-05-18 09:20:00'),
(1003, 'google', TIMESTAMP '2023-05-19 10:30:00'),
(1004, 'wechat', TIMESTAMP '2023-07-01 16:50:00'),
(1005, 'github', TIMESTAMP '2023-09-30 11:30:00');

-- 插入用户会员信息
INSERT INTO `paimon_pu`.`paimon_pu`.`user_vip_info` VALUES 
(1001, true, TIMESTAMP '2023-02-01 00:00:00', TIMESTAMP '2023-12-31 23:59:59'),
(1002, true, TIMESTAMP '2023-04-01 00:00:00', TIMESTAMP '2023-10-31 23:59:59'),
(1003, true, TIMESTAMP '2023-06-01 00:00:00', TIMESTAMP '2024-05-31 23:59:59'),
(1005, true, TIMESTAMP '2023-10-01 00:00:00', TIMESTAMP '2024-09-30 23:59:59');

3.2 业务建模

我们根据业务描述,希望按照如下模式进行业务建模:

-- 用户宽表,包含以上三个表的所有数据作为公共维度表
CREATE TABLE IF NOT EXISTS `paimon_pu`.`paimon_pu`.`ods_user_wide_info` (
    uid INT,
    -- 用户基础信息
    username STRING, -- 用户名
    reg_time TIMESTAMP(3), -- 注册时间
    -- 用户渠道信息,嵌套结构保存多种登录渠道,以渠道名去重
    logintypes ARRAY<ROW<logintype STRING, bind_time TIMESTAMP(3)>>,
    last_bind_time TIMESTAMP(3), 
    -- 会员信息
    vip_is_valid BOOLEAN, -- 会员是否有效
    vip_start_time TIMESTAMP(3), -- 会员起始时间
    vip_end_time TIMESTAMP(3), -- 会员结束时间
    PRIMARY KEY (uid) NOT ENFORCED
) WITH (
    'merge-engine'='partial-update',
    -- 如果需要下游的流读,需要以 lookup 或者 full-compaction 
    'changelog-producer' = 'lookup', 
    'fields.last_bind_time.sequence-group' = 'logintypes',
    'fields.logintypes.aggregate-function' = 'nested_update',
    'fields.logintypes.nested-key' = 'logintype'
);

关键设计:

  1. merge-engine:采用 partial-update
  2. sequence-group:采用 last_login_time 作为序列组字段,如果这个时间戳版本增加的话,会对 logintypes字段进行聚合
  3. aggregate-function:聚合函数采用 nested_update在子字段中,对 logintype作为嵌套主键,进行去重更新

这样就可以达到当用户绑定了多种渠道之后,会将用户的绑定渠道作为数组置于嵌套字段 logintypes内。

3.3 部分列更新

以下 Flink SQL 从三个数据源各自读取部分列,将其他的不修改的数据置为 NULL,这样可以保证每个数据流都只更新部分数据,最后将三个数据流 UNION ALL 起来,变成一个数据流写入下游 Partial Update 表中:

INSERT INTO `paimon_pu`.`paimon_pu`.`ods_user_wide_info`
SELECT
  uid, username, reg_time, logintypes, last_bind_time, vip_is_valid, vip_start_time, vip_end_time
FROM (
    SELECT 
      uid, 
      username,
      reg_time, 
      CAST (NULL AS ARRAY<ROW<logintype STRING, bind_time TIMESTAMP(3)>>) AS logintypes,
      CAST (NULL AS TIMESTAMP(3)) AS last_bind_time,
      CAST (NULL AS BOOLEAN) AS vip_is_valid,
      CAST (NULL AS TIMESTAMP(3)) AS vip_start_time,
      CAST (NULL AS TIMESTAMP(3)) AS vip_end_time
    FROM `user_info`
    
    UNION ALL 
    
    SELECT 
      uid, 
      CAST (NULL AS STRING) AS username,
      CAST (NULL AS TIMESTAMP(3)) AS reg_time,
      ARRAY[ROW(logintype, bind_time)] AS logintypes, 
      bind_time AS last_bind_time,
      CAST (NULL AS BOOLEAN) AS vip_is_valid,
      CAST (NULL AS TIMESTAMP(3)) AS vip_start_time,
      CAST (NULL AS TIMESTAMP(3)) AS vip_end_time
    FROM `user_logintype`
    
    UNION ALL 
      
    SELECT 
      uid, 
      CAST (NULL AS STRING) AS username,
      CAST (NULL AS TIMESTAMP(3)) AS reg_time,
      CAST (NULL AS ARRAY<ROW<logintype VARCHAR, bind_time TIMESTAMP(3)>>) AS logintypes,
      CAST (NULL AS TIMESTAMP(3)) AS last_bind_time,
      is_valid AS vip_is_valid,
      start_time AS vip_start_time,
      end_time AS vip_end_time
    FROM `user_vip_info`
);

3.4 结果验证

可以使用如下的类似 SQL 对数据进行探索查询,需要注意的是此类 SQL 不能直接在 Flink SQL 任务中调试运行,需要写相关的 Insert 语句到 print 的数据下游才能正常调试。

-- 用户宽表,包含以上三个表的所有数据作为公共维度表
CREATE TABLE IF NOT EXISTS `print_result` (
    uid INT,
    -- 用户基础信息
    username STRING, -- 用户名
    reg_time TIMESTAMP(3), -- 注册时间
    -- 用户渠道信息,嵌套结构保存多种登录渠道,以渠道名去重
    logintype STRING,
    bind_time TIMESTAMP(3),
    -- 会员信息
    vip_is_valid BOOLEAN, -- 会员是否有效
    vip_start_time TIMESTAMP(3), -- 会员起始时间
    vip_end_time TIMESTAMP(3), -- 会员结束时间
    PRIMARY KEY (uid) NOT ENFORCED
) WITH (
    'connector'='print'
);

-- 使用 UNNEST 将数组解嵌套
-- 注意:当前 Flink SQL 不支持直接运行此类查询 SQL,需要 insert 到 print sink 中
INSERT INTO `print_result` 
SELECT uid, username, reg_time, logintype, bind_time, vip_is_valid,  vip_start_time, vip_end_time
FROM `paimon_pu`.`paimon_pu`.`ods_user_wide_info`, UNNEST(logintypes) AS lt(logintype, bind_time);

查询结果可以看到批作业处理成功,输出内容如下:
Image

4. 功能详解

4.1 功能用法

通过指定 'merge-engine' = 'partial-update',用户可以通过多次更新来逐步完成记录的列更新。这是通过使用相同主键下的最新数据逐个更新值字段来实现的。然而,空值在这个过程中不会被覆盖。例如,假设 Paimon 接收到三条记录:

  • <1, 23.0, 10, NULL>
  • <1, NULL, NULL, 'This is a book'>
  • <1, 25.2, NULL, NULL>

假设第一列是主键,最终结果将是 <1, 25.2, 10, 'This is a book'>

对于流查询,partial-update 合并引擎必须与 lookupfull-compaction (参考 Changelog 产出机制)一起使用。值得注意的是,input 也支持产出 Changelog,但只返回输入记录,不能返回更新后的数据。

另外,默认情况下,部分更新不能接受删除记录,你可以选择以下解决方案之一:

  • 配置 'ignore-delete' 来忽略删除记录。
  • 配置 'sequence-group' 来撤销部分列。

4.2 序列组(Sequence-Group)

序列字段可能无法解决具有多个流更新的部分更新表的无序问题,因为在多流更新期间,序列字段可能会被另一个流的最新数据覆盖。因此,我们为部分更新表引入了序列组机制。它可以解决:

  1. 多流更新期间的无序问题。每个流定义其自己的序列组。
  2. 真正的部分更新,而不仅仅是非空更新,比如需要支持 Null 值覆盖的场景。

请参见示例:

CREATE TABLE t (
    k INT,
    a INT,
    b INT,
    g_1 INT,
    c INT,
    d INT,
    g_2 INT,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'merge-engine'='partial-update',
    'fields.g_1.sequence-group'='a,b',
    'fields.g_2.sequence-group'='c,d'
);

INSERT INTO t VALUES (1, 1, 1, 1, 1, 1, 1);

-- g_2 为空,c, d 不应更新
INSERT INTO t VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT));

SELECT * FROM t; -- 输出 1, 2, 2, 2, 1, 1, 1

-- g_1 较小,a, b 不应更新
INSERT INTO t VALUES (1, 3, 3, 1, 3, 3, 3);

-- 输出 1, 2, 2, 2, 3, 3, 3

对于 fields.<field-name>.sequence-group,有效的比较数据类型包括:DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP 和 TIMESTAMP_LTZ。

4.3 部分列更新支持聚合

你可以为输入字段指定聚合函数,所有在 Paimon 聚合表功能 中支持的函数都可以使用。请参见示例:

CREATE TABLE t (
          k INT,
          a INT,
          b INT,
          c INT,
          d INT,
          PRIMARY KEY (k) NOT ENFORCED
) WITH (
     'merge-engine'='partial-update',
     'fields.a.sequence-group' = 'b',
     'fields.b.aggregate-function' = 'first_value',
     'fields.c.sequence-group' = 'd',
     'fields.d.aggregate-function' = 'sum'
 );
 
-- 输入 1,1,1,null,null --> b 字段 first_value 更新为 1
INSERT INTO t VALUES (1, 1, 1, CAST(NULL AS INT), CAST(NULL AS INT));
-- 输入 1,null,null,1,1 --> d 字段聚合值为 1
INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 1, 1);
-- 输入 1,2,2,null,null --> b 字段 first_value 不更新
INSERT INTO t VALUES (1, 2, 2, CAST(NULL AS INT), CAST(NULL AS INT));
-- 输入 1,null,null,2,2 --> d 字段聚合值为 3
INSERT INTO t VALUES (1, CAST(NULL AS INT), CAST(NULL AS INT), 2, 2);

-- 表最终结果输出 1, 2, 1, 2, 3

Flink Paimon 的 Partial Update 是一种高效的多流并发更新机制,适用于宽表构建、实时数据打宽、流批一体等场景。其核心优势在于避免了全量 Join 操作,减少了状态压力,同时通过序列组机制解决了多流更新的乱序问题。对于需要高效更新和打宽数据的业务场景,可以尝试使用 Partial Update 功能。另外,更多功能也请关注 Paimon 官方文档

5. 常见问题

5.1 部分列更新表不支持流读

问题现象:在流式任务消费上游的部分列更新表的时候报错如下,java.lang.RuntimeException: Partial update streaming reading is not supported. You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading. ('input' changelog producer is also supported, but only returns input records.)
Image

解决方案:部分列更新表如果没有产出 changelog,是无法进行流式读取的。建议使用 lookup或者full-compaction两种模式进行变更日志订阅。另外如果使用input的产出模式的话,一般是无法获得真正的变更日志,这种情况下获得的是上游的插入原始内容。

最近更新时间:2025.03.03 09:41:51
这个页面对您有帮助吗?
有用
有用
无用
无用