You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

SparkAppHandle Listener未触发:Spark 2.3提交K8s集群异常

解决Spark 2.3 on K8s任务提交后Listener未触发及AppId获取问题

我之前在做Spark on K8s的任务提交时也踩过类似的坑,结合你的情况,给你梳理几个关键的解决方案:

一、为什么stateChangedinfoChanged没触发?

这大概率是监听逻辑的部署位置不对,或者客户端生命周期的问题:

  1. 集群模式下,客户端Listener无法接收Driver端事件
    你用cluster部署模式时,Spark Driver是运行在K8s的Pod里的,而本地提交程序里的SparkAppHandle.Listener是跑在提交客户端进程中。客户端在提交完任务后如果直接退出,就接收不到后续的状态变更事件;就算客户端保持运行,K8s模式下Spark Launcher的客户端监听依赖于K8s API的状态同步,也可能存在延迟或者权限问题。

  2. Listener注册方式错误
    如果你的Listener是想监听Driver端的任务运行状态(比如应用启动、结束、Executor变化),应该把Listener打包到Driver的Jar包中,通过Spark配置指定在Driver端加载:

    spark-submit \
      --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
      --deploy-mode cluster \
      --conf spark.extraListeners=com.your.package.YourCustomListener \
      # 其他配置...
    

    这样Listener会运行在Driver进程内,能准确捕获所有Spark事件。

  3. 方法名是否正确?
    注意SparkListener的标准方法名是onApplicationStateChange,而不是stateChanged——如果你是自己实现Listener,别把方法名写错了!比如正确的重写应该是:

    import org.apache.spark.scheduler.SparkListener
    import org.apache.spark.scheduler.SparkListenerApplicationState
    
    class YourCustomListener extends SparkListener {
      override def onApplicationStateChange(state: SparkListenerApplicationState): Unit = {
        println(s"Driver端捕获到应用状态变更: ${state.state}")
      }
    
      override def onApplicationStart(startEvent: SparkListenerApplicationStart): Unit = {
        println(s"应用启动,AppId: ${startEvent.appId}")
      }
    }
    

二、如何获取handle.getAppId

在K8s集群模式下,AppId需要等Driver Pod启动并完成Spark初始化后才会生成,所以不能提交后立即获取,得通过监听或者轮询的方式:

  1. 利用SparkAppHandle.ListenerstateChanged事件
    你可以在提交客户端的Listener中,监听状态变化,当状态变为RUNNING时,AppId通常就已经生成了:

    val handle = launcher.startApplication(new SparkAppHandle.Listener {
      override def stateChanged(handle: SparkAppHandle): Unit = {
        println(s"客户端捕获到应用状态: ${handle.getState}")
        handle.getAppId match {
          case Some(appId) => println(s"成功获取AppId: $appId")
          case None => println("AppId尚未生成")
        }
      }
    
      override def infoChanged(handle: SparkAppHandle): Unit = {
        // 这里也可以尝试获取AppId
        handle.getAppId.foreach(id => println(s"Info变更时获取AppId: $id"))
      }
    })
    
    // 保持客户端进程运行,直到任务结束
    while (!handle.getState.isFinal) {
      Thread.sleep(1000)
    }
    
  2. 通过K8s API查询(备选方案)
    如果客户端监听的方式不好用,你可以通过K8s的标签查询Driver Pod,Pod的spark-app-selector标签值就是Spark的AppId:

    kubectl get pods -l spark-role=driver -o jsonpath='{.items[0].metadata.labels.spark-app-selector}'
    

    当然,这需要你的提交客户端有K8s的访问权限,并且知道应用名称来过滤。

三、额外注意事项

  • Spark 2.3的K8s支持是Beta版本,存在一些已知的稳定性问题,如果可能的话,升级到Spark 2.4或更高版本会减少这类奇怪的问题。
  • 确保提交客户端能访问K8s API Server,并且有足够的权限(比如能查询Pod状态、读取标签),可以通过设置KUBECONFIG环境变量指定配置文件。
  • 如果用Play框架提交,要注意Play的异步线程模型,别让提交任务的线程被提前销毁,导致监听中断。

内容的提问来源于stack exchange,提问作者shiv455

火山引擎 最新活动