You need to enable JavaScript to run this app.
导航
Iceberg 使用 LAS Catalog 管理元数据
最近更新时间:2025.08.25 00:35:53首次发布时间:2025.08.25 00:35:53
复制全文
我的收藏
有用
有用
无用
无用

1. 概述

本手册将指导您如何使用 Flink 引擎进行 Iceberg 的开发任务,并且利用 LAS Catalog 统一管理 Iceberg 的元数据。

2. 环境准备

2.1 前置条件

前置条件

详细说明

开通流式计算 Flink 版产品

[必选] 需开通该产品,并能在作业开发中创建 Flink SQL 任务

购买资源池

[必选] 需在资源管理 - 资源池功能模块购买按量或包年包月的资源池,以正常提交 Flink 任务

开通 LAS Catalog 服务

[必选] 需开通 LAS Catalog 统一元数据管理服务,可参考 LAS Catalog 开通文档
[必选] 权限管理中为子账号开通 LASFullAccess 权限。

关于 EMR Serverless Spark 的开通

[必选] 若需创建隐藏分区表,需开通 EMR Serverless Spark;
[可选] 若仅创建普通表(包括分区表和非分区表),则无需开通,直接使用 Flink SQL 建表即可

需要生成访问密钥

[必选] 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret (后续需要填写在 hive-site.xml 中)。

  • 当前 Flink 版本需要选择 1.16

3. 创建 Catalog

3.1 LAS 数据目录创建和授权

在 LAS Catalog 产品中创建 Iceberg 的数据目录,需要参考数据目录管理,进行数据目录创建。需要填写数据目录名称和数据目录存储位置。

注意:这里的 TOS 桶和目录需要提前创建好,也需要和 Flink SQL 中的目录地址保持一致。

Image

并且在 LAS Catalog 权限管理模块,选择对于刚刚创建好的 Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
Image

操作路径:作业开发 - Flink SQL 作业 - 创建作业。
参考文档开发 Flink SQL 任务

Image

3.2 任务上传依赖文件

  1. 操作路径:控制台 - 进入项目 - 作业开发 - 选择作业 - 参数配置(右侧按钮) - 依赖文件。
  2. 准备依赖文件
    1. 下载 LAS Catalog Connector。
flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar
未知大小
  1. 下载 hive-site.xml 文件模板(文件名必须保证一模一样),修改相关访问和认证信息。
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <!--
  `metastore.catalog.default` 是写入 Hive Catalog 的名称,如果不填默认是 hive 
  -->
  <property>
    <name>metastore.catalog.default</name>
    <value>__LAS_DEFAULT_CATALOG__</value>
  </property>
  <property>
    <name>hive.server2.max.start.attempts</name>
    <value>5</value>
  </property>
  
  <!--
  `hive.client.las.region.name` 是 LAS 和 Flink 服务所在 Region
  `hive.metastore.uris` 是 LAS 的 thrift 接口地址,需要和所在 Region 保持一致
  -->
  <property>
    <name>hive.client.las.region.name</name>
    <value>__LAS_REGION__</value>
    <!-- 华北 <value>cn-beijing</value> -->
    <!-- 华东 <value>cn-shanghai</value> -->
    <!-- 华南 <value>cn-guangzhou</value> -->
    <!-- 柔佛 <value>ap-southeast-1</value> -->
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>__LAS_METASTORE_URL__</value>
    <!-- 华北 <value>thrift://lakeformation.las.cn-beijing.ivolces.com:48869</value> -->
    <!-- 华东 <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value> -->
    <!-- 华南 <value>thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869</value> -->
    <!-- 柔佛 <value>thrift://lakeformation.las.ap-southeast-1.ivolces.com:48869</value> -->
  </property>

  <property>
    <name>hive.hms.client.is.public.cloud</name>
    <value>true</value>
  </property>

  <!--
  `hive.client.las.ak/sk` 是 LAS 服务访问的 AccessKeyId 和 AccessKeySecret
  填写前请确认子账号拥有 LAS 的 IAM 访问权限,和 Catalog 的数据权限
  -->
  <property>
    <name>hive.client.las.ak</name>
    <value>__LAS_ACCESS_KEY__</value>
  </property>
  <property>
    <name>hive.client.las.sk</name>
    <value>__LAS_ACCESS_KEY_SECRET__</value>
  </property>
</configuration>
  1. 上传依赖文件:如下图,执行上传文件资源的操作:

Image

因为 LAS Catalog 提供兼容 Hive 的元数据 thrift 接口。所以在 Flink 中使用 LAS Catalog 的方式和基于 Hive 的 Catalog 的方式非常相似。我们可以通过如下 SQL 创建 LAS Catalog:

CREATE CATALOG ${catalog_name} WITH (
    'type'='iceberg',
    'catalog-type'='hive',
    'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'clients'='5',
    'property-version'='1',
    'hive-conf-dir' = '/opt/tiger/workdir/',
    'warehouse' = 'tos://${bucket_name}/${catalog_name}'
);

