You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark Scala调用REST API获取JSON响应时HDFS环境下超时异常求助

解决Spark Scala集群环境下调用REST API超时问题

看起来你遇到的核心问题是:本地运行正常的HTTP请求代码,部署到Spark集群后超时设置未生效,始终使用默认的100ms超时。结合你的日志和代码,我来拆解问题并给出具体解决方案:

首先纠正一个关键误区

你设置的spark.network.timeoutspark.executor.heartbeatInterval等配置是Spark集群内部节点间的通信超时,和你调用外部REST API的HTTP请求完全无关,这些配置可以移除,不会影响HTTP调用的超时逻辑。

问题根源分析

从日志显示的connect timeout: Period{time=100, timeUnit=MILLISECONDS}来看,你的代码中设置的超大超时值并没有真正作用到HTTP客户端上。可能的原因包括:

  1. 集群JVM的全局网络属性覆盖了代码中的设置
  2. 你使用的HTTP客户端(Scalaj/URLConnection)在Spark集群类加载环境下存在配置生效问题
  3. 集群节点与API服务器之间存在网络限制(防火墙/代理),导致根本无法建立连接

针对两种实现的具体解决方案

1. Scalaj HTTP 实现优化

Scalaj HTTP在某些环境下可能会依赖JDK内置的HttpClient,而JDK HttpClient的超时会被系统属性覆盖。可以通过两种方式修复:

方式一:显式配置JVM全局超时参数

在提交Spark作业时,给driver和executor添加JVM参数,强制设置HTTP超时:

spark-submit \
  --conf "spark.driver.extraJavaOptions=-Djdk.httpclient.connectTimeout=300000 -Djdk.httpclient.readTimeout=300000" \
  --conf "spark.executor.extraJavaOptions=-Djdk.httpclient.connectTimeout=300000 -Djdk.httpclient.readTimeout=300000" \
  # 其他作业参数...

(这里设置为5分钟,不要用999999999这种超大值,可能触发数值溢出问题)

方式二:升级Scalaj HTTP并显式配置客户端

确保使用最新版的Scalaj HTTP(比如2.4.2+),并显式传递超时选项:

System.setProperty("sun.net.http.allowRestrictedHeaders", "true")
val spark = Context.getSparkSession()
import spark.implicits._

val result = Http(Url)
  .auth("xxxx","yyyy")
  .options(
    HttpOptions.connTimeout(300000),  // 5分钟连接超时
    HttpOptions.readTimeout(300000)   // 5分钟读取超时
  )
  .asString

2. scala.io.Source(URLConnection)实现优化

URLConnection的超时设置容易被JVM全局属性覆盖,同时你的代码存在流重复调用的问题(getInputStream被调用两次,第二次会返回null),需要同时修复:

方式一:设置JVM全局网络超时属性

在代码开头添加:

// 全局设置默认连接/读取超时为5分钟
System.setProperty("sun.net.client.defaultConnectTimeout", "300000")
System.setProperty("sun.net.client.defaultReadTimeout", "300000")

或者在spark-submit时传递这些参数(和Scalaj的方式一致)。

方式二:修复代码中的流操作问题

修改你的GetUrlContentJson方法,避免重复获取输入流:

@throws(classOf[java.io.IOException])
@throws(classOf[java.net.SocketTimeoutException])
def GetUrlContentJson(url: String)(implicit spark: SparkSession): DataFrame ={
  val userpass = "xxxx" + ":" + "yyyy";
  val basicAuth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(userpass.getBytes());
  val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
  
  connection.setRequestProperty("Authorization", basicAuth)
  connection.setConnectTimeout(300000)
  connection.setReadTimeout(300000)
  connection.setUseCaches(false)
  
  var inputStream: InputStream = null
  try {
    inputStream = connection.getInputStream
    val result = scala.io.Source.fromInputStream(inputStream).mkString
    // 将JSON字符串转换为DataFrame
    spark.read.json(Seq(result).toDS())
  } finally {
    // 确保流和连接被正确关闭
    if (inputStream != null) inputStream.close()
    connection.disconnect()
  }
}

额外排查方向

如果以上配置后仍然超时,需要确认集群的网络环境:

  • 连通性检查:在Spark集群的任意节点上,手动执行curl -u xxxx:yyyy <API_URL>,确认是否能正常获取响应。如果无法连通,说明存在防火墙/网络策略限制,需要联系运维解决。
  • 代理设置:如果集群需要通过代理访问外部网络,需要在代码中添加代理配置:
    // 针对URLConnection设置代理
    val proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxy-host", 8080))
    val connection = new URL(url).openConnection(proxy)
    
    或者Scalaj HTTP设置代理:
    Http(Url).proxy("proxy-host", 8080).auth("xxxx","yyyy")...
    
  • 更换更可靠的HTTP客户端:推荐使用Apache HttpClient,它的配置更稳定,在Spark集群环境下兼容性更好:
    import org.apache.http.client.methods.HttpGet
    import org.apache.http.impl.client.HttpClients
    import org.apache.http.util.EntityUtils
    
    def getApiData(url: String, user: String, pass: String)(implicit spark: SparkSession): DataFrame = {
      val basicAuth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(s"$user:$pass".getBytes())
      val httpClient = HttpClients.createDefault()
      
      val requestConfig = org.apache.http.client.config.RequestConfig.custom()
        .setConnectTimeout(300000)
        .setSocketTimeout(300000)
        .build()
        
      val httpGet = new HttpGet(url)
      httpGet.addHeader("Authorization", basicAuth)
      httpGet.setConfig(requestConfig)
      
      val response = httpClient.execute(httpGet)
      try {
        val result = EntityUtils.toString(response.getEntity)
        spark.read.json(Seq(result).toDS())
      } finally {
        response.close()
        httpClient.close()
      }
    }
    

内容的提问来源于stack exchange,提问作者Ayush

火山引擎 最新活动