Apache Paimon 是一个基于流和批处理的实时数据湖解决方案,结合了高效的存储和灵活的计算,专为处理大规模实时数据和流式数据而设计。Paimon 为 Flink 引擎提供了完善的 Catalog 接口支持,可以通过 Catalog 方便的管理实时数据湖元数据。
在创建 Catalog 之前需要创建相关的 TOS 桶和 Catalog 文件夹。相关文档,请参见创建存储桶、创建文件夹。
注意:请确保 Flink 和 TOS 处在同一个 Region,Flink 当前暂不支持跨 Region 访问 TOS Bucket。
选择数据目录 - Catalog 列表 - 创建 Catalog - 选择 Paimon Catalog:
填写 Catalog 名称和 Catalog 的 TOS 地址
在创建 Catalog 之前需要创建相关的 TOS 桶和 Catalog 文件夹。相关文档,请参见创建存储桶、创建文件夹。
除此之外需要
并且在 LAS Catalog 权限管理模块,选择对于 LAS Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
满足前置的条件之后,可以在数据目录功能选择,Paimon 和 LAS 的 Catalog 选项,并且填写以上参数内容:
点击确定,即可创建成功。
参数名称 | 参数说明 |
|---|---|
Catalog 类型 | 当前支持 Filesystem 和 LAS 两种数据目录类型。 |
Catalog 名称 | 在 Flink 数据目录中的名称,创建好之后就可以在 Flink SQL 和临时查询任务中以这个名字进行 Catalog 引用
|
Warehouse 路径 | Paimon 在 TOS 上存储的根目录。请确保和 LAS Catalog 中设置的 Catalog TOS 路径保持一致。 |
AccessKeyID | 子账号创建 AccessKeyID,需要确保有 LASFullAccess 权限和相关数据库表权限。 |
AccessKeySecret | 子账号创建 AccessKeySecret,需要确保有 LASFullAccess 权限和相关数据库表权限。 |
LAS 中 Catalog 名称 | 在 LAS Catalog 需要绑定的 Catalog 对应名称。 |
在数据目录 - Catalog 列表 - 选择 Catalog - 选择同步 Catalog 元数据功能
可以查看 Catalog 下所有的 Paimon 库、表的内容:
在 Catalog 中创建一个 Database,用于组织和管理表。
CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
${catalog_name}:Catalog 的名称,和在上文中创建的 Paimon Catalog 保持一致。${db_name}:Database 的名称,自定义。在 Database 中创建表,定义表结构和相关配置。
CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` ( word varchar, -- 示例字段 cnt bigint, PRIMARY KEY (word) NOT ENFORCED ) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据 'changelog-producer' = 'input' -- 产生 changelog,用于下游流读 );
${table_name}:表的名称,自定义。bucket:分桶数量,推荐单个 bucket 存储 1GB 左右数据。changelog-producer:
input,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none选项以节省存储和写入资源。Paimon 查询 SQL 同时支持流读、批读,只要在 Flink 运行过程中选择对应的模式即可:
INSERT INTO `print_table` SELECT * FROM `paimon_test`.`default`.`doc_result`;
另外可以通过临时查询功能,使用 Flink SQL 进行数据探索:
Flink Insert 语句支持流写、批写两种语义,只要在运行过程中选择相应的执行模式即可:
INSERT INTO `paimon_test`.`default`.`doc_result` select t.word, count(1) from doc_source t GROUP BY t.word;
除了 Insert 语句之外,Paimon 也支持对数据表、分区等进行批式覆盖写:
-- 覆盖写入非分区表 INSERT OVERWRITE my_table SELECT ... -- 覆盖写入分区表 INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...