You need to enable JavaScript to run this app.
湖仓一体分析服务 LAS 私有化

湖仓一体分析服务 LAS 私有化

复制全文
高阶使用
Spark批式读写Iceberg
复制全文
Spark批式读写Iceberg

Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。本文以 Spark 3.x 操作Iceberg表为例,介绍如何通过 Spark API 以批处理的方式读写 Iceberg 表。

操作步骤

  1. 新建 Maven 项目并引入 pom 依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.1</version>
    <scope>provided</scope>
</dependency>

说明

Spark 组件和 Iceberg 组件的版本信息,需参考 EMR 服务中该组件对应的版本信息。

  1. 配置 Catalog:

Spark 3.x 写数据到 Iceberg 表,V1 DataFrame API 已不推荐使用,建议采用 DataFrameWriterV2 API。以下代码以 V2 API 写入表名为 sample 的 iceberg 表为例。
首先需要配置 Catalog,在 SparkConf 中加入必要配置项即可。

sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.<yourCatalogName>", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.<yourCatalogName>.type", "hadoop")
sparkConf.set("spark.sql.catalog.<yourCatalogName>.warehouse", "hdfs://las-master-1:8020/user/hive/warehouse/iceberg/hive")

说明

  • 示例中的<yourCatalogName>为 Catalog 的名称,请根据实际情况修改 Catalog 名称。
  • 示例中 warehouse 填写 HDFS 的路径,需根据实际情况修改。
  1. 创建表:
val dataFrame = spark.createDataFrame(Seq((1, "ZhangSan", 20))).toDF("id", "name", "age")
dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").create()
  1. 追加数据:
val dataFrame = spark.createDataFrame(Seq((2, "LiSi", 20))).toDF("id", "name", "age")
dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").append()
  1. 覆盖数据:
val dataFrame = spark.createDataFrame(Seq((3, "WangWu", 20))).toDF("id", "name", "age")
dataFrame.writeTo("iceberg.iceberg_db.iceberg_001").overwritePartitions()
  1. 查询数据:
val dataFrame = spark.table("iceberg.iceberg_db.iceberg_001")

完整示例

本示例是使用 Spark DataFrame API 批式读写 Iceberg 表。

  1. 编写 Spark 代码。

以 Scala 版代码为例,代码示例如下。

package com.bytedance
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object IcebergSparkScalaExample {
  def main(args: Array[String]): Unit = {
    // 配置使用数据湖元数据。
    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.iceberg.type", "hadoop")
    sparkConf.set("spark.sql.catalog.iceberg.warehouse", "/warehouse/tablespace/managed/hive")
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergSparkScalaExample")
      .getOrCreate()
    // 从DataFrame中创建或替换Iceberg表
    val df1 = spark.createDataFrame(
        Seq((1, "ZhangSan", 20),
            (2, "LiSi", 25),
            (3, "WangWu", 30))
      )
      .toDF("id", "name", "age")
    df1.writeTo("iceberg.iceberg_db.sample").createOrReplace()
    // 读Iceberg表
    spark.table("iceberg.iceberg_db.sample").show()
    // 将DataFrame写入Iceberg表
    val df2 = spark.createDataFrame(Seq((4, "LiLei", 28), (5, "XiaoMing", 22)))
      .toDF("id", "name", "age")
    df2.writeTo("iceberg.iceberg_db.sample").append()
    // 读Iceberg表
    spark.table("iceberg.iceberg_db.sample").show()
  }
}
  1. 打包程序并部署到 EMR 集群。
    1. 检查编译 Scala 代码的 Maven 插件,可以在 pom.xml 中配置如下插件。
<build>
    <plugins>
        <!-- the Maven Scala plugin will compile Scala source files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
mvn clean package
  1. 通过spark-submit命令运行Spark作业。
spark-submit --class com.bytedance.IcebergSparkScalaExample  iceberg-spark-example-1.0.jar

说明

class 名字和 JAR 包,需根据自己代码工程修改。上述的 iceberg-spark-example-1.0.jar 就是根据代码工程打出的 JA R包。

运行结果如下:

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  4|   LiLei| 28|
|  1|ZhangSan| 20|
|  5|XiaoMing| 22|
|  2|    LiSi| 25|
|  3|  WangWu| 30|
+---+--------+---+
最近更新时间:2025.04.01 20:13:42
这个页面对您有帮助吗?
有用
有用
无用
无用