Apache Flink 是一个可分布式的开源计算框架,能够支持数据流处理和批量数据处理两种应用类型。本文介绍下在 Flink 中操作 Iceberg 表。
本文介绍如何采用 Flink SQL 方式操作 Iceberg 表。 如果您希望采用 Flink DataStream API 来访问Apache Iceberg 表,则请参考 Iceberg官网 进行操作。
在 /usr/lib/emr/current/flink/conf/flink-conf.yaml 文件的 checkpoint参数下,添加如下配置:
execution.checkpointing.interval: 10s # checkpoint间隔时间 execution.checkpointing.tolerable-failed-checkpoints: 10 # checkpoint 失败容忍次数
export HADOOP_CLASSPATH=`hadoop classpath` cd /usr/lib/emr/current/flink # 拷贝Iceberg需要的依赖cp /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-*.jar lib cp /usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-*.jar lib # 采用 flink on yarn模式,启动YARN Session ./bin/yarn-session.sh --detached # Start the flink SQL client ./bin/sql-client.sh embedded shell
说明
不同 LAS 版本,Flink 和 Iceberg 的版本号可能不同。建议采用如下命令来定位您的 iceberg-flink-runtime-xx.xx.xx.jar 和 flink-sql-connector-hive-xx.xx.xx.jar 路径。
EMR3.x 和 EMR1.x 版本采用下面命令:
$ ls /usr/lib/emr/current/iceberg/lib/iceberg-flink-runtime-.jar
/usr/lib/emr/current/flink/connectors/flink-sql-connector-hive-3.jar
CREATE CATALOG <catalog_name> WITH ( 'type'='iceberg', '<config_key>'='<config_value>' );
说明
不同 EMR 版本中节点的域名命名方式可能不同,所以下方示例中“las-master-1”可参考 EMR 的域名规则做相应调整。
示例: 创建 HiveCatalog
CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://las-master-1:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive' );
示例:创建 HadoopCatalog
CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://emr-cluster/warehouse/tablespace/managed/hive', 'property-version'='1' );
通过Flink SQL Client检查结果:
Flink SQL> SHOW CATALOGS; +-----------------+ | catalog name | +-----------------+ | default_catalog | | hadoop_catalog | | iceberg | +-----------------+ 3 rows in set
更多 SQL 命令,详情参考 Flink 官网文档。
USE CATALOG iceberg; CREATE DATABASE iceberg_db; USE iceberg_db;
CREATE TABLE iceberg.iceberg_db.iceberg_001 ( id BIGINT COMMENT 'unique id', data STRING ) WITH ('write.format.default'='ORC' );
可以执行下面的命令把执行类型设置为流式处理模式或者批处理模式。
--提交 flink批处理作业来获取iceberg表中的所有行 SET execution.runtime-mode = batch; SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
--从flink流作业中增量获取数据 SET execution.runtime-mode = streaming; SELECT * FROM iceberg.iceberg_db.iceberg_001 limit 10;
说明
INSERT OVERWRITE只适合于batch job。通过下面命令设置batch job执行模式:SET execution.runtime-mode = batch;
INSERT INTO iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');
也可以使用 INSERT OVERWRITE 来使用查询结果替换表中的数据:
INSERT OVERWRITE iceberg.iceberg_db.iceberg_001 VALUES (1, 'a');
DROP TABLE iceberg.iceberg_db.iceberg_001;
说明
删除表,也会将表的数据进行删除。但会留存表的空目录信息,如: