Spark Scala调用REST API获取JSON响应时HDFS环境下超时异常求助
看起来你遇到的核心问题是:本地运行正常的HTTP请求代码,部署到Spark集群后超时设置未生效,始终使用默认的100ms超时。结合你的日志和代码,我来拆解问题并给出具体解决方案:
首先纠正一个关键误区
你设置的spark.network.timeout、spark.executor.heartbeatInterval等配置是Spark集群内部节点间的通信超时,和你调用外部REST API的HTTP请求完全无关,这些配置可以移除,不会影响HTTP调用的超时逻辑。
问题根源分析
从日志显示的connect timeout: Period{time=100, timeUnit=MILLISECONDS}来看,你的代码中设置的超大超时值并没有真正作用到HTTP客户端上。可能的原因包括:
- 集群JVM的全局网络属性覆盖了代码中的设置
- 你使用的HTTP客户端(Scalaj/URLConnection)在Spark集群类加载环境下存在配置生效问题
- 集群节点与API服务器之间存在网络限制(防火墙/代理),导致根本无法建立连接
针对两种实现的具体解决方案
1. Scalaj HTTP 实现优化
Scalaj HTTP在某些环境下可能会依赖JDK内置的HttpClient,而JDK HttpClient的超时会被系统属性覆盖。可以通过两种方式修复:
方式一:显式配置JVM全局超时参数
在提交Spark作业时,给driver和executor添加JVM参数,强制设置HTTP超时:
spark-submit \ --conf "spark.driver.extraJavaOptions=-Djdk.httpclient.connectTimeout=300000 -Djdk.httpclient.readTimeout=300000" \ --conf "spark.executor.extraJavaOptions=-Djdk.httpclient.connectTimeout=300000 -Djdk.httpclient.readTimeout=300000" \ # 其他作业参数...
(这里设置为5分钟,不要用999999999这种超大值,可能触发数值溢出问题)
方式二:升级Scalaj HTTP并显式配置客户端
确保使用最新版的Scalaj HTTP(比如2.4.2+),并显式传递超时选项:
System.setProperty("sun.net.http.allowRestrictedHeaders", "true") val spark = Context.getSparkSession() import spark.implicits._ val result = Http(Url) .auth("xxxx","yyyy") .options( HttpOptions.connTimeout(300000), // 5分钟连接超时 HttpOptions.readTimeout(300000) // 5分钟读取超时 ) .asString
2. scala.io.Source(URLConnection)实现优化
URLConnection的超时设置容易被JVM全局属性覆盖,同时你的代码存在流重复调用的问题(getInputStream被调用两次,第二次会返回null),需要同时修复:
方式一:设置JVM全局网络超时属性
在代码开头添加:
// 全局设置默认连接/读取超时为5分钟 System.setProperty("sun.net.client.defaultConnectTimeout", "300000") System.setProperty("sun.net.client.defaultReadTimeout", "300000")
或者在spark-submit时传递这些参数(和Scalaj的方式一致)。
方式二:修复代码中的流操作问题
修改你的GetUrlContentJson方法,避免重复获取输入流:
@throws(classOf[java.io.IOException]) @throws(classOf[java.net.SocketTimeoutException]) def GetUrlContentJson(url: String)(implicit spark: SparkSession): DataFrame ={ val userpass = "xxxx" + ":" + "yyyy"; val basicAuth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(userpass.getBytes()); val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection] connection.setRequestProperty("Authorization", basicAuth) connection.setConnectTimeout(300000) connection.setReadTimeout(300000) connection.setUseCaches(false) var inputStream: InputStream = null try { inputStream = connection.getInputStream val result = scala.io.Source.fromInputStream(inputStream).mkString // 将JSON字符串转换为DataFrame spark.read.json(Seq(result).toDS()) } finally { // 确保流和连接被正确关闭 if (inputStream != null) inputStream.close() connection.disconnect() } }
额外排查方向
如果以上配置后仍然超时,需要确认集群的网络环境:
- 连通性检查:在Spark集群的任意节点上,手动执行
curl -u xxxx:yyyy <API_URL>,确认是否能正常获取响应。如果无法连通,说明存在防火墙/网络策略限制,需要联系运维解决。 - 代理设置:如果集群需要通过代理访问外部网络,需要在代码中添加代理配置:
或者Scalaj HTTP设置代理:// 针对URLConnection设置代理 val proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxy-host", 8080)) val connection = new URL(url).openConnection(proxy)Http(Url).proxy("proxy-host", 8080).auth("xxxx","yyyy")... - 更换更可靠的HTTP客户端:推荐使用Apache HttpClient,它的配置更稳定,在Spark集群环境下兼容性更好:
import org.apache.http.client.methods.HttpGet import org.apache.http.impl.client.HttpClients import org.apache.http.util.EntityUtils def getApiData(url: String, user: String, pass: String)(implicit spark: SparkSession): DataFrame = { val basicAuth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(s"$user:$pass".getBytes()) val httpClient = HttpClients.createDefault() val requestConfig = org.apache.http.client.config.RequestConfig.custom() .setConnectTimeout(300000) .setSocketTimeout(300000) .build() val httpGet = new HttpGet(url) httpGet.addHeader("Authorization", basicAuth) httpGet.setConfig(requestConfig) val response = httpClient.execute(httpGet) try { val result = EntityUtils.toString(response.getEntity) spark.read.json(Seq(result).toDS()) } finally { response.close() httpClient.close() } }
内容的提问来源于stack exchange,提问作者Ayush




