基于绝对频率的随机代表性子集选择算法
基于绝对频率的随机代表性子集选择算法
我太懂你这种找不到接地气方案的烦恼了!那些满是公式的学术论文看着头大,但其实日常业务里用的高效方案大多都是复杂理论的简化版,完全能满足你的需求。下面我给你拆解几个好用又容易实现的算法,从简单到进阶,按需pick就行:
1. 加权随机抽样(最直接高效的基础方案)
这个方法的核心逻辑很简单:让每个名字被选中的概率和它的绝对频率占总频率的比例完全匹配,这样抽出来的子集自然能代表整体的频率分布。
实现步骤
- 先计算所有名字的总绝对频率
total_freq; - 构建一个累积频率数组:把每个名字的频率依次累加,比如第一个名字频率是10,第二个是20,累积数组就是
[10, 30, ..., total_freq]; - 每次抽样时,生成一个1到
total_freq之间的随机数,用二分查找找到这个随机数落在累积数组的哪个区间,对应的名字就是选中的样本; - 如果需要多个样本,重复上述步骤即可(支持放回/不放回抽样)。
简单Python实现
import random import bisect def weighted_random_sample(names, freqs, sample_size, replace=True): total = sum(freqs) # 构建(名字, 累积频率)的列表 name_cumulative = [] current_cum = 0 for name, freq in zip(names, freqs): current_cum += freq name_cumulative.append((name, current_cum)) sample = [] used_names = set() # 不放回抽样时记录已选名字 for _ in range(sample_size): while True: r = random.randint(1, total) # 用二分找对应区间 idx = bisect.bisect_left([c for _, c in name_cumulative], r) selected_name = name_cumulative[idx][0] # 放回抽样直接加,不放回则检查是否已选 if replace or selected_name not in used_names: break sample.append(selected_name) if not replace: used_names.add(selected_name) return sample
为什么高效?
- 预处理累积数组是
O(n)时间,每次抽样是O(log n)时间,总时间复杂度O(n + k*log n)(n是总名字数,k是样本大小); - 内存开销只有
O(n)用来存累积数组,对于百万级别的数据也完全hold住。
2. 分层加权抽样(保证全频率段代表性)
如果担心高频名字会垄断样本,或者想确保低频名字也有入选机会,分层抽样就很合适。核心是先把名字按频率分成不同层级(比如高频、中频、低频),然后在每个层级内按比例抽样,最后合并结果。
实现步骤
- 把所有名字按频率从高到低排序,然后划分层级(比如前20%归为高频层,中间60%中频层,后20%低频层,比例可以自己调);
- 计算每个层级的总频率,按这个频率占总频率的比例分配样本名额;
- 每个层级内用加权随机抽样方法选样本;
- 最后把各层级的样本打乱,得到最终的代表性子集。
简化实现示例
import random import bisect def stratified_weighted_sample(names, freqs, sample_size): # 配对名字和频率并排序 sorted_pairs = sorted(zip(names, freqs), key=lambda x: x[1], reverse=True) total_names = len(sorted_pairs) total_freq = sum(freqs) # 划分层级(这里用20/60/20的比例,可自定义) high_cut = int(total_names * 0.2) mid_cut = high_cut + int(total_names * 0.6) high_group = sorted_pairs[:high_cut] mid_group = sorted_pairs[high_cut:mid_cut] low_group = sorted_pairs[mid_cut:] # 按各层频率占比分配样本数 high_total = sum(f for _, f in high_group) mid_total = sum(f for _, f in mid_group) low_total = sum(f for _, f in low_group) high_sample_num = int(sample_size * (high_total / total_freq)) mid_sample_num = int(sample_size * (mid_total / total_freq)) # 补全剩余名额给低频层,避免被忽略 low_sample_num = sample_size - high_sample_num - mid_sample_num # 抽取单个层级样本的辅助函数 def sample_single_group(group, num): if num <= 0 or not group: return [] group_names, group_freqs = zip(*group) group_total = sum(group_freqs) cumulative = [] current = 0 for f in group_freqs: current += f cumulative.append(current) sample = [] for _ in range(num): r = random.randint(1, group_total) idx = bisect.bisect_left(cumulative, r) sample.append(group_names[idx]) return sample # 合并样本并打乱 final_sample = [] final_sample.extend(sample_single_group(high_group, high_sample_num)) final_sample.extend(sample_single_group(mid_group, mid_sample_num)) final_sample.extend(sample_single_group(low_group, low_sample_num)) random.shuffle(final_sample) return final_sample
优势
- 避免了高频名字“霸榜”的情况,确保低频但可能重要的名字也能出现在样本里;
- 层级划分和样本比例可以根据你的数据分布灵活调整,比如如果低频名字特别多,可以给低频层多分配名额。
3. 加权蓄水池抽样(流式/超大规模数据专属)
如果你的数据量超大(比如千万级以上),或者是流式输入(不能一次性把所有名字加载到内存),这个方法绝对是最优解。它不需要提前知道总频率或总名字数,只需要维护一个固定大小的“蓄水池”来存样本。
实现步骤
- 初始化一个大小为
sample_size的蓄水池; - 遍历每个名字,同时累积当前的总频率
current_total; - 对于当前名字,计算它被选中的概率:
(sample_size * 当前名字频率) / current_total; - 如果随机生成的0-1之间的数小于这个概率,就从蓄水池里随机替换一个元素(替换时也按元素的频率加权选择被替换的对象);
- 遍历完所有数据后,蓄水池里的就是代表性样本。
实现代码
import random import bisect def weighted_reservoir_sample(stream, sample_size): reservoir = [] current_total = 0 for name, freq in stream: current_total += freq # 蓄水池未满,直接加入 if len(reservoir) < sample_size: reservoir.append((name, freq)) continue # 计算当前元素被选中的概率 selection_prob = (sample_size * freq) / current_total if random.random() < selection_prob: # 加权选择蓄水池中要被替换的元素 res_freqs = [f for _, f in reservoir] res_cumulative = [] cum = 0 for f in res_freqs: cum += f res_cumulative.append(cum) r = random.randint(1, sum(res_freqs)) replace_idx = bisect.bisect_left(res_cumulative, r) reservoir[replace_idx] = (name, freq) # 提取样本名字 return [name for name, _ in reservoir]
为什么高效?
- 时间复杂度
O(n),每个元素只处理一次; - 内存开销固定为
O(sample_size),不管原数据有多大,都只存k个样本,内存压力极小。
实用选择建议
最后给你几个快速选方案的小准则:
- 如果数据量不大(比如万级以内),直接用加权随机抽样,实现简单,代表性足够;
- 如果需要确保低频名字不被忽略,选分层加权抽样;
- 如果是流式数据或超大规模数据(百万级以上),加权蓄水池抽样是唯一的高效选择;
- 测试代表性的小技巧:对比子集的频率分布和原数据的频率分布(比如统计高频、中频、低频的占比),如果两者差异很小,就说明样本合格啦。
要是你有具体的数据规模或者特殊要求(比如必须所有样本唯一),随时告诉我,我再给你调细节!




