基于Spark Structured Streaming 2.3的无水印双流Join实现咨询
基于Spark Structured Streaming 2.3的双流Socket Join实现(无水印)
我最近用Spark Structured Streaming 2.3完成了一个基于id字段的双流关联逻辑,数据源采用Socket流,而且暂时没有使用水印机制。下面是具体的实现细节和代码:
实现逻辑
- 从两个不同的Socket端口读取流式数据
- 解析每条输入的逗号分隔字符串,提取出
name/detail、id、时间戳字段 - 以
id为关联键,将两个流进行Inner Join - 把关联后的结果输出到控制台
完整代码实现
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{IntegerType, TimestampType} import org.apache.spark.sql.functions._ // 初始化SparkSession val spark = SparkSession.builder() .appName("SocketStreamJoin") .master("local[*]") // 本地调试用,生产环境请移除 .getOrCreate() import spark.implicits._ // 第一个Socket流的读取与解析 val df1 = spark.readStream .format("socket") .option("host", "localhost") .option("port", "5431") .load() val parsedStream1 = df1.as[String] .map(_.split(",")) .select( $"value"(0).as("name"), $"value"(1).cast(IntegerType).as("id"), $"value"(2).cast(TimestampType).as("ts") ) // 第二个Socket流的读取与解析 val df1_1 = spark.readStream .format("socket") .option("host", "localhost") .option("port", "5432") // 用不同端口区分第二个流 .load() val parsedStream2 = df1_1.as[String] .map(_.split(",")) .select( $"value"(0).as("detail"), $"value"(1).cast(IntegerType).as("id"), $"value"(2).cast(TimestampType).as("event_ts") ) // 基于id字段执行双流Inner Join val joinedStream = parsedStream1.join(parsedStream2, Seq("id"), "inner") // 配置输出并启动流查询 val query = joinedStream.writeStream .outputMode("append") .format("console") .option("truncate", "false") .start() query.awaitTermination()
注意事项
- 测试时需要在本地启动两个Socket服务(比如用
nc -lk 5431和nc -lk 5432),输入格式为逗号分隔的字符串,例如:- 第一个流输入:
Alice,1,2024-05-20 10:00:00 - 第二个流输入:
PremiumUser,1,2024-05-20 10:00:01
- 第一个流输入:
- 由于未使用水印机制,Spark会持续维护所有历史数据的关联状态,长时间运行可能导致内存占用过高。如果是生产环境,建议根据业务的事件时间窗口添加水印(比如
withWatermark)来自动清理过期状态,避免OOM问题。
内容的提问来源于stack exchange,提问作者Tyrion Lannister




