You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Iceberg 数据库湖开发
Iceberg 使用 S3 协议访问 Catalog 数据
复制全文
Iceberg 使用 S3 协议访问 Catalog 数据

背景介绍

Apache Flink 是一个高性能的流式计算框架,适用于实时数据处理场景。Apache Iceberg 是一种开源表格式,提供 ACID 事务、模式演化和分区管理等特性,适用于构建可靠的数据湖。通过将 Flink 与 Iceberg 集成,用户可以实现流式数据实时写入 Iceberg 表,支持数据湖的实时更新和查询。
本手册重点介绍使用 Flink SQL 和 Iceberg Hadoop Catalog + S3 协议的方式,在火山引擎流式计算 Flink 版中写入 Iceberg 表。Iceberg Hadoop Catalog 通过文件系统集中管理表元数据,是最简单的 Catalog 管理方式。

前提条件

  • 火山引擎环境
    • 已开通火山引擎账户,并创建流式计算 Flink 版资源池。建议使用 Flink 1.16 或以上版本,以兼容 Iceberg 集成。
    • 确保 Flink 集群具有公网或 VPC 内网访问权限,以便连接 Iceberg Rest Catalog 服务。建议可以通过 Session 集群网络检测确认网络联通。

Image

  • 依赖和权限
    • 在 Flink 集群中引入 Iceberg 相关 JAR 包(例如 iceberg-flink-runtime-1.17-1.6.1.jar),可通过火山引擎控制台添加依赖或手动上传。
    • 确保 Flink 集群具有 Iceberg 表存储路径的读写权限(例如,TOS 存储需配置 AK/SK 或角色授权)。
  • 基本知识:熟悉 Flink SQL 语法和 Iceberg 表概念。如需调试,建议提前准备测试数据流。

使用方法

Flink SQL 任务部署,我们主要包含以下几个步骤:

  1. 创建 SQL 任务
  2. 配置 s3 访问参数
  3. 发布并启动任务
  4. 观察任务结果

创建 SQL 任务

步骤一:在流式计算 Flink 版控制台创建 Flink SQL 作业,建议选择 Flink 1.16+ 版本测试使用。
Image

步骤二:设置 SQL 作业代码:

CREATE TABLE doc_source (word varchar)
WITH
  (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.word.length' = '30'
  );

-- 创建 Iceberg on Rest Catalog
CREATE CATALOG lf_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='s3a://<your_bucket>/<your_warehouse>'
);

CREATE DATABASE IF NOT EXISTS lf_catalog.`default`;

-- 创建 Iceberg 表(支持分区、排序等特性)
CREATE TABLE IF NOT EXISTS lf_catalog.`default`.test_table (
  word STRING,
  number BIGINT
)
WITH (
  'format-version' = '2',  -- Iceberg 表格式版本
  'write.format.default' = 'parquet'
);

INSERT INTO `lf_catalog`.`default`.`test_table`
select
  t.word,
  count(1)
from
  doc_source t
GROUP BY
  t.word;

配置任务参数

步骤一:上传所需的 iceberg-runtime 版本,本文中采用 flink 1.17 ,iceberg 1.7.2 版本实验。用户也可以自行下载社区1.17 适配的版本。

iceberg-flink-runtime-1.17-1.7.2-byted-SNAPSHOT.jar
未知大小

Image

步骤二:本文中采用 s3 的方式访问 tos 数据,所以需要填写一下参数内容:

flink.plugins.filesystem.s3a.proton.enabled: true
flink.hadoop.fs.s3a.impl: io.proton.fs.ProtonFileSystem
flink.hadoop.fs.s3a.multipart.size: 104857600
flink.hadoop.proton.objectstorage.s3a.impl: io.proton.common.object.tos.TOS
flink.hadoop.fs.tos.credentials.provider: io.proton.common.object.auth.SimpleCredentialsProvider
-- 填写 AK/SK 需要保证有访问对应 TOS 的权限,可以使用变量管理进行加密处理
flink.hadoop.fs.tos.access-key-id: ${secret_values.your-ak}
flink.hadoop.fs.tos.secret-access-key: ${secret_values.your-sk}

Image

任务启动

检查无误后,保存并发布任务,当前暂时无法执行深度校验,需要跳过深度检查阶段
Image
进入任务运维界面,选择全新启动/重启任务,后续更新任务的话可以从最新状态启动:
Image

等待一个 Checkpoint 的周期,可以从 TOS 的桶文件,确认数据已经写入 Iceberg:
Image

读取 Iceberg 数据

和之前方式相同,创建流式任务,从 Iceberg Catalog 中读取数据可以参考如下 SQL,并且同样的步骤设置自定义依赖文件和自定义参数。但要注意的是需要在 catalog 中指定 io-impl选择使用 HadoopFileIO。发布运行后可以看到成功消费数据结果:

-- 创建 Iceberg on LAS Catalog
CREATE CATALOG lf_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  -- 需要指定读取的时候使用 HadoopFileIO
  'io-impl'='org.apache.iceberg.hadoop.HadoopFileIO',
  'warehouse'='s3a://<your_bucket>/<your_warehouse>'
);

CREATE DATABASE IF NOT EXISTS lf_catalog.`default`;

-- 创建 Iceberg 表(支持分区、排序等特性)
CREATE TABLE `print_result` (
  word STRING,
  number BIGINT
)
WITH (
  'connector' = 'print'
);

-- 流读 iceberg 数据
INSERT INTO `print_result`
select
  *
from
  `lf_catalog`.`default`.`test_table` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;

常见问题

以下列出使用过程中可能遇到的问题及解决方案:

  • 问题 1:Flink 作业启动失败,报错 "Could not find Iceberg catalog"
    • 原因:通常由于 Iceberg Catalog 配置错误或依赖未加载。
    • 解决:检查 CREATE CATALOG 语句中的 uriwarehouse 路径是否正确;确认 Flink 集群已加载 Iceberg JAR 包。可通过火山引擎日志服务查看详细错误信息。
  • 问题 2:数据写入 Iceberg 表失败,提示权限错误
    • 原因:Flink 集群无权限访问 Iceberg 存储路径(如 TOS)。
    • 解决:验证存储路径的 AK/SK 或 IAM 角色配置;在火山引擎 TOS 控制台检查桶策略是否允许 Flink 集群访问。
  • 问题 3:Iceberg 表查询不到最新数据
    • 原因:可能由于 Flink 作业未触发 Checkpoint 提交,或 Iceberg 元数据未刷新。
    • 解决:确保 Flink Checkpoint 已启用(检查点间隔建议设置为 5 分钟级)并且已经成功执行 Checkpoint 至少一次。
  • 问题 4:流式写入性能慢或延迟高
    • 原因:可能是 Flink 并行度不足或 Iceberg 小文件过多。
    • 解决:增加 Flink 作业的并行度;调整 Iceberg 属性(如 write.target-file-size-bytes)以优化文件大小;定期执行 Iceberg 的 expire_snapshots 清理旧快照。
  • 问题 5:访问方式不支持跨 Region 访问。
    • 原因:Flink 默认访问本区域内的 endpoint 。
    • 解决:flink.hadoop.fs.tos.endpoint设置为跨区域访问的 endpoint 。注意如果是通过公网访问,需要参考Flink 访问公网 提前打通网络。

如果问题仍未解决,建议参考火山引擎官方文档或联系技术支持,提供 Flink 作业 ID 和错误日志以获取进一步帮助。

最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用