You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Iceberg 数据库湖开发
Iceberg 使用 LAS Catalog 管理元数据
复制全文
下载 pdf
Iceberg 使用 LAS Catalog 管理元数据

1. 概述

本手册将指导您如何使用 Flink 引擎进行 Iceberg 的开发任务,并且利用 LAS Catalog 统一管理 Iceberg 的元数据。

2. 环境准备

2.1 前置条件

前置条件

详细说明

开通流式计算 Flink 版产品

[必选] 需开通该产品,并能在作业开发中创建 Flink SQL 任务

购买资源池

[必选] 需在资源管理 - 资源池功能模块购买按量或包年包月的资源池,以正常提交 Flink 任务

开通 LAS Catalog 服务

[必选] 需开通 LAS Catalog 统一元数据管理服务,可参考 LAS Catalog 开通文档
[必选] 权限管理中为子账号开通 LASFullAccess 权限。

关于 EMR Serverless Spark 的开通

[必选] 若需创建隐藏分区表,需开通 EMR Serverless Spark;
[可选] 若仅创建普通表(包括分区表和非分区表),则无需开通,直接使用 Flink SQL 建表即可

需要生成访问密钥

[必选] 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret (后续需要填写在 hive-site.xml 中)。

3. 创建 Catalog

3.1 LAS 数据目录创建和授权

在 LAS Catalog 产品中创建 Iceberg 的数据目录,需要参考数据目录管理,进行数据目录创建。需要填写数据目录名称和数据目录存储位置。

注意:这里的 TOS 桶和目录需要提前创建好,也需要和 Flink SQL 中的目录地址保持一致。

Image

并且在 LAS Catalog 权限管理模块,选择对于刚刚创建好的 Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
Image

3.2 数据目录注册 LAS Catalog

满足前置的条件之后,可以在数据目录功能选择,Iceberg 和 LAS 的 Catalog 选项,并且填写 Catalog 名称、Warehouse 路径、AK/SK、LAS Catalog 名称等:
Image

点击确定,即可创建成功。

4. 创建 Database 和 Table

目前有两种方式创建 Database 和 Table:

  • 方式一:基于 Flink SQL Catalog 建表(推荐的方式)。
  • 方式二:基于 EMR Serverless SparkSQL 建表,在账号已开通 Spark 的情况下,可以通过 Spark 创建 LAS Iceberg 表。另外如果是创建隐藏分区表,必须要使用 SparkSQL 建表,这是由于 Flink SQL 不支持隐藏分区表的语法。

可以在临时查询中输入如下 DDL 进行建表:

CREATE DATABASE IF NOT EXISTS iceberg_test.`default`;

-- 创建 Iceberg 表(支持分区、排序等特性)
CREATE TABLE IF NOT EXISTS iceberg_test.`default`.test_table (
    word STRING, 
    number BIGINT,
    -- 可选,主键配置,会自动根据主键 ID 进行 upsert 模式去重
    PRIMARY KEY (`word`) NOT ENFORCED
) WITH
  (
    'format-version' = '2', -- Iceberg 表格式版本
    'write.format.default' = 'parquet'
  );

创建 LAS Iceberg 库表过程如下:
Image

4.2 方式二:通过 EMR Serverless SparkSQL 创建 LAS Iceberg 库表

  1. EMR 界面开通 Serverless Spark,必须保证有一个可用的计算队列
  2. 运行以下命令创建 Iceberg 表
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;

-- 使用 iceberg catalog
use iceberg;

-- 使用三段式定义 las 表,如下所示,iceberg_test 是 las 数据目录,default 是数据库,dwd_event_fact 是数据表
CREATE TABLE iceberg_test.default.dwd_event_fact(id STRING, number INT) USING iceberg;

创建隐藏分区表:

set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;

-- 使用 iceberg catalog
use iceberg;

-- 使用三段式定义 las 表,如下所示,iceberg_test 是 las 数据目录,default 是数据库,dwd_event_fact_hide_part 是数据表
CREATE TABLE iceberg_test.default.dwd_event_fact_hide_part (
    user_id BIGINT,action STRING,
    event_time TIMESTAMP,  -- 用于计算分区的原始时间字段
    product_id INT)
USING iceberg
-- 基于 event_time 动态计算日的隐藏分区
PARTITIONED BY (day(event_time));

5. 数据流式写入示例

以下示例展示了如何使用 Flink SQL 将数据写入 Iceberg 表。首先创建 Flink SQL 作业。
操作路径:作业开发 - Flink SQL 作业 - 创建作业。
参考文档开发 Flink SQL 任务

Image

5.1 创建数据源表

首先,Flink SQL 任务重创建一个数据源表,用于生成模拟数据。

CREATE TABLE doc_source (word varchar)
WITH
  (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '30'
  );
  • connector:使用 datagen 连接器生成模拟数据。
  • rows-per-second:每秒生成的行数。
  • fields.word.length:生成字段 word 的长度。

5.2 写入数据到 Iceberg 表

将数据源表中的数据写入 Iceberg 表。

INSERT INTO `iceberg_test`.`default`.`doc_result`
select
  t.word,
  count(1)
from
  doc_source t
GROUP BY
  t.word;
  • iceberg_test:Catalog 名称。
  • default:Database 名称。
  • doc_result:目标表名称。

5.3 开启 Checkpoint

Iceberg 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。

注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。

Checkpoint 开启如下图,在作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔进行设置。
Image

6. 数据批式写入示例

