You need to enable JavaScript to run this app.
导航

Spark批式读写Iceberg

最近更新时间2023.11.27 14:22:06

首次发布时间2022.09.22 16:55:46

Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。本文以 Spark 3.x 操作Iceberg表为例,介绍如何通过 Spark API 以批处理的方式读写 Iceberg 表。

1 前提条件

  1. 适合 E-MapReduce(EMR) 1.2.0以后的版本(包括 EMR 1.2.0)

  2. 不适配 EMR2.x 版本。关于 EMR2.x 版本的 Spark 操作 Iceberg 表,请参考 Iceberg基础使用(适用于EMR2.x版本)

  3. 已创建 EMR 集群,且安装有 Iceberg 组件。有两种方式可以安装 Iceberg 组件:

    1. 在创建 EMR 集群时,选择 Icerberg 作为可选组件,详见:创建集群

    2. 对已安装 EMR 集群,参考 服务管理章节 添加 Iceberg 服务

2 操作步骤

  1. 新建 Maven 项目并引入 pom 依赖:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.1</version>
        <scope>provided</scope>
    </dependency>
    

    说明

    Spark 组件和 Iceberg 组件的版本信息,需参考 EMR 服务中该组件对应的版本信息。

  2. 配置 Catalog:

    Spark 3.x写数据到Iceberg表,V1 DataFrame API已不推荐使用,建议采用DataFrameWriterV2 API。以下代码以V2 API写入表名为 sample 的 iceberg 表为例。。
      首先需要配置Catalog,在SparkConf中加入必要配置项即可。

    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.<yourCatalogName>", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.<yourCatalogName>.type", "hadoop")
    sparkConf.set("spark.sql.catalog.<yourCatalogName>.warehouse", "hdfs://emr-master-1:8020/user/hive/warehouse/iceberg/hive")
    

    说明

    • 示例中的<yourCatalogName>为 Catalog 的名称,请根据实际情况修改 Catalog 名称。
    • 示例中 warehouse 填写 HDFS 的路径,需根据实际情况修改。
  3. 创建表:

    val dataFrame = spark.createDataFrame(Seq((1, "ZhangSan", 20))).toDF("id", "name", "age")
    dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").create()
    
  4. 追加数据:

    val dataFrame = spark.createDataFrame(Seq((2, "LiSi", 20))).toDF("id", "name", "age")
    dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").append()
    
  5. 覆盖数据:

    val dataFrame = spark.createDataFrame(Seq((3, "WangWu", 20))).toDF("id", "name", "age")
    dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").overwritePartitions()
    
  6. 查询数据:

    val dataFrame = spark.table("iceberg.iceberg_db.iceberg_001")
    

3 完整示例

本示例是使用Spark DataFrame API批式读写Iceberg表。

  1. 编写Spark代码。

    以Scala版代码为例,代码示例如下。

    package com.bytedance
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object IcebergSparkScalaExample {
      def main(args: Array[String]): Unit = {
        // 配置使用数据湖元数据。
        val sparkConf = new SparkConf()
        sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        sparkConf.set("spark.sql.catalog.iceberg.type", "hadoop")
        sparkConf.set("spark.sql.catalog.iceberg.warehouse", "/warehouse/tablespace/managed/hive")
    
        val spark = SparkSession
          .builder()
          .config(sparkConf)
          .appName("IcebergSparkScalaExample")
          .getOrCreate()
    
        // 从DataFrame中创建或替换Iceberg表
        val df1 = spark.createDataFrame(
            Seq((1, "ZhangSan", 20),
                (2, "LiSi", 25),
                (3, "WangWu", 30))
          )
          .toDF("id", "name", "age")
        df1.writeTo("iceberg.iceberg_db.sample").createOrReplace()
    
        // 读Iceberg表
        spark.table("iceberg.iceberg_db.sample").show()
    
        // 将DataFrame写入Iceberg表
        val df2 = spark.createDataFrame(Seq((4, "LiLei", 28), (5, "XiaoMing", 22)))
          .toDF("id", "name", "age")
        df2.writeTo("iceberg.iceberg_db.sample").append()
    
        // 读Iceberg表
        spark.table("iceberg.iceberg_db.sample").show()
      }
    }
    
  2. 打包程序并部署到EMR集群。

    1. 检查编译Scala代码的Maven插件,可以在pom.xml中配置如下插件。
    <build>
        <plugins>
            <!-- the Maven Scala plugin will compile Scala source files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    1. 执行编译命令
    mvn clean package
    
    1. 将生成的jar包上传到EMR集群上

  1. 通过spark-submit命令运行Spark作业

    spark-submit --class com.bytedance.IcebergSparkScalaExample  iceberg-spark-example-1.0.jar
    

    说明

    class名字和JAR包,需根据自己代码工程修改。上述的iceberg-spark-example-1.0.jar就是根据代码工程打出的JAR包。

运行结果如下

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  4|   LiLei| 28|
|  1|ZhangSan| 20|
|  5|XiaoMing| 22|
|  2|    LiSi| 25|
|  3|  WangWu| 30|
+---+--------+---+