You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

基于绝对频率的随机代表性子集选择算法

基于绝对频率的随机代表性子集选择算法

我太懂你这种找不到接地气方案的烦恼了!那些满是公式的学术论文看着头大,但其实日常业务里用的高效方案大多都是复杂理论的简化版,完全能满足你的需求。下面我给你拆解几个好用又容易实现的算法,从简单到进阶,按需pick就行:

1. 加权随机抽样(最直接高效的基础方案)

这个方法的核心逻辑很简单:让每个名字被选中的概率和它的绝对频率占总频率的比例完全匹配,这样抽出来的子集自然能代表整体的频率分布。

实现步骤

  • 先计算所有名字的总绝对频率total_freq
  • 构建一个累积频率数组:把每个名字的频率依次累加,比如第一个名字频率是10,第二个是20,累积数组就是[10, 30, ..., total_freq]
  • 每次抽样时,生成一个1到total_freq之间的随机数,用二分查找找到这个随机数落在累积数组的哪个区间,对应的名字就是选中的样本;
  • 如果需要多个样本,重复上述步骤即可(支持放回/不放回抽样)。

简单Python实现

import random
import bisect

def weighted_random_sample(names, freqs, sample_size, replace=True):
    total = sum(freqs)
    # 构建(名字, 累积频率)的列表
    name_cumulative = []
    current_cum = 0
    for name, freq in zip(names, freqs):
        current_cum += freq
        name_cumulative.append((name, current_cum))
    
    sample = []
    used_names = set()  # 不放回抽样时记录已选名字
    for _ in range(sample_size):
        while True:
            r = random.randint(1, total)
            # 用二分找对应区间
            idx = bisect.bisect_left([c for _, c in name_cumulative], r)
            selected_name = name_cumulative[idx][0]
            # 放回抽样直接加,不放回则检查是否已选
            if replace or selected_name not in used_names:
                break
        sample.append(selected_name)
        if not replace:
            used_names.add(selected_name)
    return sample

为什么高效?

  • 预处理累积数组是O(n)时间,每次抽样是O(log n)时间,总时间复杂度O(n + k*log n)(n是总名字数,k是样本大小);
  • 内存开销只有O(n)用来存累积数组,对于百万级别的数据也完全hold住。

2. 分层加权抽样(保证全频率段代表性)

如果担心高频名字会垄断样本,或者想确保低频名字也有入选机会,分层抽样就很合适。核心是先把名字按频率分成不同层级(比如高频、中频、低频),然后在每个层级内按比例抽样,最后合并结果。

实现步骤

  • 把所有名字按频率从高到低排序,然后划分层级(比如前20%归为高频层,中间60%中频层,后20%低频层,比例可以自己调);
  • 计算每个层级的总频率,按这个频率占总频率的比例分配样本名额;
  • 每个层级内用加权随机抽样方法选样本;
  • 最后把各层级的样本打乱,得到最终的代表性子集。

简化实现示例

import random
import bisect

def stratified_weighted_sample(names, freqs, sample_size):
    # 配对名字和频率并排序
    sorted_pairs = sorted(zip(names, freqs), key=lambda x: x[1], reverse=True)
    total_names = len(sorted_pairs)
    total_freq = sum(freqs)
    
    # 划分层级(这里用20/60/20的比例,可自定义)
    high_cut = int(total_names * 0.2)
    mid_cut = high_cut + int(total_names * 0.6)
    high_group = sorted_pairs[:high_cut]
    mid_group = sorted_pairs[high_cut:mid_cut]
    low_group = sorted_pairs[mid_cut:]
    
    # 按各层频率占比分配样本数
    high_total = sum(f for _, f in high_group)
    mid_total = sum(f for _, f in mid_group)
    low_total = sum(f for _, f in low_group)
    
    high_sample_num = int(sample_size * (high_total / total_freq))
    mid_sample_num = int(sample_size * (mid_total / total_freq))
    # 补全剩余名额给低频层,避免被忽略
    low_sample_num = sample_size - high_sample_num - mid_sample_num
    
    # 抽取单个层级样本的辅助函数
    def sample_single_group(group, num):
        if num <= 0 or not group:
            return []
        group_names, group_freqs = zip(*group)
        group_total = sum(group_freqs)
        cumulative = []
        current = 0
        for f in group_freqs:
            current += f
            cumulative.append(current)
        sample = []
        for _ in range(num):
            r = random.randint(1, group_total)
            idx = bisect.bisect_left(cumulative, r)
            sample.append(group_names[idx])
        return sample
    
    # 合并样本并打乱
    final_sample = []
    final_sample.extend(sample_single_group(high_group, high_sample_num))
    final_sample.extend(sample_single_group(mid_group, mid_sample_num))
    final_sample.extend(sample_single_group(low_group, low_sample_num))
    random.shuffle(final_sample)
    return final_sample

优势

  • 避免了高频名字“霸榜”的情况,确保低频但可能重要的名字也能出现在样本里;
  • 层级划分和样本比例可以根据你的数据分布灵活调整,比如如果低频名字特别多,可以给低频层多分配名额。

3. 加权蓄水池抽样(流式/超大规模数据专属)

如果你的数据量超大(比如千万级以上),或者是流式输入(不能一次性把所有名字加载到内存),这个方法绝对是最优解。它不需要提前知道总频率或总名字数,只需要维护一个固定大小的“蓄水池”来存样本。

实现步骤

  • 初始化一个大小为sample_size的蓄水池;
  • 遍历每个名字,同时累积当前的总频率current_total
  • 对于当前名字,计算它被选中的概率:(sample_size * 当前名字频率) / current_total
  • 如果随机生成的0-1之间的数小于这个概率,就从蓄水池里随机替换一个元素(替换时也按元素的频率加权选择被替换的对象);
  • 遍历完所有数据后,蓄水池里的就是代表性样本。

实现代码

import random
import bisect

def weighted_reservoir_sample(stream, sample_size):
    reservoir = []
    current_total = 0
    
    for name, freq in stream:
        current_total += freq
        # 蓄水池未满,直接加入
        if len(reservoir) < sample_size:
            reservoir.append((name, freq))
            continue
        
        # 计算当前元素被选中的概率
        selection_prob = (sample_size * freq) / current_total
        if random.random() < selection_prob:
            # 加权选择蓄水池中要被替换的元素
            res_freqs = [f for _, f in reservoir]
            res_cumulative = []
            cum = 0
            for f in res_freqs:
                cum += f
                res_cumulative.append(cum)
            r = random.randint(1, sum(res_freqs))
            replace_idx = bisect.bisect_left(res_cumulative, r)
            reservoir[replace_idx] = (name, freq)
    
    # 提取样本名字
    return [name for name, _ in reservoir]

为什么高效?

  • 时间复杂度O(n),每个元素只处理一次;
  • 内存开销固定为O(sample_size),不管原数据有多大,都只存k个样本,内存压力极小。

实用选择建议

最后给你几个快速选方案的小准则:

  • 如果数据量不大(比如万级以内),直接用加权随机抽样,实现简单,代表性足够;
  • 如果需要确保低频名字不被忽略,选分层加权抽样
  • 如果是流式数据或超大规模数据(百万级以上),加权蓄水池抽样是唯一的高效选择;
  • 测试代表性的小技巧:对比子集的频率分布和原数据的频率分布(比如统计高频、中频、低频的占比),如果两者差异很小,就说明样本合格啦。

要是你有具体的数据规模或者特殊要求(比如必须所有样本唯一),随时告诉我,我再给你调细节!

火山引擎 最新活动