Apache Flink 是一个高性能的流式计算框架,适用于实时数据处理场景。Apache Iceberg 是一种开源表格式,提供 ACID 事务、模式演化和分区管理等特性,适用于构建可靠的数据湖。通过将 Flink 与 Iceberg 集成,用户可以实现流式数据实时写入 Iceberg 表,支持数据湖的实时更新和查询。
本手册重点介绍使用 Flink SQL 和 Iceberg Hadoop Catalog + S3 协议的方式,在火山引擎流式计算 Flink 版中写入 Iceberg 表。Iceberg Hadoop Catalog 通过文件系统集中管理表元数据,是最简单的 Catalog 管理方式。
iceberg-flink-runtime-1.17-1.6.1.jar),可通过火山引擎控制台添加依赖或手动上传。Flink SQL 任务部署,我们主要包含以下几个步骤:
步骤一:在流式计算 Flink 版控制台创建 Flink SQL 作业,建议选择 Flink 1.16+ 版本测试使用。
步骤二:设置 SQL 作业代码:
CREATE TABLE doc_source (word varchar) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.word.length' = '30' ); -- 创建 Iceberg on Rest Catalog CREATE CATALOG lf_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='s3a://<your_bucket>/<your_warehouse>' ); CREATE DATABASE IF NOT EXISTS lf_catalog.`default`; -- 创建 Iceberg 表(支持分区、排序等特性) CREATE TABLE IF NOT EXISTS lf_catalog.`default`.test_table ( word STRING, number BIGINT ) WITH ( 'format-version' = '2', -- Iceberg 表格式版本 'write.format.default' = 'parquet' ); INSERT INTO `lf_catalog`.`default`.`test_table` select t.word, count(1) from doc_source t GROUP BY t.word;
步骤一:上传所需的 iceberg-runtime 版本,本文中采用 flink 1.17 ,iceberg 1.7.2 版本实验。用户也可以自行下载社区1.17 适配的版本。
步骤二:本文中采用 s3 的方式访问 tos 数据,所以需要填写一下参数内容:
flink.plugins.filesystem.s3a.proton.enabled: true flink.hadoop.fs.s3a.impl: io.proton.fs.ProtonFileSystem flink.hadoop.fs.s3a.multipart.size: 104857600 flink.hadoop.proton.objectstorage.s3a.impl: io.proton.common.object.tos.TOS flink.hadoop.fs.tos.credentials.provider: io.proton.common.object.auth.SimpleCredentialsProvider -- 填写 AK/SK 需要保证有访问对应 TOS 的权限,可以使用变量管理进行加密处理 flink.hadoop.fs.tos.access-key-id: ${secret_values.your-ak} flink.hadoop.fs.tos.secret-access-key: ${secret_values.your-sk}
检查无误后,保存并发布任务,当前暂时无法执行深度校验,需要跳过深度检查阶段
进入任务运维界面,选择全新启动/重启任务,后续更新任务的话可以从最新状态启动:
等待一个 Checkpoint 的周期,可以从 TOS 的桶文件,确认数据已经写入 Iceberg:
和之前方式相同,创建流式任务,从 Iceberg Catalog 中读取数据可以参考如下 SQL,并且同样的步骤设置自定义依赖文件和自定义参数。但要注意的是需要在 catalog 中指定 io-impl选择使用 HadoopFileIO。发布运行后可以看到成功消费数据结果:
-- 创建 Iceberg on LAS Catalog CREATE CATALOG lf_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', -- 需要指定读取的时候使用 HadoopFileIO 'io-impl'='org.apache.iceberg.hadoop.HadoopFileIO', 'warehouse'='s3a://<your_bucket>/<your_warehouse>' ); CREATE DATABASE IF NOT EXISTS lf_catalog.`default`; -- 创建 Iceberg 表(支持分区、排序等特性) CREATE TABLE `print_result` ( word STRING, number BIGINT ) WITH ( 'connector' = 'print' ); -- 流读 iceberg 数据 INSERT INTO `print_result` select * from `lf_catalog`.`default`.`test_table` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
以下列出使用过程中可能遇到的问题及解决方案:
CREATE CATALOG 语句中的 uri 和 warehouse 路径是否正确;确认 Flink 集群已加载 Iceberg JAR 包。可通过火山引擎日志服务查看详细错误信息。write.target-file-size-bytes)以优化文件大小;定期执行 Iceberg 的 expire_snapshots 清理旧快照。flink.hadoop.fs.tos.endpoint设置为跨区域访问的 endpoint 。注意如果是通过公网访问,需要参考Flink 访问公网 提前打通网络。如果问题仍未解决,建议参考火山引擎官方文档或联系技术支持,提供 Flink 作业 ID 和错误日志以获取进一步帮助。