本文介绍在 E-MapReduce(EMR) 集群,通过 Spark SQL 对 Iceberg 表进行创建等操作。
spark-sql \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.iceberg.type=hive \ --conf spark.sql.catalog.icenerg.uri=thrift://master-1-1:9083
说明
本文示例中的 iceberg 为您创建的 Catalog 名称。为保证操作的数据库和表都是在指定的 Catalog 下,您需要在命令中的数据库和表名前带上 iceberg 。例如:
以上命令示例中配置项的描述如下,也可参考 Iceberg高阶使用的参数配置章节获得更多配置项解释:
hive.metastore.uris参数对应的值。若在EMR集群上运行,参数spark.sql.catalog.hive.uri可以省略。执行以下命令,就可以进入 spark-sql 的命令行
spark-sql>
说明
如果想在 Trino/Presto/Hive 中可以操作 Spark 创建 Iceberg 表,需要进入集群详情 > 服务列表 > HDFS > 服务参数界面,在 HDFS 的服务参数 hdfs-site 中添加iceberg.engine.hive.enabled=true配置,并重启 HDFS 和 Hive 组件,然后在 Spark 中创建 Iceberg 表,Hive 中采用操作该表。
创建库
CREATE DATABASE IF NOT EXISTS iceberg.iceberg_db;
创建表
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.spark_pokes( rank BIGINT, suite STRING ) USING iceberg;
这里,建表语句支持 COMMENT、PARTITIONED BY、TBLPROPERTIES 等语法,详细配置参考官网文档。
例如,使用 PARTITIONED BY 设置分区属性示例如下:
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.spark_pokes( rank BIGINT, suite STRING ) USING iceberg PARTITIONED BY (suite);
写入数据
INSERT INTO iceberg.iceberg_db.spark_pokes VALUES (1, 'name1'), (2, 'name2');
查询数据
SELECT * FROM iceberg.iceberg_db.spark_pokes;
更新数据
UPDATE iceberg.iceberg_db.spark_pokes SET suite = 'volcano' WHERE rank = 2;
删除数据
DELETE FROM iceberg.iceberg_db.spark_pokes WHERE rank = 2;
删除表
DROP TABLE iceberg.iceberg_db.spark_pokes;
说明
DROP TABEL方式删除表,只是从 Hive Metastore 中删除表的元数据,表的数据还是会保留。