Spark 2.3.0结构化流进程的正确退出方式咨询
Spark 2.3.0 结构化流进程的正确退出方式
嘿,结合你提到的《Spark权威指南》里的JSON流示例(读取文件流、检查内存表5次后退出),我来分享Spark 2.3.0里几种靠谱的结构化流进程退出方法:
1. 针对示例场景:基于查询次数主动停止
你的需求是检查内存表5次后退出,这种场景下可以在启动流查询后,通过循环查询内存表并计数,达到次数后主动停止流进程:
// 先完成流查询的定义和启动 val streaming = spark.readStream .schema(dataSchema) .option("maxFilesPerTrigger", 1) .json("/data/activity-data/") val activityCounts = streaming.groupBy("gt").count() val activityQuery = activityCounts.writeStream .queryName("activity_counts") .option("checkpointLocation", "/path/to/your/checkpoint") // 务必设置checkpoint .format("memory") // 对应示例的内存表输出 .start() // 循环检查内存表,5次后停止 var checkCount = 0 while (checkCount < 5) { // 查询并展示内存表数据 spark.sql("SELECT * FROM activity_counts").show() // 等待2秒让流处理下一批文件 Thread.sleep(2000) checkCount += 1 } // 主动停止流查询 activityQuery.stop() // 阻塞直到查询完全终止,避免资源泄漏 activityQuery.awaitTermination() // 最后关闭SparkSession释放所有资源 spark.stop()
这里的核心是stop()方法,它会触发流的优雅停止:完成当前批次的处理、关闭数据源和输出端、更新checkpoint,之后awaitTermination()会确保进程完全终止后再继续执行后续代码。
2. 通用场景:基于超时自动停止
如果你的流需要运行固定时长后退出,可以用awaitTermination(timeoutMs)设置超时时间,超时后手动停止:
activityQuery.start() // 让流运行10分钟(单位:毫秒) val isTimeout = activityQuery.awaitTermination(10 * 60 * 1000) if (isTimeout) { // 超时后主动停止流 activityQuery.stop() spark.stop() }
注意:awaitTermination(timeoutMs)在超时后会返回false,这时候必须手动调用stop()来确保流进程完全退出。
3. 生产环境:基于外部信号优雅终止
在生产环境中,通常会通过外部信号(比如SIGTERM)来终止进程。Spark 2.3.0的结构化流支持优雅处理这类信号:
- 当收到
SIGTERM时,流会先完成当前正在处理的批次,然后关闭所有数据源、输出连接,最后更新checkpoint并退出。 - 一定要避免使用
SIGKILL(强制杀死进程),这会导致未完成的数据丢失,甚至损坏checkpoint,影响后续重启。
重要注意事项
- 必须设置checkpointLocation:这不仅是容错的关键,也能确保流停止后重启时,从上次处理的位置继续消费,避免重复或丢失数据。
- 停止流后记得调用
spark.stop():彻底关闭SparkSession,释放集群中的Executor资源。
内容的提问来源于stack exchange,提问作者Eddie Ho




