You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
数据目录(Beta)
Iceberg Catalog
复制全文
下载 pdf
Iceberg Catalog

Apache Iceberg 是一种开源表格式,提供 ACID 事务、模式演化和分区管理等特性,适用于构建可靠的数据湖。通过将 Flink 与 Iceberg 集成,用户可以实现流式数据实时写入 Iceberg 表,支持数据湖的实时更新和查询。

1. 使用限制

  • 数据目录服务当前仅支持 Iceberg 基于 LAS 元数据,以及 TOS 底层存储,如果当前需要使用 Hadoop / Rest + S3 等其他 Catalog 方式访问,请参考 Iceberg 使用 Rest Catalog 管理元数据 的开发参考。
  • Iceberg 数据目录仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。

2. 使用步骤

2.1 创建 Catalog

在创建 Catalog 之前需要创建相关的 TOS 桶和 Catalog 文件夹。相关文档,请参见创建存储桶创建文件夹
除此之外需要

  1. 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret 。
  2. 在权限管理中为子账号开通 LASFullAccess 权限。

并且在 LAS Catalog 权限管理模块,选择对于 LAS Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
Image

满足前置的条件之后,可以在数据目录功能选择,Iceberg 和 LAS 的 Catalog 选项,并且填写以上参数内容:
Image

点击确定,即可创建成功。

2.1.3 参数说明

参数名称

参数说明

Catalog 类型

当前支持 LAS 两种数据目录类型。

Catalog 名称

在 Flink 数据目录中的名称,创建好之后就可以在 Flink SQL 和临时查询任务中以这个名字进行 Catalog 引用

注意仅支持字母、数字、下划线,且不能以数字开头。

Warehouse 路径

Iceberg 在 TOS 上存储的根目录。请确保和 LAS Catalog 中设置的 Catalog TOS 路径保持一致。

注意:当前数据目录仅支持 tos:// 的方式引用 TOS 路径。

AccessKeyID

子账号创建 AccessKeyID,需要确保有 LASFullAccess 权限和相关数据库表权限。

AccessKeySecret

子账号创建 AccessKeySecret,需要确保有 LASFullAccess 权限和相关数据库表权限。

LAS 中 Catalog 名称

在 LAS Catalog 需要绑定的 Catalog 对应名称。

2.2 查看 Iceberg Catalog 内容

在数据目录 - Catalog 列表 - 选择 Catalog - 选择同步 Catalog 元数据功能
Image
可以查看 Catalog 下所有的 Iceberg 库、表的内容:
Image

2.3 Iceberg Catalog 读写

2.3.1 创建数据库

在 Catalog 中创建一个 Database,用于组织和管理表。

CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
  • ${catalog_name}:Catalog 的名称,和在上文中创建的 Iceberg Catalog 保持一致。
  • ${db_name}:Database 的名称,自定义。

2.3.2 创建数据表

在 Database 中创建表,定义表结构和相关配置。

CREATE TABLE if not EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` (
  `organization_id` BIGINT,
  `advertiser_id` BIGINT,
  `data` STRING,
  `type` STRING,
  `date` STRING,
  `timestamp` BIGINT,
  `dt` STRING,
  PRIMARY KEY (`dt`, `advertiser_id`) not enforced -- 支持主键进行去重
) PARTITIONED BY (`dt`)  -- 分区设置,Flink 不支持 hidden 分区
WITH
  (
    'format-version' = '2', -- Iceberg 表格式版本
    'write.format.default' = 'parquet' -- 默认文件格式
  );

2.3.3 查询 Iceberg 表

Iceberg 查询 SQL 同时支持流读、批读,需要在 Flink 运行过程中选择对应的模式即可:

-- 流式消费场景
INSERT INTO `print_result`
select
  *
from
  `${catalog_name}`.`${db_name}`.`${table_name}` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */; -- 使用 hint 来设置读取模式

另外可以通过临时查询功能,使用 Flink SQL 进行数据探索:
Image

2.3.4 流批读写

Flink Insert 语句支持流写、批写两种语义,只要在运行过程中选择相应的执行模式即可:

INSERT INTO `${catalog_name}`.`${db_name}`.`${table_name}`
select
  *
from
  kafka_source t;

2.3.5 覆盖写

除了 Insert 语句之外,Iceberg 也支持对数据表、分区等进行批式覆盖写:

-- 覆盖写入非分区表
INSERT OVERWRITE my_table SELECT ...

-- 覆盖写入分区表
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用