其中两个变量需要按照如下:

  • ${catalog_name}:Catalog 的名称,和 LAS 中的 Catalog 名称需要保持一致。
  • ${bucket_name}:存储 Iceberg 数据的 TOS(对象存储)桶名称。

WITH 参数的意义如下:

  • type:选择 iceberg 类型 Catalog。
  • metastore:选择 hive 的方式进行元数据管理。
  • uri:LAS Catalog 的元数据 thrift 接口,可以参考示例中的地址,将 __LAS_REGION__ 改成和 Flink 服务相同的区域即可。
  • hive-conf-dir:这里指定 hive-site.xml 的文件路径。注意:按照本篇文档内容测试方案,此处需要固定填写 /opt/tiger/workdir
  • warehouse:和 LAS Catalog 中的数据目录存储位置保持一致。

4. 创建 Database 和 Table

目前有两种方式创建 Database 和 Table:

  • 方式一:基于 Flink SQL Catalog 建表(推荐的方式)。
  • 方式二:基于 EMR Serverless SparkSQL 建表,在账号已开通 Spark 的情况下,可以通过 Spark 创建 LAS Iceberg 表。另外如果是创建隐藏分区表,必须要使用 SparkSQL 建表,这是由于 Flink SQL 不支持隐藏分区表的语法。

  1. 创建一个流式 Flink SQL 作业,在 SQL 中创建 Flink iceberg catalog,并建表
-- 创建 Iceberg on LAS Catalog
CREATE CATALOG lf_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
  'clients'='5',
  'property-version'='1',
  'hive-conf-dir' = '/opt/tiger/workdir/', -- 加载 hive-site.xml 配置
  'warehouse'='tos://tos-bucket-test/'
);

CREATE DATABASE IF NOT EXISTS lf_catalog.`default`;

-- 创建 Iceberg 表(支持分区、排序等特性)
CREATE TABLE IF NOT EXISTS lf_catalog.`default`.test_table (
    word STRING, 
    number BIGINT
) WITH
  (
    'format-version' = '2', -- Iceberg 表格式版本
    'write.format.default' = 'parquet', -- 默认文件格式
    'location' = 'tos://flink-cwz-paimon/cwz_iceberg/default.db/test_table' -- 表存储路径
  );
  1. 上线(选择跳过深度检查),并执行 Flink SQL 作业,作业执行结束后,在 LAS 平台上检查 LAS 表是否创建。

注意:由于建表并没有实际的 DML 操作,因此作业执行会报错,但是不影响建表的功能。
Image

4.2 方式二:通过 EMR Serverless SparkSQL 创建 LAS Iceberg 库表

  1. EMR 界面开通 Serverless Spark,必须保证有一个可用的计算队列
  2. 运行以下命令创建 Iceberg 表
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;

-- 使用 iceberg catalog
use iceberg;

-- 使用三段式定义 las 表,如下所示,iceberg_test 是 las 数据目录,default 是数据库,dwd_event_fact 是数据表
CREATE TABLE iceberg_test.default.dwd_event_fact(id STRING, number INT) USING iceberg;

创建隐藏分区表:

set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;

-- 使用 iceberg catalog
use iceberg;

-- 使用三段式定义 las 表,如下所示,iceberg_test 是 las 数据目录,default 是数据库,dwd_event_fact_hide_part 是数据表
CREATE TABLE iceberg_test.default.dwd_event_fact_hide_part (
    user_id BIGINT,action STRING,
    event_time TIMESTAMP,  -- 用于计算分区的原始时间字段
    product_id INT)
USING iceberg
-- 基于 event_time 动态计算日的隐藏分区
PARTITIONED BY (day(event_time));

5. 数据流式写入示例

以下示例展示了如何使用 Flink SQL 将数据写入 Iceberg 表。

5.1 创建数据源表

首先,创建一个数据源表,用于生成模拟数据。

CREATE TABLE doc_source (word varchar)
WITH
  (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '30'
  );
  • connector:使用 datagen 连接器生成模拟数据。
  • rows-per-second:每秒生成的行数。
  • fields.word.length:生成字段 word 的长度。

5.2 写入数据到 Iceberg 表

将数据源表中的数据写入 Iceberg 表。

INSERT INTO `lf_catalog`.`default`.`doc_result`
select
  t.word,
  count(1)
from
  doc_source t
GROUP BY
  t.word;
  • iceberg_catalog:Catalog 名称。
  • default:Database 名称。
  • doc_result:目标表名称。

5.3 开启 Checkpoint

Iceberg 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。

注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。

Checkpoint 开启如下图,在作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔进行设置。
Image

6. 数据批式写入示例

6.1 作业类型指定批模式

参考第 5 节的内容,此时采用批模式写入,需要在创建作业时指定批模式。
Image

6.2 写入批式数据到 Iceberg 表

以写入一条数据举例如下:

INSERT INTO `Iceberg_catalog`.`default`.`doc_result`
SELECT 'Tom', 25;
  • iceberg_catalog:Catalog 名称。
  • default:Database 名称。
  • doc_result:目标表名称。

此外,也可以批读数据,再批写回去。

7. 上线任务

