You need to enable JavaScript to run this app.
导航
Spark批式读写Iceberg
最近更新时间:2025.04.01 20:13:42首次发布时间:2025.03.29 22:25:48
我的收藏
有用
有用
无用
无用

Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。本文以 Spark 3.x 操作Iceberg表为例,介绍如何通过 Spark API 以批处理的方式读写 Iceberg 表。

操作步骤

  1. 新建 Maven 项目并引入 pom 依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.1</version>
    <scope>provided</scope>
</dependency>

说明

Spark 组件和 Iceberg 组件的版本信息,需参考 EMR 服务中该组件对应的版本信息。

  1. 配置 Catalog:

Spark 3.x 写数据到 Iceberg 表,V1 DataFrame API 已不推荐使用,建议采用 DataFrameWriterV2 API。以下代码以 V2 API 写入表名为 sample 的 iceberg 表为例。
首先需要配置 Catalog,在 SparkConf 中加入必要配置项即可。

sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.<yourCatalogName>", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.<yourCatalogName>.type", "hadoop")
sparkConf.set("spark.sql.catalog.<yourCatalogName>.warehouse", "hdfs://las-master-1:8020/user/hive/warehouse/iceberg/hive")

说明

  • 示例中的<yourCatalogName>为 Catalog 的名称,请根据实际情况修改 Catalog 名称。
  • 示例中 warehouse 填写 HDFS 的路径,需根据实际情况修改。
  1. 创建表:
val dataFrame = spark.createDataFrame(Seq((1, "ZhangSan", 20))).toDF("id", "name", "age")
dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").create()
  1. 追加数据:
val dataFrame = spark.createDataFrame(Seq((2, "LiSi", 20))).toDF("id", "name", "age")
dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").append()
  1. 覆盖数据:
val dataFrame = spark.createDataFrame(Seq((3, "WangWu", 20))).toDF("id", "name", "age")
dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").overwritePartitions()
  1. 查询数据:
val dataFrame = spark.table("iceberg.iceberg_db.iceberg_001")

完整示例

本示例是使用 Spark DataFrame API 批式读写 Iceberg 表。

  1. 编写 Spark 代码。

以 Scala 版代码为例,代码示例如下。

package com.bytedance
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object IcebergSparkScalaExample {
  def main(args: Array[String]): Unit = {
    // 配置使用数据湖元数据。
    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.iceberg.type", "hadoop")
    sparkConf.set("spark.sql.catalog.iceberg.warehouse", "/warehouse/tablespace/managed/hive")
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergSparkScalaExample")
      .getOrCreate()
    // 从DataFrame中创建或替换Iceberg表
    val df1 = spark.createDataFrame(
        Seq((1, "ZhangSan", 20),
            (2, "LiSi", 25),
            (3, "WangWu", 30))
      )
      .toDF("id", "name", "age")
    df1.writeTo("iceberg.iceberg_db.sample").createOrReplace()
    // 读Iceberg表
    spark.table("iceberg.iceberg_db.sample").show()
    // 将DataFrame写入Iceberg表
    val df2 = spark.createDataFrame(Seq((4, "LiLei", 28), (5, "XiaoMing", 22)))
      .toDF("id", "name", "age")
    df2.writeTo("iceberg.iceberg_db.sample").append()
    // 读Iceberg表
    spark.table("iceberg.iceberg_db.sample").show()
  }
}
  1. 打包程序并部署到 EMR 集群。
    1. 检查编译 Scala 代码的 Maven 插件,可以在 pom.xml 中配置如下插件。
<build>
    <plugins>
        <!-- the Maven Scala plugin will compile Scala source files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
mvn clean package
  1. 通过spark-submit命令运行Spark作业。
spark-submit --class com.bytedance.IcebergSparkScalaExample  iceberg-spark-example-1.0.jar

说明

class 名字和 JAR 包,需根据自己代码工程修改。上述的 iceberg-spark-example-1.0.jar 就是根据代码工程打出的 JA R包。

运行结果如下:

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  4|   LiLei| 28|
|  1|ZhangSan| 20|
|  5|XiaoMing| 22|
|  2|    LiSi| 25|
|  3|  WangWu| 30|
+---+--------+---+