Spark DataFrame分组求和后如何获取最大值及对应源IP
嘿,我明白你的困惑了!你现在踩了两个常见的Spark新手坑:一是混淆了聚合逻辑的层级,二是没搞清楚Spark DataFrame的不可变性特性。
先帮你理清楚问题出在哪:
你第一次执行的df.groupby('sIP').agg({'duration': 'sum'})确实算出了每个源IP的总时长,但你没有把这个结果保存成新的DataFrame。之后你直接跑df.groupby('sIP').agg({'duration': 'max'}),这是对原始DataFrame做聚合,求的是每个IP单条记录里的最大duration,完全不是你要的“总时长的最大值”。
而且Spark里的DataFrame是不可变的——你没法直接“更新”一个已有的DataFrame,所有转换操作(比如groupby、agg)都会生成一个全新的DataFrame,你必须把这个新结果赋值给变量才能后续使用,这就是你忽略的关键点!
接下来给你两种靠谱的解决方案:
方案1:用Spark SQL,直观好理解
先把第一次聚合的结果注册成临时视图,再用SQL语句直接查最大值:
# 先计算每个IP的总时长,注册为临时视图 ip_total_df = df.groupby('sIP').agg({'duration': 'sum'}) ip_total_df.createOrReplaceTempView('ip_total_duration') # 查总时长最大的那条记录 spark.sql("SELECT sIP, `sum(duration)` AS total_duration FROM ip_total_duration ORDER BY total_duration DESC LIMIT 1").show()
方案2:纯DataFrame API操作
如果更喜欢用DataFrame链式调用,可以这么写:
from pyspark.sql.functions import col, max as spark_max # 第一步:得到每个IP的总时长,并重命名列名方便后续操作 total_duration_df = df.groupby('sIP')\ .agg({'duration': 'sum'})\ .withColumnRenamed('sum(duration)', 'total_duration') # 方法A:先算最大值,再筛选 max_total = total_duration_df.select(spark_max('total_duration')).first()[0] total_duration_df.filter(col('total_duration') == max_total).show() # 方法B:排序后取第一条(数据量不大时用更简洁) total_duration_df.orderBy(col('total_duration').desc()).limit(1).show()
以后记住哦:Spark里的DataFrame一旦创建就不能修改,所有转换都要生成新的DataFrame并赋值,不然你的操作都是在原数据上绕圈子~
内容的提问来源于stack exchange,提问作者user3336801




