Scala Spark在GCP分布式运行时Worker节点负载极低,电影协同过滤推荐系统运行异常求助
嗨,我看了你的问题和代码,马上就发现了几个导致Worker节点几乎没负载、所有活儿都压在Master上的核心问题,咱们一步步拆解解决:
核心问题1:Driver端的collect()+循环彻底阻塞了分布式计算
你这段代码里的致命问题就在这里:
val allUserRecommendations = test .select("user_id") .distinct() .as[String] .collect() // 这里把所有用户ID拉到了Master节点的Driver内存里! .map(user_id => predict(user_id, test.select("movie_id").where(col("user_id") === user_id), similarity, ratingsDF)) .reduce((df1, df2) => df1.union(df2))
collect()会把分布式的用户ID数据集全部拉到Driver(也就是Master节点),然后你在Driver上单线程循环处理每个用户的推荐请求,再把结果union起来。这相当于所有的计算逻辑都被限制在Master的单线程里,Worker节点根本没机会参与大规模计算,自然CPU使用率只有2%。
核心问题2:分区数设置严重不足,并行度完全没跟上
你手动设置了repartition(4),但你的集群有两个Worker,每个5核,总共10个可用CPU核心。分区数4远小于核心数,就算有分布式任务,也没法充分利用所有Worker的算力。而且Spark的默认 shuffle 分区数(spark.sql.shuffle.partitions)默认是200,但你手动改成4,会导致shuffle过程中数据过度集中,进一步降低并行效率。
核心问题3:自定义的余弦相似度计算逻辑效率极低,且极易引发数据倾斜
你自己实现的用户相似度计算:
val userProduct = ratingsDF .alias("u1") .join(ratingsDF.alias("u2"), Seq("movie_id")) .where(col("u1.user_id") =!= col("u2.user_id"))
这是一个全量自连接,1亿行的数据集会生成天文数字级别的用户对,不仅计算量爆炸,还会导致严重的数据倾斜(热门电影对应的用户对会远超其他电影),最终要么计算极慢,要么直接OOM。
解决方案:重构代码,用Spark MLlib的ALS算法替代自定义逻辑,充分利用分布式资源
Spark MLlib内置了交替最小二乘法(ALS),这是专门为大规模协同过滤场景优化的分布式算法,能完美适配你的GCP集群,彻底解决Worker闲置的问题。
步骤1:替换自定义相似度计算为ALS算法
ALS会自动处理分布式的矩阵分解,生成用户和物品的隐因子向量,然后高效计算推荐结果,完全不需要你手动处理用户对的相似度。
步骤2:移除Driver端的collect()+循环,改用分布式API处理推荐
用Spark的recommendForAllUsers或者recommendForUserSubset来批量生成推荐,全程分布式执行,Worker节点会被充分利用。
步骤3:调整集群并行度配置
在SparkConf里添加以下配置,让Spark自动适配你的集群资源:
val conf = new SparkConf() .setAppName("CollaborativeFilteringDF") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") .set("spark.sql.shuffle.partitions", "20") // 设置为总核数的2倍(10核*2=20) .set("spark.default.parallelism", "20") // 与shuffle分区数对应
重构后的完整代码示例
import com.google.cloud.storage.{BlobId, StorageOptions} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions._ import org.apache.log4j.{Level, Logger} import org.apache.spark.ml.recommendation.ALS import org.apache.spark.ml.evaluation.RegressionEvaluator object CollaborativeFilteringALS { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) var bucketName = "bucket" var trainName = "train.csv" var testName = "test.csv" if (args.length > 0) { bucketName = args(0) if (args.length > 2) { trainName = args(1) testName = args(2) } } val basePath = s"gs://$bucketName" val trainPath = s"$basePath/$trainName" val testPath = s"$basePath/$testName" val conf = new SparkConf() .setAppName("CollaborativeFilteringALS") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") .set("spark.sql.shuffle.partitions", "20") // 适配集群总核数(10核*2) .set("spark.default.parallelism", "20") val spark = SparkSession.builder.config(conf).getOrCreate() import spark.implicits._ println(s"Read train: $trainName") val ratingsDF = spark.read .option("header", true) .csv(trainPath) .select( col("user_id").cast("integer").alias("userId"), col("movie_id").cast("integer").alias("movieId"), col("rating").cast("double").alias("rating") ) .repartition(spark.sparkContext.defaultParallelism) // 自动适配并行度 println(s"Read test: $testName") val testDF = spark.read .option("header", true) .csv(testPath) .select( col("user_id").cast("integer").alias("userId"), col("movie_id").cast("integer").alias("movieId"), col("rating").cast("double").alias("rating") ) val t0 = System.nanoTime() // 初始化ALS模型 val als = new ALS() .setMaxIter(10) // 迭代次数,可根据效果调整 .setRegParam(0.1) // 正则化参数,防止过拟合 .setUserCol("userId") .setItemCol("movieId") .setRatingCol("rating") .setColdStartStrategy("drop") // 忽略冷启动的用户/物品 // 训练模型 val model = als.fit(ratingsDF) // 生成测试集的预测结果 val predictions = model.transform(testDF) // 评估模型 println("Evaluate...") val evaluator = new RegressionEvaluator() .setMetricName("rmse") .setLabelCol("rating") .setPredictionCol("prediction") val rmse = evaluator.evaluate(predictions) println(s"RMSE: $rmse") // 可选:生成所有用户的TopN推荐(比如每个用户推荐10部电影) val userRecommendations = model.recommendForAllUsers(10) // 如果只需要测试集中的用户推荐,可以用recommendForUserSubset // val testUsers = testDF.select("userId").distinct() // val userRecommendations = model.recommendForUserSubset(testUsers, 10) val t1 = System.nanoTime() val time = (t1 - t0) / 1000000000 println(s"Elapsed time:\t" + time + "sec (" + (t1 - t0) + "ns)") spark.close() } }
额外优化建议
- 数据格式优化:把CSV转换成Parquet格式,Parquet是列式存储,读取和处理速度比CSV快很多,尤其适合大规模数据集。你可以先把训练集和测试集转成Parquet存在GCS上,后续直接读取Parquet文件。
- 资源配置调整:Master节点2核16G足够,但Worker节点的内存可以适当调整Spark的内存参数,比如设置
spark.executor.memory为24G(给系统留8G),spark.executor.cores为5(和你Worker的核数匹配)。 - 监控集群状态:在GCP的Dataproc控制台查看Spark的UI(http://
:4040),可以直观看到任务的分布、分区情况、数据倾斜等问题,方便进一步调优。
备注:内容来源于stack exchange,提问作者Luca Genova




