You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

基于Spark Structured Streaming 2.3的无水印双流Join实现咨询

基于Spark Structured Streaming 2.3的双流Socket Join实现(无水印)

我最近用Spark Structured Streaming 2.3完成了一个基于id字段的双流关联逻辑,数据源采用Socket流,而且暂时没有使用水印机制。下面是具体的实现细节和代码:

实现逻辑

  • 从两个不同的Socket端口读取流式数据
  • 解析每条输入的逗号分隔字符串,提取出name/detailid、时间戳字段
  • id为关联键,将两个流进行Inner Join
  • 把关联后的结果输出到控制台

完整代码实现

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, TimestampType}
import org.apache.spark.sql.functions._

// 初始化SparkSession
val spark = SparkSession.builder()
  .appName("SocketStreamJoin")
  .master("local[*]") // 本地调试用,生产环境请移除
  .getOrCreate()

import spark.implicits._

// 第一个Socket流的读取与解析
val df1 = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "5431")
  .load()

val parsedStream1 = df1.as[String]
  .map(_.split(","))
  .select(
    $"value"(0).as("name"),
    $"value"(1).cast(IntegerType).as("id"),
    $"value"(2).cast(TimestampType).as("ts")
  )

// 第二个Socket流的读取与解析
val df1_1 = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "5432") // 用不同端口区分第二个流
  .load()

val parsedStream2 = df1_1.as[String]
  .map(_.split(","))
  .select(
    $"value"(0).as("detail"),
    $"value"(1).cast(IntegerType).as("id"),
    $"value"(2).cast(TimestampType).as("event_ts")
  )

// 基于id字段执行双流Inner Join
val joinedStream = parsedStream1.join(parsedStream2, Seq("id"), "inner")

// 配置输出并启动流查询
val query = joinedStream.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()

注意事项

  • 测试时需要在本地启动两个Socket服务(比如用nc -lk 5431nc -lk 5432),输入格式为逗号分隔的字符串,例如:
    • 第一个流输入:Alice,1,2024-05-20 10:00:00
    • 第二个流输入:PremiumUser,1,2024-05-20 10:00:01
  • 由于未使用水印机制,Spark会持续维护所有历史数据的关联状态,长时间运行可能导致内存占用过高。如果是生产环境,建议根据业务的事件时间窗口添加水印(比如withWatermark)来自动清理过期状态,避免OOM问题。

内容的提问来源于stack exchange,提问作者Tyrion Lannister

火山引擎 最新活动