You need to enable JavaScript to run this app.
导航
Spark流式读写 Iceberg
最近更新时间:2025.04.01 20:13:42首次发布时间:2025.03.29 22:16:13
我的收藏
有用
有用
无用
无用

接下来介绍 Spark 3.x 操作 Iceberg 表为例介绍如何通过 Spark Structured Streaming 流式读写 Iceberg 表。

操作步骤

  1. 新建 Maven 项目并引入 pom 依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.1</version>
    <scope>provided</scope>
</dependency>
  1. 流式写入

Spark Structured Streaming 通过 DataStreamWriter 接口流式写数据到 Iceberg 表,代码如下。

val tableIdentifier: String = "iceberg.iceberg_db.streamingtable"
val checkpointPath: String = "/tmp/iceberg_checkpointPath"
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()

说明

代码中的 tableIdentifier 是元数据表名或者表路径。checkpointPath 是 spark 流数据处理程序使用的 checkpoint 地址。流式写入支持以下两种方式:
append:追加每个批次的数据到 Iceberg 表,相当于insert into。
complete:使用最新批次的数据完全覆盖 Iceberg,相当于 insert overwrite。

  1. 流式读取
val df = spark.readStream
    .format("iceberg")

    .option("stream-from-timestamp", Long.toString(streamStartTimestamp))
    .load("database.table_name")

完整示例

本示例上采用 linux 的 netcat 命令发送数据,Spark 接收数据后写入 Iceberg 表中。

  1. 编写 Spark 代码。

以 Scala 版代码为例,代码示例如下。

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object IcebergSparkStreamingScalaExample {
  def main(args: Array[String]): Unit = {
    // 配置使用数据湖元数据。
    val sparkConf = new SparkConf()
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergSparkStreamingScalaExample")
      .getOrCreate()
    import spark.implicits._
    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    val checkpointPath = "/tmp/iceberg_checkpointPath"
    // 流式写入Iceberg表
    val query = words.toDF().writeStream
      .format("iceberg")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .option("checkpointLocation", checkpointPath)
      .option("path", "iceberg.iceberg_db.streamingtable")
      .start()
    query.awaitTermination()
  }
}
  1. 打包程序并部署到LAS集群。
    1. 检查编译Scala代码的Maven插件,可以在pom.xml中配置如下插件。
<build>
    <plugins>
        <!-- the Maven Scala plugin will compile Scala source files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
mvn clean package
  1. 通过 Spark SQL 创建测试使用的数据库 iceberg_db 和表 streamingtable,详细操作请参见 基础使用。也可以直接输入下面命令:
spark-sql --master yarn \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg.type=hadoop \
  --conf spark.sql.catalog.iceberg.warehouse=/warehouse/tablespace/managed/hive

接下来,在 Spark SQL 控制台中执行下面的sql语句:

CREATE DATABASE IF NOT EXISTS iceberg_db;
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.streamingtable(value STRING) USING iceberg;
  1. 通过 Linux 的 netcat 命令准备一些数据
netcat -lk -p 9999

并输入一些字符串。

  1. 通过spark-submit命令运行Spark作业
spark-submit --class com.bytedance.IcebergSparkStreamingScalaExample iceberg-spark-example-1.0.jar

说明

class名字和JAR包,需根据自己代码工程修改。上述的iceberg-spark-example-1.0.jar就是根据代码工程打出的JAR包。

  1. 通过 Spark SQL 查看Iceberg表的数据运行结果如下:
spark-sql --master yarn \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg.type=hadoop \
  --conf spark.sql.catalog.iceberg.warehouse=/warehouse/tablespace/managed/hive

在 Spark SQL 控制台中执行下面的sql语句:

SELECT * FROM iceberg.iceberg_db.streamingtable;