本手册将指导您如何使用 Flink 引擎进行 Iceberg 的开发任务,并且利用 LAS Catalog 统一管理 Iceberg 的元数据。
前置条件 | 详细说明 |
|---|---|
开通流式计算 Flink 版产品 | [必选] 需开通该产品,并能在作业开发中创建 Flink SQL 任务 |
购买资源池 | [必选] 需在资源管理 - 资源池功能模块购买按量或包年包月的资源池,以正常提交 Flink 任务 |
开通 LAS Catalog 服务 | [必选] 需开通 LAS Catalog 统一元数据管理服务,可参考 LAS Catalog 开通文档 |
关于 EMR Serverless Spark 的开通 | [必选] 若需创建隐藏分区表,需开通 EMR Serverless Spark; |
需要生成访问密钥 | [必选] 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret (后续需要填写在 hive-site.xml 中)。 |
在 LAS Catalog 产品中创建 Iceberg 的数据目录,需要参考数据目录管理,进行数据目录创建。需要填写数据目录名称和数据目录存储位置。
注意:这里的 TOS 桶和目录需要提前创建好,也需要和 Flink SQL 中的目录地址保持一致。
并且在 LAS Catalog 权限管理模块,选择对于刚刚创建好的 Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
操作路径:作业开发 - Flink SQL 作业 - 创建作业。
参考文档:开发 Flink SQL 任务
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <!-- `metastore.catalog.default` 是写入 Hive Catalog 的名称,如果不填默认是 hive --> <property> <name>metastore.catalog.default</name> <value>__LAS_DEFAULT_CATALOG__</value> </property> <property> <name>hive.server2.max.start.attempts</name> <value>5</value> </property> <!-- `hive.client.las.region.name` 是 LAS 和 Flink 服务所在 Region `hive.metastore.uris` 是 LAS 的 thrift 接口地址,需要和所在 Region 保持一致 --> <property> <name>hive.client.las.region.name</name> <value>__LAS_REGION__</value> <!-- 华北 <value>cn-beijing</value> --> <!-- 华东 <value>cn-shanghai</value> --> <!-- 华南 <value>cn-guangzhou</value> --> <!-- 柔佛 <value>ap-southeast-1</value> --> </property> <property> <name>hive.metastore.uris</name> <value>__LAS_METASTORE_URL__</value> <!-- 华北 <value>thrift://lakeformation.las.cn-beijing.ivolces.com:48869</value> --> <!-- 华东 <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value> --> <!-- 华南 <value>thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869</value> --> <!-- 柔佛 <value>thrift://lakeformation.las.ap-southeast-1.ivolces.com:48869</value> --> </property> <property> <name>hive.hms.client.is.public.cloud</name> <value>true</value> </property> <!-- `hive.client.las.ak/sk` 是 LAS 服务访问的 AccessKeyId 和 AccessKeySecret 填写前请确认子账号拥有 LAS 的 IAM 访问权限,和 Catalog 的数据权限 --> <property> <name>hive.client.las.ak</name> <value>__LAS_ACCESS_KEY__</value> </property> <property> <name>hive.client.las.sk</name> <value>__LAS_ACCESS_KEY_SECRET__</value> </property> </configuration>
因为 LAS Catalog 提供兼容 Hive 的元数据 thrift 接口。所以在 Flink 中使用 LAS Catalog 的方式和基于 Hive 的 Catalog 的方式非常相似。我们可以通过如下 SQL 创建 LAS Catalog:
CREATE CATALOG ${catalog_name} WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869', 'clients'='5', 'property-version'='1', 'hive-conf-dir' = '/opt/tiger/workdir/', 'warehouse' = 'tos://${bucket_name}/${catalog_name}' );
其中两个变量需要按照如下:
${catalog_name}:Catalog 的名称,和 LAS 中的 Catalog 名称需要保持一致。${bucket_name}:存储 Iceberg 数据的 TOS(对象存储)桶名称。WITH 参数的意义如下:
type:选择 iceberg 类型 Catalog。metastore:选择 hive 的方式进行元数据管理。uri:LAS Catalog 的元数据 thrift 接口,可以参考示例中的地址,将 __LAS_REGION__ 改成和 Flink 服务相同的区域即可。hive-conf-dir:这里指定 hive-site.xml 的文件路径。注意:按照本篇文档内容测试方案,此处需要固定填写 /opt/tiger/workdir。warehouse:和 LAS Catalog 中的数据目录存储位置保持一致。目前有两种方式创建 Database 和 Table:
-- 创建 Iceberg on LAS Catalog CREATE CATALOG lf_catalog WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869', 'clients'='5', 'property-version'='1', 'hive-conf-dir' = '/opt/tiger/workdir/', -- 加载 hive-site.xml 配置 'warehouse'='tos://tos-bucket-test/' ); CREATE DATABASE IF NOT EXISTS lf_catalog.`default`; -- 创建 Iceberg 表(支持分区、排序等特性) CREATE TABLE IF NOT EXISTS lf_catalog.`default`.test_table ( word STRING, number BIGINT ) WITH ( 'format-version' = '2', -- Iceberg 表格式版本 'write.format.default' = 'parquet', -- 默认文件格式 'location' = 'tos://flink-cwz-paimon/cwz_iceberg/default.db/test_table' -- 表存储路径 );
注意:由于建表并没有实际的 DML 操作,因此作业执行会报错,但是不影响建表的功能。
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.iceberg.type=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 使用 iceberg catalog use iceberg; -- 使用三段式定义 las 表,如下所示,iceberg_test 是 las 数据目录,default 是数据库,dwd_event_fact 是数据表 CREATE TABLE iceberg_test.default.dwd_event_fact(id STRING, number INT) USING iceberg;
创建隐藏分区表:
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.iceberg.type=hive; set spark.sql.storeAssignmentPolicy=ansi; -- 使用 iceberg catalog use iceberg; -- 使用三段式定义 las 表,如下所示,iceberg_test 是 las 数据目录,default 是数据库,dwd_event_fact_hide_part 是数据表 CREATE TABLE iceberg_test.default.dwd_event_fact_hide_part ( user_id BIGINT,action STRING, event_time TIMESTAMP, -- 用于计算分区的原始时间字段 product_id INT) USING iceberg -- 基于 event_time 动态计算日的隐藏分区 PARTITIONED BY (day(event_time));
以下示例展示了如何使用 Flink SQL 将数据写入 Iceberg 表。
首先,创建一个数据源表,用于生成模拟数据。
CREATE TABLE doc_source (word varchar) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.word.length' = '30' );
connector:使用 datagen 连接器生成模拟数据。rows-per-second:每秒生成的行数。fields.word.length:生成字段 word 的长度。将数据源表中的数据写入 Iceberg 表。
INSERT INTO `lf_catalog`.`default`.`doc_result` select t.word, count(1) from doc_source t GROUP BY t.word;
iceberg_catalog:Catalog 名称。default:Database 名称。doc_result:目标表名称。Iceberg 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。
注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。
Checkpoint 开启如下图,在作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔进行设置。
参考第 5 节的内容,此时采用批模式写入,需要在创建作业时指定批模式。
以写入一条数据举例如下:
INSERT INTO `Iceberg_catalog`.`default`.`doc_result` SELECT 'Tom', 25;
iceberg_catalog:Catalog 名称。default:Database 名称。doc_result:目标表名称。此外,也可以批读数据,再批写回去。
此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。请注意:如果遇到 Caused by: org.apache.thrift.transport.TTransportException此类问题,可以参考 8.1 验证 SQL 时报错的描述。可以先暂时忽略此问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适的资源池和跳过上线前深度检查后。可以上线任务。
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。
可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。
我们以上已经确认了数据写入成功,以下示例展示了如何使用 Flink SQL 从 Iceberg 表中流/批式读取数据。进一步可以确认数据准确性。
创建一个打印表,用于输出读取的数据。
CREATE TABLE `print_table` ( word varchar, cnt bigint ) WITH ( 'connector' = 'print' );
connector:使用 print 连接器将数据打印到控制台。启动一个 Flink SQL 批任务,从 Iceberg 表中读取数据并写入打印表。
INSERT INTO `print_table` SELECT * FROM `iceberg_catalog`.`default`.`dwd_event_fact`;
iceberg_catalog:Catalog 名称。default:Database 名称。dwd_event_fact:源表名称。参考查看数据表,进行数据表的元数据查询。
参考 4.1 节,进入 hive 交互命令查询数据
set spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog; set spark.sql.catalog.iceberg.type=hive; set spark.sql.storeAssignmentPolicy=ansi; use iceberg; select * from iceberg_test.default.dwd_event_fact limit 10;
验证数据正常写入
如果在验证 SQL 的时候(点击验证按钮,或者上线时候自动检查 SQL)报错如下,形如 Caused by: org.apache.thrift.transport.TTransportException此类错误,说明当前连接 LAS 接口不同。请不要慌张,当前版本暂时无法在验证 SQL 阶段访问 LAS 元数据。
org.apache.flink.table.api.ValidationException: Unable to create catalog 'Iceberg_test1'. Catalog options are: 'hive-conf-dir'='/opt/tiger/workdir' 'metastore'='hive' 'type'='iceberg' 'uri'='thrift://lakeformation.las.cn-beijing.ivolces.com:48869' 'warehouse'='tos://flink-cwz-iceberg/iceberg_test1' at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:511) ... Caused by: java.lang.RuntimeException: Failed to determine if database default exists at org.apache.iceberg.hive.HiveCatalog.databaseExistsImpl(HiveCatalog.java:223) ... 9 more Caused by: org.apache.thrift.transport.TTransportException at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ... 15 more
解决方案:任务上线过程中选择更多设置 - 跳过上线前的深度检查。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口无法访问
Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException
可能原因
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 LAS 的接口没有成功授权
Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Access denied: [DeniedPrivilege(resource:Resource{resourceScope='SCHEMA', catalogName='iceberg_test1', schemaName='test_db'}, action:DESCRIBE)] for user: 31035840
解决方法:这个问题是因为在 LAS Catalog 中没有给指定账号赋予相关权限。请结合报错日志信息提示的 action,参考数据目录管理,为账号开通权限即可。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且运行事件中发现如下报错,org.apache.flink.table.catalog exceptions.DatabaseNotExistException
解决办法:这个原因是因为在 Flink 任务提交阶段,静态解析 SQL 的时候,当前不会去连接 LAS 获取已有的数据库,所以必须在 SQL 中显式写明建库语句。在 SQL 代码中加入以下语句:
CREATE TABLE IF NOT EXISTS test_db;
重新提交任务之后,就可以恢复正常。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且日志中发现如下报错,com.ctc.wstx.exc.WstxParsingException:
Caused by: java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal processing instruction target ("xml"); xml (case insensitive) is reserved by the specs. at [row,col {unknown-source}]: [2,5]
解决办法:这个原因是因为对接 LAS 元数据中依赖的 hive-site.xml 的格式可能不正确。需要检查 xml 文件是否符合语法规范。常见 xml 格式问题可以参考:
<?xml ...,在尖括号前方不能包含任何不可见字符、空格、空行等。< 、>、&等特殊字符,如果出现需要用 CDATA 语法标记为普通文本。