如何基于Spark ML按列分组计算TF-IDF而非全DataFrame计算?
按分组计算TF-IDF:Spark ML的实现方案
Spark ML原生的IDF estimator是基于整个数据集训练的,它会用全局的文档总数来计算逆文档频率,所以并不直接支持按列(比如你的id字段)分组单独计算IDF的需求。不过不用担心,我们可以通过分组后自定义处理的方式来实现你的需求,下面给你两种可行的方案,基于你提供的Scala示例来演示。
方案一:基于RDD分组手动计算TF-IDF
这种方案完全基于Spark原生API,不需要额外依赖,适合大规模数据场景:
import org.apache.spark.sql.SparkSession import org.apache.spark.ml.feature.Tokenizer import org.apache.spark.ml.linalg.Vectors object GroupedTFIDFExample { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("GroupedTFIDF") .master("local[*]") // 生产环境去掉master配置 .getOrCreate() import spark.implicits._ // 加载示例数据 val sample = Seq( (1, "A B C D E"), (1, "B C D"), (1, "B C D E"), (2, "B C D F"), (2, "A B C"), (2, "B C E F G") ).toDF("id", "sentences") // 第一步:分词处理 val tokenizer = new Tokenizer() .setInputCol("sentences") .setOutputCol("words") val tokenizedDF = tokenizer.transform(sample) // 第二步:按id分组,手动计算每个分组内的TF-IDF val groupedRDD = tokenizedDF.select("id", "words").rdd.groupBy(_.getAs[Int]("id")) val tfidfResults = groupedRDD.flatMap { case (groupId, rows) => // 提取当前分组的所有文档词列表 val docs = rows.map(_.getAs[Seq[String]]("words")).toList val totalDocs = docs.size.toDouble // 计算每个词的文档频率(DF):出现该词的文档数量 val docFreq = docs.flatMap(_.distinct) .groupBy(identity) .mapValues(_.size.toDouble) // 计算IDF:采用平滑公式 log(总文档数 / (DF + 1)),避免DF为0的情况 val idfMap = docFreq.map { case (word, df) => word -> math.log(totalDocs / (df + 1)) } // 对每个文档计算TF-IDF向量 rows.map { row => val words = row.getAs[Seq[String]]("words") // 计算词频(TF):词在当前文档中的出现次数 val tfMap = words.groupBy(identity).mapValues(_.size.toDouble) // 获取当前分组的词汇表,构建TF-IDF向量 val vocab = idfMap.keys.toList val tfidfValues = vocab.map(word => tfMap.getOrElse(word, 0.0) * idfMap(word)) val tfidfVector = Vectors.dense(tfidfValues.toArray) (groupId, row.getAs[String]("sentences"), vocab, tfidfVector) } } // 转换为DataFrame展示结果 val resultDF = tfidfResults.toDF("id", "sentence", "vocabulary", "tfidf_vector") resultDF.show(false) spark.stop() } }
代码说明:
- 分词:先用
Tokenizer把句子拆分成词列表; - 分组处理:通过RDD的
groupBy按id拆分数据,每个分组独立计算; - IDF计算:基于当前分组的总文档数和词的文档频率计算IDF;
- TF-IDF向量构建:对每个文档计算词频,再乘以对应词的IDF,最终生成稠密向量。
方案二:使用Pandas UDF结合Scikit-learn(适合中小规模分组)
如果你的分组数据量不大,可以用Pandas UDF结合Scikit-learn的TfidfVectorizer来简化代码,开发效率更高:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.pandas_udf object GroupedTFIDFPandasExample { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("GroupedTFIDFPandas") .master("local[*]") .getOrCreate() import spark.implicits._ val sample = Seq( (1, "A B C D E"), (1, "B C D"), (1, "B C D E"), (2, "B C D F"), (2, "A B C"), (2, "B C E F G") ).toDF("id", "sentences") // 定义分组Pandas UDF val groupedTfidfUDF: UserDefinedFunction = pandas_udf( StructType(Seq( StructField("vocabulary", ArrayType(StringType)), StructField("tfidf_vector", ArrayType(DoubleType)) )), PandasUDFType.GROUPED_MAP ) { (df: pd.DataFrame) => // 导入Scikit-learn的TF-IDF工具 import pandas as pd from sklearn.feature_extraction.text import TfidfVectorizer val texts = df["sentences"].values // 初始化TF-IDF向量器,开启IDF和平滑 tfidf = TfidfVectorizer(use_idf=True, smooth_idf=True) tfidfMatrix = tfidf.fit_transform(texts) // 获取词汇表和向量结果 vocab = tfidf.get_feature_names_out().tolist() vectors = tfidfMatrix.toarray().tolist() // 返回结果DataFrame pd.DataFrame({ "vocabulary": [vocab] * len(vectors), "tfidf_vector": vectors }) } // 按id分组应用UDF val resultDF = sample.groupBy("id").apply(groupedTfidfUDF) resultDF.show(false) spark.stop() } }
注意事项:
- 需要确保executor节点安装了
pandas和scikit-learn依赖; - 这种方式适合分组数据量较小的场景,因为每个分组的数据会加载到executor的内存中处理。
总结
Spark ML原生不支持分组计算TF-IDF,但通过上述两种方案可以轻松实现你的需求:
- 大规模数据选方案一,基于RDD手动计算,性能更稳定;
- 中小规模分组选方案二,代码更简洁,开发效率高。
内容的提问来源于stack exchange,提问作者Mohan




