You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何基于Spark ML按列分组计算TF-IDF而非全DataFrame计算?

按分组计算TF-IDF:Spark ML的实现方案

Spark ML原生的IDF estimator是基于整个数据集训练的,它会用全局的文档总数来计算逆文档频率,所以并不直接支持按列(比如你的id字段)分组单独计算IDF的需求。不过不用担心,我们可以通过分组后自定义处理的方式来实现你的需求,下面给你两种可行的方案,基于你提供的Scala示例来演示。

方案一:基于RDD分组手动计算TF-IDF

这种方案完全基于Spark原生API,不需要额外依赖,适合大规模数据场景:

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.linalg.Vectors

object GroupedTFIDFExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupedTFIDF")
      .master("local[*]") // 生产环境去掉master配置
      .getOrCreate()
    import spark.implicits._

    // 加载示例数据
    val sample = Seq(
      (1, "A B C D E"),
      (1, "B C D"),
      (1, "B C D E"),
      (2, "B C D F"),
      (2, "A B C"),
      (2, "B C E F G")
    ).toDF("id", "sentences")

    // 第一步:分词处理
    val tokenizer = new Tokenizer()
      .setInputCol("sentences")
      .setOutputCol("words")
    val tokenizedDF = tokenizer.transform(sample)

    // 第二步:按id分组,手动计算每个分组内的TF-IDF
    val groupedRDD = tokenizedDF.select("id", "words").rdd.groupBy(_.getAs[Int]("id"))

    val tfidfResults = groupedRDD.flatMap { case (groupId, rows) =>
      // 提取当前分组的所有文档词列表
      val docs = rows.map(_.getAs[Seq[String]]("words")).toList
      val totalDocs = docs.size.toDouble

      // 计算每个词的文档频率(DF):出现该词的文档数量
      val docFreq = docs.flatMap(_.distinct)
        .groupBy(identity)
        .mapValues(_.size.toDouble)

      // 计算IDF:采用平滑公式 log(总文档数 / (DF + 1)),避免DF为0的情况
      val idfMap = docFreq.map { case (word, df) =>
        word -> math.log(totalDocs / (df + 1))
      }

      // 对每个文档计算TF-IDF向量
      rows.map { row =>
        val words = row.getAs[Seq[String]]("words")
        // 计算词频(TF):词在当前文档中的出现次数
        val tfMap = words.groupBy(identity).mapValues(_.size.toDouble)
        // 获取当前分组的词汇表,构建TF-IDF向量
        val vocab = idfMap.keys.toList
        val tfidfValues = vocab.map(word => tfMap.getOrElse(word, 0.0) * idfMap(word))
        val tfidfVector = Vectors.dense(tfidfValues.toArray)

        (groupId, row.getAs[String]("sentences"), vocab, tfidfVector)
      }
    }

    // 转换为DataFrame展示结果
    val resultDF = tfidfResults.toDF("id", "sentence", "vocabulary", "tfidf_vector")
    resultDF.show(false)

    spark.stop()
  }
}

代码说明:

  1. 分词:先用Tokenizer把句子拆分成词列表;
  2. 分组处理:通过RDD的groupByid拆分数据,每个分组独立计算;
  3. IDF计算:基于当前分组的总文档数和词的文档频率计算IDF;
  4. TF-IDF向量构建:对每个文档计算词频,再乘以对应词的IDF,最终生成稠密向量。

方案二:使用Pandas UDF结合Scikit-learn(适合中小规模分组)

如果你的分组数据量不大,可以用Pandas UDF结合Scikit-learn的TfidfVectorizer来简化代码,开发效率更高:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.pandas_udf

object GroupedTFIDFPandasExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupedTFIDFPandas")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val sample = Seq(
      (1, "A B C D E"),
      (1, "B C D"),
      (1, "B C D E"),
      (2, "B C D F"),
      (2, "A B C"),
      (2, "B C E F G")
    ).toDF("id", "sentences")

    // 定义分组Pandas UDF
    val groupedTfidfUDF: UserDefinedFunction = pandas_udf(
      StructType(Seq(
        StructField("vocabulary", ArrayType(StringType)),
        StructField("tfidf_vector", ArrayType(DoubleType))
      )),
      PandasUDFType.GROUPED_MAP
    ) { (df: pd.DataFrame) =>
      // 导入Scikit-learn的TF-IDF工具
      import pandas as pd
      from sklearn.feature_extraction.text import TfidfVectorizer

      val texts = df["sentences"].values
      // 初始化TF-IDF向量器,开启IDF和平滑
      tfidf = TfidfVectorizer(use_idf=True, smooth_idf=True)
      tfidfMatrix = tfidf.fit_transform(texts)
      // 获取词汇表和向量结果
      vocab = tfidf.get_feature_names_out().tolist()
      vectors = tfidfMatrix.toarray().tolist()
      // 返回结果DataFrame
      pd.DataFrame({
        "vocabulary": [vocab] * len(vectors),
        "tfidf_vector": vectors
      })
    }

    // 按id分组应用UDF
    val resultDF = sample.groupBy("id").apply(groupedTfidfUDF)
    resultDF.show(false)

    spark.stop()
  }
}

注意事项:

  • 需要确保executor节点安装了pandasscikit-learn依赖;
  • 这种方式适合分组数据量较小的场景,因为每个分组的数据会加载到executor的内存中处理。

总结

Spark ML原生不支持分组计算TF-IDF,但通过上述两种方案可以轻松实现你的需求:

  • 大规模数据选方案一,基于RDD手动计算,性能更稳定;
  • 中小规模分组选方案二,代码更简洁,开发效率高。

内容的提问来源于stack exchange,提问作者Mohan

火山引擎 最新活动