You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Scala Spark在GCP分布式运行时Worker节点负载极低,电影协同过滤推荐系统运行异常求助

Scala Spark在GCP分布式运行时Worker节点负载极低,电影协同过滤推荐系统运行异常求助

嗨,我看了你的问题和代码,马上就发现了几个导致Worker节点几乎没负载、所有活儿都压在Master上的核心问题,咱们一步步拆解解决:

核心问题1:Driver端的collect()+循环彻底阻塞了分布式计算

你这段代码里的致命问题就在这里:

val allUserRecommendations = test
 .select("user_id")
 .distinct()
 .as[String]
 .collect() // 这里把所有用户ID拉到了Master节点的Driver内存里!
 .map(user_id => predict(user_id, test.select("movie_id").where(col("user_id") === user_id), similarity, ratingsDF))
 .reduce((df1, df2) => df1.union(df2))

collect()会把分布式的用户ID数据集全部拉到Driver(也就是Master节点),然后你在Driver上单线程循环处理每个用户的推荐请求,再把结果union起来。这相当于所有的计算逻辑都被限制在Master的单线程里,Worker节点根本没机会参与大规模计算,自然CPU使用率只有2%。

核心问题2:分区数设置严重不足,并行度完全没跟上

你手动设置了repartition(4),但你的集群有两个Worker,每个5核,总共10个可用CPU核心。分区数4远小于核心数,就算有分布式任务,也没法充分利用所有Worker的算力。而且Spark的默认 shuffle 分区数(spark.sql.shuffle.partitions)默认是200,但你手动改成4,会导致shuffle过程中数据过度集中,进一步降低并行效率。

核心问题3:自定义的余弦相似度计算逻辑效率极低,且极易引发数据倾斜

你自己实现的用户相似度计算:

val userProduct = ratingsDF
 .alias("u1")
 .join(ratingsDF.alias("u2"), Seq("movie_id"))
 .where(col("u1.user_id") =!= col("u2.user_id"))

这是一个全量自连接,1亿行的数据集会生成天文数字级别的用户对,不仅计算量爆炸,还会导致严重的数据倾斜(热门电影对应的用户对会远超其他电影),最终要么计算极慢,要么直接OOM。


解决方案:重构代码,用Spark MLlib的ALS算法替代自定义逻辑,充分利用分布式资源

Spark MLlib内置了交替最小二乘法(ALS),这是专门为大规模协同过滤场景优化的分布式算法,能完美适配你的GCP集群,彻底解决Worker闲置的问题。

步骤1:替换自定义相似度计算为ALS算法

ALS会自动处理分布式的矩阵分解,生成用户和物品的隐因子向量,然后高效计算推荐结果,完全不需要你手动处理用户对的相似度。

步骤2:移除Driver端的collect()+循环,改用分布式API处理推荐

用Spark的recommendForAllUsers或者recommendForUserSubset来批量生成推荐,全程分布式执行,Worker节点会被充分利用。

步骤3:调整集群并行度配置

在SparkConf里添加以下配置,让Spark自动适配你的集群资源:

val conf = new SparkConf()
 .setAppName("CollaborativeFilteringDF")
 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
 .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
 .set("spark.sql.shuffle.partitions", "20") // 设置为总核数的2倍(10核*2=20)
 .set("spark.default.parallelism", "20") // 与shuffle分区数对应

重构后的完整代码示例

import com.google.cloud.storage.{BlobId, StorageOptions}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.evaluation.RegressionEvaluator

object CollaborativeFilteringALS {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    var bucketName = "bucket"
    var trainName = "train.csv"
    var testName = "test.csv"

    if (args.length > 0) {
      bucketName = args(0)
      if (args.length > 2) {
        trainName = args(1)
        testName = args(2)
      }
    }

    val basePath = s"gs://$bucketName"
    val trainPath = s"$basePath/$trainName"
    val testPath = s"$basePath/$testName"

    val conf = new SparkConf()
     .setAppName("CollaborativeFilteringALS")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
     .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
     .set("spark.sql.shuffle.partitions", "20") // 适配集群总核数(10核*2)
     .set("spark.default.parallelism", "20")

    val spark = SparkSession.builder.config(conf).getOrCreate()
    import spark.implicits._

    println(s"Read train: $trainName")
    val ratingsDF = spark.read
     .option("header", true)
     .csv(trainPath)
     .select(
        col("user_id").cast("integer").alias("userId"),
        col("movie_id").cast("integer").alias("movieId"),
        col("rating").cast("double").alias("rating")
      )
     .repartition(spark.sparkContext.defaultParallelism) // 自动适配并行度

    println(s"Read test: $testName")
    val testDF = spark.read
     .option("header", true)
     .csv(testPath)
     .select(
        col("user_id").cast("integer").alias("userId"),
        col("movie_id").cast("integer").alias("movieId"),
        col("rating").cast("double").alias("rating")
      )

    val t0 = System.nanoTime()

    // 初始化ALS模型
    val als = new ALS()
     .setMaxIter(10) // 迭代次数,可根据效果调整
     .setRegParam(0.1) // 正则化参数,防止过拟合
     .setUserCol("userId")
     .setItemCol("movieId")
     .setRatingCol("rating")
     .setColdStartStrategy("drop") // 忽略冷启动的用户/物品

    // 训练模型
    val model = als.fit(ratingsDF)

    // 生成测试集的预测结果
    val predictions = model.transform(testDF)

    // 评估模型
    println("Evaluate...")
    val evaluator = new RegressionEvaluator()
     .setMetricName("rmse")
     .setLabelCol("rating")
     .setPredictionCol("prediction")

    val rmse = evaluator.evaluate(predictions)
    println(s"RMSE: $rmse")

    // 可选:生成所有用户的TopN推荐(比如每个用户推荐10部电影)
    val userRecommendations = model.recommendForAllUsers(10)
    // 如果只需要测试集中的用户推荐,可以用recommendForUserSubset
    // val testUsers = testDF.select("userId").distinct()
    // val userRecommendations = model.recommendForUserSubset(testUsers, 10)

    val t1 = System.nanoTime()
    val time = (t1 - t0) / 1000000000
    println(s"Elapsed time:\t" + time + "sec (" + (t1 - t0) + "ns)")

    spark.close()
  }
}

额外优化建议

  • 数据格式优化:把CSV转换成Parquet格式,Parquet是列式存储,读取和处理速度比CSV快很多,尤其适合大规模数据集。你可以先把训练集和测试集转成Parquet存在GCS上,后续直接读取Parquet文件。
  • 资源配置调整:Master节点2核16G足够,但Worker节点的内存可以适当调整Spark的内存参数,比如设置spark.executor.memory为24G(给系统留8G),spark.executor.cores为5(和你Worker的核数匹配)。
  • 监控集群状态:在GCP的Dataproc控制台查看Spark的UI(http://:4040),可以直观看到任务的分布、分区情况、数据倾斜等问题,方便进一步调优。

备注:内容来源于stack exchange,提问作者Luca Genova

火山引擎 最新活动