7.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。请注意:如果遇到 Caused by: org.apache.thrift.transport.TTransportException此类问题,可以参考 8.1 验证 SQL 时报错的描述。可以先暂时忽略此问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适的资源池和跳过上线前深度检查后。可以上线任务。
Image
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

7.2 确认任务执行成功

可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。

Image

8. 数据读取示例

我们以上已经确认了数据写入成功,以下示例展示了如何使用 Flink SQL 从 Iceberg 表中流/批式读取数据。进一步可以确认数据准确性。

8.1 创建打印表

创建一个打印表,用于输出读取的数据。

CREATE TABLE `print_table` (
    word varchar,
    cnt bigint
) WITH (
    'connector' = 'print'
);
  • connector:使用 print 连接器将数据打印到控制台。

启动一个 Flink SQL 批任务,从 Iceberg 表中读取数据并写入打印表。

INSERT INTO `print_table`
SELECT * FROM `iceberg_catalog`.`default`.`dwd_event_fact`;
  • iceberg_catalog:Catalog 名称。
  • default:Database 名称。
  • dwd_event_fact:源表名称。

8.3 使用 LAS Catalog 查看元数据

参考查看数据表,进行数据表的元数据查询。

Image

8.4 使用 EMR Serverless Spark 查询 Iceberg 数据

参考 4.1 节,进入 hive 交互命令查询数据

set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.iceberg.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;

use iceberg;

select * from iceberg_test.default.dwd_event_fact limit 10;

验证数据正常写入

9. 常见问题

9.1 验证 SQL 时报错

如果在验证 SQL 的时候(点击验证按钮,或者上线时候自动检查 SQL)报错如下,形如 Caused by: org.apache.thrift.transport.TTransportException此类错误,说明当前连接 LAS 接口不同。请不要慌张,当前版本暂时无法在验证 SQL 阶段访问 LAS 元数据。

org.apache.flink.table.api.ValidationException: Unable to create catalog 'Iceberg_test1'.

Catalog options are:
'hive-conf-dir'='/opt/tiger/workdir'
'metastore'='hive'
'type'='iceberg'
'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869'
'warehouse'='tos://flink-cwz-iceberg/iceberg_test1'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:511)
        ...
Caused by: java.lang.RuntimeException: Failed to determine if database default exists
        at org.apache.iceberg.hive.HiveCatalog.databaseExistsImpl(HiveCatalog.java:223)
        ... 9 more
Caused by: org.apache.thrift.transport.TTransportException
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
        ... 15 more

解决方案:任务上线过程中选择更多设置 - 跳过上线前的深度检查。
Image

9.2 运行时访问 LAS 接口失败

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口无法访问

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException

Image
可能原因

  1. 没有上传 hive-site.xml 文件,或者文件名不正确
    1. 解决方法:检查 hive-site.xml 是否成功上传到依赖文件中,并且文件名必须完全符合要求。
  2. 访问 LAS 的 AK/SK 不正确,无法正确认证用户信息。
    1. 解决方法:检查 hive-site.xml 中 AccessKey 和 AccessKeySecret 是否正确。
  3. 访问 LAS 的用户没有 IAM 的 LASFullAccess 权限,请联系管理者开通该权限
    1. 解决方法:联系主账号管理者,检查是否为用户开通 LASFullAccess 权限。

9.3 访问 LAS 报接口无权限

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口没有成功授权

Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Access denied: [DeniedPrivilege(resource:Resource{resourceScope='SCHEMA', catalogName='iceberg_test1', schemaName='test_db'}, action:DESCRIBE)] for user: 31035840

Image
解决方法:这个问题是因为在 LAS Catalog 中没有给指定账号赋予相关权限。请结合报错日志信息提示的 action,参考数据目录管理,为账号开通权限即可。

9.4 任务无法启动,报 LAS 数据库不存在

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且运行事件中发现如下报错,org.apache.flink.table.catalog exceptions.DatabaseNotExistException
Image
解决办法:这个原因是因为在 Flink 任务提交阶段,静态解析 SQL 的时候,当前不会去连接 LAS 获取已有的数据库,所以必须在 SQL 中显式写明建库语句。在 SQL 代码中加入以下语句:

CREATE TABLE IF NOT EXISTS test_db;

重新提交任务之后,就可以恢复正常。

9.5 hive-site.xml 格式不正确

在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且日志中发现如下报错,com.ctc.wstx.exc.WstxParsingException

Caused by: java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal processing instruction target ("xml"); xml (case insensitive) is reserved by the specs.
 at [row,col {unknown-source}]: [2,5]

Image
解决办法:这个原因是因为对接 LAS 元数据中依赖的 hive-site.xml 的格式可能不正确。需要检查 xml 文件是否符合语法规范。常见 xml 格式问题可以参考:

  1. 文件开头的内容必须是 <?xml ...,在尖括号前方不能包含任何不可见字符、空格、空行等。
  2. xml 文档内必须包含合法的标签,比如在内容中不能出现 <>&等特殊字符,如果出现需要用 CDATA 语法标记为普通文本。