Spark流式读写 Iceberg(适用于EMR 2.x版本)
最近更新时间:2023.02.22 17:29:37首次发布时间:2022.09.22 16:55:46

本文以 Spark 2.x 操作 Iceberg 表为例介绍如何通过 Spark Structured Streaming 流式读写 Iceberg 表。

1 前提条件

  1. 适合 E-MapReduce(EMR) 2.x 的版本

  2. 已创建 EMR 集群,且安装有 Iceberg 组件。有两种方式可以安装 Iceberg 组件:

    1. 在创建 EMR 集群时,选择 Icerberg 作为可选组件,详见:创建集群

    2. 对已安装 EMR 集群,参考 服务管理章节 添加 Iceberg 服务。

2 操作步骤

  1. 新建 Maven 项目并引入 pom依赖:
  1. 流式写入

Spark Structured Streaming 通过 DataStreamWriter 接口流式写数据到 Iceberg 表,代码如下。

val name = TableIdentifier.of("default","spark2_streaming_demo")
val tableIdentifier = name.toString
val checkpointPath: String = "/tmp/iceberg_checkpointPath"
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)


代码中的 tableIdentifier 是元数据表名或者表路径。checkpointPath 是 spark 流数据处理程序使用的checkpoint地址。流式写入支持以下两种方式:

  • append:追加每个批次的数据到Iceberg表,相当于insert into。

  • complete:使用最新批次的数据完全覆盖Iceberg,相当于insert overwrite。

  1. 流式读取
val df = spark.readStream
    .option("stream-from-timestamp", Long.toString(streamStartTimestamp))

3 完整示例

本示例上采用 linux 的 netcat 命令发送数据,Spark 接收数据后写入 Iceberg 表中。

  1. 编写Spark代码。


import org.apache.iceberg.Schema
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object IcebergSpark2StreamingScalaExample {
  def main(args: Array[String]): Unit = {
    // 配置使用数据湖元数据。
    val sparkConf = new SparkConf()
    val spark = SparkSession
    import spark.implicits._

    val name = TableIdentifier.of("default","spark2_streaming_demo")
    val tableName = name.toString
    val warehouseLocation = "/warehouse/tablespace/managed/hive"
    val catalog = new HiveCatalog()
    val properties = new util.HashMap[String, String]
    properties.put("warehouse", "/user/hive/warehouse/iceberg/hive")
    properties.put("uri", "thrift://emr-master-1:9083")
    catalog.initialize("hive", properties)
    val schema = new Schema(
      Types.NestedField.optional(1, "value", Types.StringType.get()))
    try {
      // 创建 Iceberg 表
      catalog.createTable(name, schema)
    } catch {
      case _: org.apache.iceberg.exceptions.AlreadyExistsException =>

    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    val lines = spark.readStream
      .option("host", "localhost")
      .option("port", 9999)

    // Split the lines into words
    val words =[String].flatMap(_.split(" "))

    val checkpointPath = "/tmp/iceberg_checkpointPath"
    // 流式写入Iceberg表
    val query = words.toDF().writeStream
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .option("checkpointLocation", checkpointPath)
      .option("path", tableName)

  1. 打包程序并部署到EMR集群。

    1. 检查编译Scala代码的Maven插件,可以在pom.xml中配置如下插件。
            <!-- the Maven Scala plugin will compile Scala source files -->
    1. 执行编译命令
    mvn clean package
    1. 将生成的jar包上传到EMR集群上
  2. 通过 Linux 的 netcat 命令准备一些数据

    netcat -lk -p 9999


  3. 通过 spark-submit 命令运行 Spark 作业

    spark-submit --class com.bytedance.IcebergSpark2StreamingScalaExample iceberg-spark2-example-1.0.jar


    class 名字和 JAR 包,需根据自己代码工程修改。上述的 iceberg-spark2-example-1.0.jar 就是根据代码工程打出的JAR包。

  4. 通过 spark-shell 查看 Iceberg 表的数据运行结果如下

    spark-shell --master yarn

    在 spark-shell 控制台中执行下面的代码:

    import org.apache.iceberg.catalog.TableIdentifier
    val name = TableIdentifier.of("default","spark2_streaming_demo")
    val tableName = name.toString"iceberg").load(tableName).show()
