如何对齐Pandas与PySpark的分位数(Quantile)计算结果?
如何对齐Pandas与PySpark的分位数(Quantile)计算结果?
嗨,我来帮你捋捋这个问题!你观察到的Pandas和PySpark分位数结果差异确实挺常见的,核心原因是两者默认采用了完全不同的分位数计算逻辑——Pandas默认用的是连续型的线性插值逻辑,而PySpark的approxQuantile(包括pandas API on Spark的quantile)默认用的是离散型的“取有序数组中对应位置元素”的逻辑,所以哪怕你试了Pandas的几种插值方法,也找不到完全匹配PySpark默认行为的选项。
下面给你两种对齐思路,你可以根据自己的需求选:
一、让PySpark输出和Pandas一致的结果
既然PySpark默认的方法没法直接匹配Pandas的线性插值逻辑,我们可以手动实现Pandas的分位数计算逻辑:
- 先把目标列的所有值收集起来并排序
- 按照Pandas的线性插值公式计算分位数
给你写个具体的PySpark代码示例:
from pyspark.sql import functions as F import numpy as np import pandas as pd # 你的原始Spark DataFrame data = {"A": [1, 2, 3, 4, 5]} sdf = spark.createDataFrame(pd.DataFrame(data)) # 先获取排序后的数组和数据总条数 stats = sdf.agg( F.sort_array(F.collect_list("A")).alias("sorted_vals"), F.count("A").alias("n") ).collect()[0] sorted_vals = stats["sorted_vals"] n = stats["n"] # 定义要计算的分位数列表 percentiles = [0.2, 0.8] result = {} for p in percentiles: # 按照Pandas默认的linear插值逻辑计算 pos = (n - 1) * p idx = int(np.floor(pos)) frac = pos - idx # 处理边界情况(比如p=1时直接取最后一个元素) if idx >= n - 1: quantile_val = sorted_vals[-1] else: quantile_val = sorted_vals[idx] + frac * (sorted_vals[idx+1] - sorted_vals[idx]) result[p] = quantile_val print(result) # 输出 {0.2: 1.8, 0.8: 4.2},和Pandas完全一致
二、让Pandas输出和PySpark一致的结果
如果你的需求是让Pandas对齐PySpark的结果,也可以手动实现PySpark的离散型分位数逻辑:
PySpark默认的分位数计算逻辑是:对于n个元素的有序数组,分位数p对应的是第ceil(p * n)个元素(注意是1-based索引),转换成Pandas的0-based索引就是ceil(p * n) - 1。
具体代码示例:
import pandas as pd import numpy as np # 你的原始Pandas DataFrame data = {"A": [1, 2, 3, 4, 5]} pdf = pd.DataFrame(data) # 定义要计算的分位数列表 percentiles = [0.2, 0.8] result = {} for p in percentiles: n = len(pdf) # 计算1-based的位置,转成0-based索引 k = np.ceil(p * n).astype(int) - 1 # 排序后取对应位置的元素 quantile_val = pdf["A"].sort_values().iloc[k] result[p] = quantile_val print(result) # 输出 {0.2: 1.0, 0.8: 4.0},和PySpark完全一致
另外补充一句:如果你用的是PySpark的pandas API(sdf.pandas_api()),目前它的quantile方法是复用PySpark的底层逻辑,所以也没法直接通过参数调整到Pandas的默认行为,还是得用上面手动实现的方法。
备注:内容来源于stack exchange,提问作者Павел Янко




