You need to enable JavaScript to run this app.
导航
Iceberg 与 Flink 集成
最近更新时间:2025.04.01 20:13:42首次发布时间:2025.04.01 14:30:59
我的收藏
有用
有用
无用
无用

Apache Flink 是一个可分布式的开源计算框架,能够支持数据流处理和批量数据处理两种应用类型。本文介绍下在 Flink 中操作 Iceberg 表。

操作步骤

本文介绍如何采用 Flink SQL 方式操作 Iceberg 表。 如果您希望采用 Flink DataStream API 来访问Apache Iceberg 表,则请参考 Iceberg官网 进行操作。

准备工作

  1. 配置 Flink 的 checkpoint。

在 /usr/lib/emr/current/flink/conf/flink-conf.yaml 文件的 checkpoint参数下,添加如下配置:

execution.checkpointing.interval: 10s   
# checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 10  
# checkpoint 失败容忍次数
  1. yaml文件配置完成后,使用以下命令,启动Flink SQL Client:
export HADOOP_CLASSPATH=`hadoop classpath`

cd /usr/lib/emr/current/flink

# 拷贝Iceberg需要的依赖cp /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-*.jar  lib
cp /usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-*.jar lib

# 采用 flink on yarn模式,启动YARN Session
./bin/yarn-session.sh --detached

# Start the flink SQL client
./bin/sql-client.sh embedded shell

说明

不同 LAS 版本,Flink 和 Iceberg 的版本号可能不同。建议采用如下命令来定位您的 iceberg-flink-runtime-xx.xx.xx.jar 和 flink-sql-connector-hive-xx.xx.xx.jar 路径。
EMR3.x 和 EMR1.x 版本采用下面命令:
$ ls /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-.jar
/usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-3
.jar

  1. 创建 Catalog
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
 );
  • 目前 Iceberg 主要支持 HiveCatalog 和 HadoopCatalog。
    • HiveCatalog 将当前表的元数据文件路径存储在 Hive Metastore,所以每次读写 Iceberg 表都需要先从 Hive Metastore 中取出对应的表元数据文件路径。
    • HadoopCatalog 将当前表元数据文件路径记录在一个文件目录下,因此不需要连接 Hive Metastore。

说明

不同 EMR 版本中节点的域名命名方式可能不同,所以下方示例中“las-master-1”可参考 EMR 的域名规则做相应调整。

示例: 创建 HiveCatalog

CREATE CATALOG iceberg WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://las-master-1:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive'
);

示例:创建 HadoopCatalog

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive',
  'property-version'='1'
);

通过Flink SQL Client检查结果:

Flink SQL> SHOW CATALOGS;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|  hadoop_catalog |
|         iceberg |
+-----------------+
3 rows in set

SQL命令

更多 SQL 命令,详情参考 Flink 官网文档。

  1. 创建数据库
USE CATALOG iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;
  1. 创建表
CREATE TABLE iceberg.iceberg_db.iceberg_001 (
    id BIGINT COMMENT 'unique id',
    data STRING
) WITH ('write.format.default'='ORC'
 );
  1. 查询

可以执行下面的命令把执行类型设置为流式处理模式或者批处理模式。

--提交 flink批处理作业来获取iceberg表中的所有行
SET execution.runtime-mode = batch;
SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
--从flink流作业中增量获取数据
SET execution.runtime-mode = streaming;
SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
  1. 写入

说明

INSERT OVERWRITE只适合于batch job。通过下面命令设置batch job执行模式:
SET execution.runtime-mode = batch;

INSERT INTO iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');

也可以使用 INSERT OVERWRITE 来使用查询结果替换表中的数据:

INSERT OVERWRITE iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');
  1. 删除表
DROP TABLE iceberg.iceberg_db.iceberg_001;

说明

删除表,也会将表的数据进行删除。但会留存表的空目录信息,如:
Image