You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
数据目录(Beta)
Iceberg Catalog
复制全文
Iceberg Catalog

Apache Iceberg 是一种开源表格式,提供 ACID 事务、模式演化和分区管理等特性,适用于构建可靠的数据湖。通过将 Flink 与 Iceberg 集成,用户可以实现流式数据实时写入 Iceberg 表,支持数据湖的实时更新和查询。

1. 使用限制

  • 数据目录服务当前仅支持 Iceberg 基于 LAS 元数据,以及 TOS 底层存储,如果当前需要使用 Hadoop / Rest + S3 等其他 Catalog 方式访问,请参考 Iceberg 使用 Rest Catalog 管理元数据 的开发参考。
  • Iceberg 数据目录仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。

2. 使用步骤

2.1 创建 Catalog

在创建 Catalog 之前需要创建相关的 TOS 桶和 Catalog 文件夹。相关文档,请参见创建存储桶创建文件夹
除此之外需要

  1. 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret 。
  2. 在权限管理中为子账号开通 LASFullAccess 权限。

并且在 LAS Catalog 权限管理模块,选择对于 LAS Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
Image

满足前置的条件之后,可以在数据目录功能选择,Iceberg 和 LAS 的 Catalog 选项,并且填写以上参数内容:
Image

点击确定,即可创建成功。

2.1.3 参数说明

参数名称

参数说明

Catalog 类型

当前支持 LAS 两种数据目录类型。

Catalog 名称

在 Flink 数据目录中的名称,创建好之后就可以在 Flink SQL 和临时查询任务中以这个名字进行 Catalog 引用

注意仅支持字母、数字、下划线,且不能以数字开头。

Warehouse 路径

Iceberg 在 TOS 上存储的根目录。请确保和 LAS Catalog 中设置的 Catalog TOS 路径保持一致。

注意:当前数据目录仅支持 tos:// 的方式引用 TOS 路径。

AccessKeyID

子账号创建 AccessKeyID,需要确保有 LASFullAccess 权限和相关数据库表权限。

AccessKeySecret

子账号创建 AccessKeySecret,需要确保有 LASFullAccess 权限和相关数据库表权限。

LAS 中 Catalog 名称

在 LAS Catalog 需要绑定的 Catalog 对应名称。

2.2 查看 Iceberg Catalog 内容

在数据目录 - Catalog 列表 - 选择 Catalog - 选择同步 Catalog 元数据功能
Image
可以查看 Catalog 下所有的 Iceberg 库、表的内容:
Image

2.3 Iceberg Catalog 读写

2.3.1 创建数据库

在 Catalog 中创建一个 Database,用于组织和管理表。

CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
  • ${catalog_name}:Catalog 的名称,和在上文中创建的 Iceberg Catalog 保持一致。
  • ${db_name}:Database 的名称,自定义。

2.3.2 创建数据表

在 Database 中创建表,定义表结构和相关配置。

CREATE TABLE if not EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` (
  `organization_id` BIGINT,
  `advertiser_id` BIGINT,
  `data` STRING,
  `type` STRING,
  `date` STRING,
  `timestamp` BIGINT,
  `dt` STRING,
  PRIMARY KEY (`dt`, `advertiser_id`) not enforced -- 支持主键进行去重
) PARTITIONED BY (`dt`)  -- 分区设置,Flink 不支持 hidden 分区
WITH
  (
    'format-version' = '2', -- Iceberg 表格式版本
    'write.format.default' = 'parquet' -- 默认文件格式
  );

2.3.3 查询 Iceberg 表

Iceberg 查询 SQL 同时支持流读、批读,需要在 Flink 运行过程中选择对应的模式即可:

-- 流式消费场景
INSERT INTO `print_result`
select
  *
from
  `${catalog_name}`.`${db_name}`.`${table_name}` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */; -- 使用 hint 来设置读取模式

另外可以通过临时查询功能,使用 Flink SQL 进行数据探索:
Image

2.3.4 流批读写

Flink Insert 语句支持流写、批写两种语义,只要在运行过程中选择相应的执行模式即可:

INSERT INTO `${catalog_name}`.`${db_name}`.`${table_name}`
select
  *
from
  kafka_source t;

2.3.5 覆盖写

除了 Insert 语句之外,Iceberg 也支持对数据表、分区等进行批式覆盖写:

-- 覆盖写入非分区表
INSERT OVERWRITE my_table SELECT ...

-- 覆盖写入分区表
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用