You need to enable JavaScript to run this app.
湖仓一体分析服务 LAS 私有化

湖仓一体分析服务 LAS 私有化

复制全文
高阶使用
Spark流式读写 Iceberg
复制全文
Spark流式读写 Iceberg

接下来介绍 Spark 3.x 操作 Iceberg 表为例介绍如何通过 Spark Structured Streaming 流式读写 Iceberg 表。

操作步骤

  1. 新建 Maven 项目并引入 pom 依赖:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.1</version>
    <scope>provided</scope>
</dependency>
  1. 流式写入

Spark Structured Streaming 通过 DataStreamWriter 接口流式写数据到 Iceberg 表,代码如下。

val tableIdentifier: String = "iceberg.iceberg_db.streamingtable"
val checkpointPath: String = "/tmp/iceberg_checkpointPath"
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()

说明

代码中的 tableIdentifier 是元数据表名或者表路径。checkpointPath 是 spark 流数据处理程序使用的 checkpoint 地址。流式写入支持以下两种方式:
append:追加每个批次的数据到 Iceberg 表,相当于insert into。
complete:使用最新批次的数据完全覆盖 Iceberg,相当于 insert overwrite。

  1. 流式读取
val df = spark.readStream
    .format("iceberg")

    .option("stream-from-timestamp", Long.toString(streamStartTimestamp))
    .load("database.table_name")

完整示例

本示例上采用 linux 的 netcat 命令发送数据,Spark 接收数据后写入 Iceberg 表中。

  1. 编写 Spark 代码。

以 Scala 版代码为例,代码示例如下。

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object IcebergSparkStreamingScalaExample {
  def main(args: Array[String]): Unit = {
    // 配置使用数据湖元数据。
    val sparkConf = new SparkConf()
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergSparkStreamingScalaExample")
      .getOrCreate()
    import spark.implicits._
    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))
    val checkpointPath = "/tmp/iceberg_checkpointPath"
    // 流式写入Iceberg表
    val query = words.toDF().writeStream
      .format("iceberg")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .option("checkpointLocation", checkpointPath)
      .option("path", "iceberg.iceberg_db.streamingtable")
      .start()
    query.awaitTermination()
  }
}
  1. 打包程序并部署到LAS集群。
    1. 检查编译Scala代码的Maven插件,可以在pom.xml中配置如下插件。
<build>
    <plugins>
        <!-- the Maven Scala plugin will compile Scala source files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
mvn clean package
  1. 通过 Spark SQL 创建测试使用的数据库 iceberg_db 和表 streamingtable,详细操作请参见 基础使用。也可以直接输入下面命令:
spark-sql --master yarn \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg.type=hadoop \
  --conf spark.sql.catalog.iceberg.warehouse=/warehouse/tablespace/managed/hive

接下来,在 Spark SQL 控制台中执行下面的sql语句:

CREATE DATABASE IF NOT EXISTS iceberg_db;
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.streamingtable(value STRING) USING iceberg;
  1. 通过 Linux 的 netcat 命令准备一些数据
netcat -lk -p 9999

并输入一些字符串。

  1. 通过spark-submit命令运行Spark作业
spark-submit --class com.bytedance.IcebergSparkStreamingScalaExample iceberg-spark-example-1.0.jar

说明

class名字和JAR包,需根据自己代码工程修改。上述的iceberg-spark-example-1.0.jar就是根据代码工程打出的JAR包。

  1. 通过 Spark SQL 查看Iceberg表的数据运行结果如下:
spark-sql --master yarn \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg.type=hadoop \
  --conf spark.sql.catalog.iceberg.warehouse=/warehouse/tablespace/managed/hive

在 Spark SQL 控制台中执行下面的sql语句:

SELECT * FROM iceberg.iceberg_db.streamingtable;
最近更新时间:2025.04.01 20:13:42
这个页面对您有帮助吗?
有用
有用
无用
无用