
Iceberg Basic Usage (for EMR 2.x)

Last updated: 2022.10.19 16:10:14

First published: 2022.09.22 16:55:46


This document describes how to create and operate on Iceberg tables with the Spark DataFrame API in an E-MapReduce (EMR) 2.x cluster.

1 Prerequisites

The Iceberg component must be installed on the EMR cluster. There are two ways to install it:

  • When creating an E-MapReduce cluster, select Iceberg as an optional component. For details, see Creating a Cluster.

  • For an existing E-MapReduce cluster, add the Iceberg service by following the Service Management section.

2 Procedure

  1. Create a new Maven project and add the following pom dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.8</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <!-- Use the Spark 2.4 runtime: the 3.2_2.12 runtime targets Spark 3.2/Scala 2.12
         and is incompatible with the Spark 2.4.8/Scala 2.11 used by EMR 2.x -->
    <artifactId>iceberg-spark-runtime-2.4</artifactId>
    <version>0.14.0</version>
    <scope>compile</scope>
</dependency>

Note

For the exact Spark and Iceberg versions to use, refer to the version information of the corresponding components in your EMR service. Since the cluster already provides Spark, the spark-sql dependency can usually be set to provided scope when the job is submitted with spark-submit.
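
If you are not sure which Spark and Scala versions the cluster runs, one quick check (assuming shell access to a cluster node) is:

spark-submit --version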

  2. Create a table:
import java.util

import org.apache.iceberg.hive.HiveCatalog

val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new util.HashMap[String, String]
properties.put("warehouse", "/user/hive/warehouse/iceberg/hive")
properties.put("uri", "thrift://emr-master-1:9083")
catalog.initialize("hive", properties)
// name, schema, and spec are built as in the complete example in section 3
catalog.createTable(name, schema, spec)

Note

Fill in the properties according to your EMR cluster: "warehouse" is the path where table data is stored, and "uri" is the address of the Hive Metastore.
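
Instead of hard-coding the Metastore address, you can also read it from the cluster's Hive configuration. A minimal sketch, assuming hive-site.xml is on the application classpath so that hive.metastore.uris is visible in the Hadoop configuration:

// Resolve the Hive Metastore URI from the loaded Hadoop/Hive configuration
val metastoreUri = spark.sparkContext.hadoopConfiguration.get("hive.metastore.uris")
properties.put("uri", metastoreUri)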

  3. Append data:

    val dataFrame = spark.createDataFrame(Seq((2, "LiSi", 20))).toDF("id", "name", "age")
    dataFrame.write.format("iceberg").mode("append").save("db.table")
    
  4. Overwrite data (for a partitioned table, Iceberg's overwrite mode replaces the matching partitions dynamically):

    val dataFrame = spark.createDataFrame(Seq((3, "WangWu", 20))).toDF("id", "name", "age")
    dataFrame.write.format("iceberg").mode("overwrite").save("db.table")
    
  5. Query data:

    // Named metastore table
    val dataFrame = spark.read.format("iceberg").load("db.table")
    // Hadoop path-based table: pass the table's location instead of its name
    spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
    

    To run SQL SELECT statements on Iceberg tables in Spark 2.4, register the DataFrame as a temporary view:

    val df = spark.read.format("iceberg").load("db.table")
    df.createOrReplaceTempView("table")
    spark.sql("""select count(1) from table""").show()
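
    Iceberg reads also support time travel through the snapshot-id and as-of-timestamp options. A sketch with placeholder values (look up real snapshot IDs in the table's metadata):

    // Read the table as of a specific snapshot ID
    spark.read.format("iceberg").option("snapshot-id", 10963874102873L).load("db.table")
    // Read the table as of a point in time (milliseconds since the epoch)
    spark.read.format("iceberg").option("as-of-timestamp", "499162860000").load("db.table")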
    

3 Complete Example

This example uses the Spark DataFrame API to read and write an Iceberg table in batch mode.

  1. Write the Spark code.

    The Scala version is shown below.

package com.bytedance

import java.util

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

object IcebergSpark2ScalaExample {
  def main(args: Array[String]): Unit = {
    // Create the Spark session (put any extra configuration on sparkConf as needed)
    val sparkConf = new SparkConf()
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergSparkScalaExample")
      .getOrCreate()

    val name = TableIdentifier.of("default", "spark2_demo")
    val tableName = name.toString
    val catalog = new HiveCatalog()
    catalog.setConf(spark.sparkContext.hadoopConfiguration)
    
    val properties = new util.HashMap[String, String]
    properties.put("warehouse", "/user/hive/warehouse/iceberg/hive")
    properties.put("uri", "thrift://emr-master-1:9083")
    catalog.initialize("hive", properties)
   
    // Table schema: id and age are required, name is optional
    val schema = new Schema(
      Types.NestedField.required(1, "id", Types.IntegerType.get()),
      Types.NestedField.optional(2, "name", Types.StringType.get()),
      Types.NestedField.required(3, "age", Types.IntegerType.get()))
    // Partition by hashing id into 2 buckets
    val spec = PartitionSpec.builderFor(schema).bucket("id", 2).build()
    try {
      // Create the Iceberg table
      catalog.createTable(name, schema, spec)
    } catch {
      // The table already exists; nothing to do
      case e: org.apache.iceberg.exceptions.AlreadyExistsException =>
    }

    // Create or replace the table contents from a DataFrame
    val df1 = spark.createDataFrame(Seq((1, "ZhangSan", 20), (2, "LiSi", 25), (3, "WangWu", 30)))
      .toDF("id", "name", "age")
    df1.write.format("iceberg").mode("overwrite").save(tableName)
    // Read the Iceberg table
    spark.read.format("iceberg").load(tableName).show()

    // Append a DataFrame to the Iceberg table
    val df2 = spark.createDataFrame(Seq((4, "LiLei", 28), (5, "XiaoMing", 22)))
      .toDF("id", "name", "age")
    df2.write.format("iceberg").mode("append").save(tableName)
    // Read the Iceberg table again
    spark.read.format("iceberg").load(tableName).show()
  }
}
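
To verify the writes, you can inspect the table's snapshot history through Iceberg's metadata tables. A sketch, assuming the metadata-table suffix syntax of the Iceberg Spark source (append the metadata table name to the table identifier):

// Each overwrite/append above should show up as a snapshot
spark.read.format("iceberg").load("default.spark2_demo.snapshots").show()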
  2. Package the program and deploy it to the EMR cluster.

    1. Check that a Maven plugin for compiling Scala code is configured; for example, add the following plugin to pom.xml.
    <build>
        <plugins>
            <!-- the Maven Scala plugin will compile Scala source files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    2. Run the build command:
    mvn clean package
    
    3. Upload the generated JAR to the EMR cluster, as sketched below.
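
    A sketch using scp (the host name and target directory are placeholders for your cluster's master node):

    scp target/iceberg-spark2-example-1.0.jar root@emr-master-1:/root/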

  3. Run the Spark job with spark-submit:

    spark-submit --class com.bytedance.IcebergSpark2ScalaExample iceberg-spark2-example-1.0.jar
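
    If the Iceberg runtime is not already on the cluster's Spark classpath and is not shaded into your application JAR, you can pass it explicitly with --jars. A sketch; the runtime JAR path is a placeholder:

    spark-submit --class com.bytedance.IcebergSpark2ScalaExample \
      --jars /path/to/iceberg-spark-runtime-2.4-0.14.0.jar \
      iceberg-spark2-example-1.0.jar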
    

Note

Change the class name and JAR file to match your own project; iceberg-spark2-example-1.0.jar above is the JAR built from this example project.

The output looks like the following:

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  4|   LiLei| 28|
|  1|ZhangSan| 20|
|  5|XiaoMing| 22|
|  2|    LiSi| 25|
|  3|  WangWu| 30|
+---+--------+---+