6.1 作业类型指定批模式

参考第 5 节的内容,此时采用批模式写入,需要在创建作业时指定批模式。
Image

6.2 写入批式数据到 Iceberg 表

以写入一条数据举例如下:

INSERT INTO `iceberg_test`.`default`.`doc_result`
SELECT 'Tom', 25;
  • iceberg_test:Catalog 名称。
  • default:Database 名称。
  • doc_result:目标表名称。

此外,也可以批读数据,再批写回去。

7. 上线任务

7.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。请注意:如果遇到 Caused by: org.apache.thrift.transport.TTransportException此类问题,可以参考 8.1 验证 SQL 时报错的描述。可以先暂时忽略此问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适的资源池和跳过上线前深度检查后。可以上线任务。
Image
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

7.2 确认任务执行成功

可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。

Image

8. 数据读取示例

我们以上已经确认了数据写入成功,以下示例展示了如何使用 Flink SQL 从 Iceberg 表中流/批式读取数据。进一步可以确认数据准确性。

8.1 创建打印表

创建一个打印表,用于输出读取的数据。

CREATE TABLE `print_table` (
    word varchar,
    cnt bigint
) WITH (
    'connector' = 'print'
);
  • connector:使用 print 连接器将数据打印到控制台。

启动一个 Flink SQL 批任务,从 Iceberg 表中读取数据并写入打印表。

INSERT INTO `print_table`
SELECT * FROM `iceberg_test`.`default`.`dwd_event_fact`;
  • iceberg_test:Catalog 名称。
  • default:Database 名称。
  • dwd_event_fact:源表名称。

8.3 使用 LAS Catalog 查看元数据

参考查看数据表,进行数据表的元数据查询。

Image

8.4 使用 EMR Serverless Spark 查询 Iceberg 数据

参考 4.1 节,进入 hive 交互命令查询数据

set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;

use iceberg;

select * from iceberg_test.default.dwd_event_fact limit 10;

验证数据正常写入

9. 常见问题

9.1 验证 SQL 时报错

如果在验证 SQL 的时候(点击验证按钮,或者上线时候自动检查 SQL)报错如下,形如 Caused by: org.apache.thrift.transport.TTransportException此类错误,说明当前连接 LAS 接口不同。请不要慌张,当前版本暂时无法在验证 SQL 阶段访问 LAS 元数据。

org.apache.flink.table.api.ValidationException: Unable to create catalog 'Iceberg_test1'.

Catalog options are:
'hive-conf-dir'='/opt/tiger/workdir'
'metastore'='hive'
'type'='iceberg'
'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869'
'warehouse'='tos://flink-cwz-iceberg/iceberg_test1'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:511)
        ...
Caused by: java.lang.RuntimeException: Failed to determine if database default exists
        at org.apache.iceberg.hive.HiveCatalog.databaseExistsImpl(HiveCatalog.java:223)
        ... 9 more
Caused by: org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        ... 15 more

解决方案:任务上线过程中选择更多设置 - 跳过上线前的深度检查。
Image

9.2 运行时访问 LAS 接口失败

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口无法访问

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException

Image
可能原因

  1. 没有上传 hive-site.xml 文件,或者文件名不正确
    1. 解决方法:检查 hive-site.xml 是否成功上传到依赖文件中,并且文件名必须完全符合要求。
  2. 访问 LAS 的 AK/SK 不正确,无法正确认证用户信息。
    1. 解决方法:检查 hive-site.xml 中 AccessKey 和 AccessKeySecret 是否正确。
  3. 访问 LAS 的用户没有 IAM 的 LASFullAccess 权限,请联系管理者开通该权限
    1. 解决方法:联系主账号管理者,检查是否为用户开通 LASFullAccess 权限。

9.3 访问 LAS 报接口无权限

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口没有成功授权

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Access denied: [DeniedPrivilege(resource:Resource{resourceScope='SCHEMA', catalogName='iceberg_test1', schemaName='test_db'}, action:DESCRIBE)] for user: 31035840

Image
解决方法:这个问题是因为在 LAS Catalog 中没有给指定账号赋予相关权限。请结合报错日志信息提示的 action,参考数据目录管理,为账号开通权限即可。

9.4 任务无法启动,报 LAS 数据库不存在

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且运行事件中发现如下报错,org.apache.flink.table.catalog exceptions.DatabaseNotExistException
Image
解决办法:这个原因是因为在 Flink 任务提交阶段,静态解析 SQL 的时候,当前不会去连接 LAS 获取已有的数据库,所以必须在 SQL 中显式写明建库语句。在 SQL 代码中加入以下语句:

CREATE TABLE IF NOT EXISTS test_db;

重新提交任务之后,就可以恢复正常。

9.5 hive-site.xml 格式不正确

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且日志中发现如下报错,com.ctc.wstx.exc.WstxParsingException

Caused by: java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal processing instruction target ("xml"); xml (case insensitive) is reserved by the specs.
 at [row,col {unknown-source}]: [2,5]

Image
解决办法:这个原因是因为对接 LAS 元数据中依赖的 hive-site.xml 的格式可能不正确。需要检查 xml 文件是否符合语法规范。常见 xml 格式问题可以参考:

  1. 文件开头的内容必须是 <?xml ...,在尖括号前方不能包含任何不可见字符、空格、空行等。
  2. xml 文档内必须包含合法的标签,比如在内容中不能出现 <>&等特殊字符,如果出现需要用 CDATA 语法标记为普通文本。
最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用