You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用Pandas实现带条件的行抽样(替代低效循环)

如何用Pandas高效实现带状态变化和间隔限制的行抽样?

我明白你的需求:你需要对列表或DataFrame按dtype分为'a'和'b'两组分别抽样,规则是最大抽样间隔为10行,且state列值变化时必须抽样。原有的非Pandas代码可读性差,直接用循环处理DataFrame又速度太慢,接下来我会给出高效的Pandas实现方案。

问题背景与输入输出

需求梳理

  • dtype的'a'、'b'分别执行抽样逻辑;
  • 抽样触发条件:
    1. 当前行与上一次抽样行的ind(索引列)间隔≥10;
    2. 当前行的state与上一行state不同;
  • 需保留与原代码一致的抽样结果。

输入数据

# 输入数据(可转换为DataFrame),列依次为:ind, state, dtype, value
x = [
 [1, 0, 'b', 93.8], [2, 0, 'b', 97.4], [3, 0, 'b', 76.1], [4, 0, 'b', 21.1], [5, 0, 'b', 65.7],
 [6, 0, 'b', 90.8], [7, 0, 'b', 63.8], [8, 0, 'b', 82.9], [9, 0, 'b', 19.8], [10, 0, 'b', 10.2],
 [11, 0, 'b', 1.3], [12, 1, 'b', 37.6], [13, 0, 'b', 18.2], [14, 0, 'b', 16.9], [15, 0, 'b', 95.6],
 [16, 1, 'b', 23.7], [17, 0, 'b', 54.1], [18, 0, 'b', 99.0], [19, 0, 'b', 16.3], [20, 0, 'a', 80.7],
 [21, 0, 'a', 23.1], [22, 0, 'a', 96.6], [23, 0, 'a', 56.7], [24, 0, 'a', 45.3], [25, 1, 'a', 58.0],
 [26, 0, 'a', 49.9], [27, 0, 'a', 91.3], [28, 0, 'b', 60.2], [29, 0, 'b', 76.8], [30, 0, 'b', 45.3],
 [31, 0, 'b', 69.6], [32, 0, 'b', 99.0], [33, 0, 'b', 29.5], [34, 0, 'b', 11.0], [35, 0, 'b', 68.9],
 [36, 0, 'b', 75.8], [37, 1, 'b', 89.8], [38, 0, 'b', 57.7], [39, 1, 'b', 20.3], [40, 0, 'b', 98.6],
 [41, 0, 'b', 96.7], [42, 0, 'b', 17.9], [43, 1, 'b', 14.6], [44, 0, 'b', 92.5], [45, 0, 'b', 33.6],
 [46, 1, 'b', 58.9], [47, 1, 'b', 71.9], [48, 0, 'b', 74.9], [49, 0, 'b', 43.3], [50, 1, 'b', 29.5],
 [51, 0, 'b', 24.6], [52, 0, 'b', 2.3], [53, 0, 'b', 19.1], [54, 0, 'b', 31.6], [55, 0, 'b', 80.6],
 [56, 0, 'b', 3.2], [57, 0, 'b', 58.5], [58, 1, 'b', 30.2], [59, 1, 'b', 29.1], [60, 0, 'b', 47.6],
 [61, 0, 'b', 76.4], [62, 0, 'b', 21.6], [63, 0, 'b', 82.7], [64, 0, 'b', 0.2], [65, 0, 'b', 9.4],
 [66, 0, 'b', 75.1], [67, 0, 'b', 33.8], [68, 0, 'b', 82.0], [69, 0, 'b', 56.9], [70, 0, 'b', 62.5],
 [71, 0, 'b', 53.5], [72, 0, 'b', 7.0], [73, 0, 'a', 37.4], [74, 0, 'a', 88.8], [75, 0, 'a', 46.4],
 [76, 0, 'a', 86.3], [77, 0, 'a', 54.3], [78, 0, 'b', 23.4], [79, 0, 'b', 1.1], [80, 0, 'b', 78.5],
 [81, 0, 'b', 39.1], [82, 1, 'b', 79.0], [83, 0, 'b', 41.0], [84, 0, 'b', 40.3], [85, 0, 'a', 66.5],
 [86, 0, 'a', 66.8], [87, 0, 'a', 86.8], [88, 1, 'b', 96.9], [89, 0, 'b', 2.1], [90, 0, 'b', 46.3],
 [91, 0, 'b', 28.9], [92, 0, 'b', 43.2], [93, 0, 'b', 58.9], [94, 0, 'b', 60.6], [95, 0, 'b', 15.4],
 [96, 0, 'b', 69.4], [97, 1, 'b', 18.4], [98, 0, 'b', 41.3], [99, 0, 'b', 40.5]
]

原非Pandas实现代码

def resample(x, log_interval, dtype):
    if not x:
        return []
    red = []
    prev_state, next_val, last_val = 0, 0, 0
    for row in x:
        if row[2] == dtype:
            if row[0] >= next_val or row[1] != prev_state and row[0] > last_val:
                red.append(row)
                prev_state = row[1]
                next_val = row[0] + log_interval
                last_val = row[0]
    return red

red_a = resample(x, 10, 'a')
red_b = resample(x, 10, 'b')

预期输出

red_a = [
 [20, 0, 'a', 80.7], [25, 1, 'a', 58.0], [26, 0, 'a', 49.9], [73, 0, 'a', 37.4], [85, 0, 'a', 66.5]
]

red_b = [
 [1, 0, 'b', 93.8], [11, 0, 'b', 1.3], [12, 1, 'b', 37.6], [13, 0, 'b', 18.2], [16, 1, 'b', 23.7],
 [17, 0, 'b', 54.1], [28, 0, 'b', 60.2], [37, 1, 'b', 89.8], [38, 0, 'b', 57.7], [39, 1, 'b', 20.3],
 [40, 0, 'b', 98.6], [43, 1, 'b', 14.6], [44, 0, 'b', 92.5], [46, 1, 'b', 58.9], [48, 0, 'b', 74.9],
 [50, 1, 'b', 29.5], [51, 0, 'b', 24.6], [58, 1, 'b', 30.2], [60, 0, 'b', 47.6], [70, 0, 'b', 62.5],
 [80, 0, 'b', 78.5], [82, 1, 'b', 79.0], [83, 0, 'b', 41.0], [88, 1, 'b', 96.9], [89, 0, 'b', 2.1],
 [97, 1, 'b', 18.4], [98, 0, 'b', 41.3]
]

已转换的DataFrame

columns = ['ind', 'state', 'dtype', 'value']
df = pd.DataFrame(x, columns=columns)

Pandas高效实现方案

我们可以利用Pandas的groupby分组功能,结合局部循环(仅在分组内循环,数据量更小)来实现需求,既保证可读性,又大幅提升处理速度。

核心思路

  1. dtype分组,分别处理'a'和'b'的抽样逻辑;
  2. 对每个分组,跟踪两个关键变量:上一次抽样的ind、上一行的state
  3. 逐行判断是否满足抽样条件(间隔≥10或state变化),标记需要保留的行;
  4. 确保每个分组的第一行被抽样(对应原逻辑的初始添加)。

代码实现

import pandas as pd

def pandas_resample(df, log_interval=10):
    # 定义分组处理函数
    def process_single_group(group):
        # 初始化标记数组,默认不抽样
        sample_mask = pd.Series(False, index=group.index)
        last_sampled_ind = -float('inf')  # 初始化为负无穷,确保第一行满足间隔条件
        prev_state = None
        
        for idx, row in group.iterrows():
            # 条件1:当前行与上一次抽样行的间隔≥log_interval
            condition_interval = row['ind'] >= last_sampled_ind + log_interval
            # 条件2:state发生变化(排除初始prev_state为None的情况)
            condition_state_change = (prev_state is not None) and (row['state'] != prev_state)
            
            if condition_interval or condition_state_change:
                sample_mask.loc[idx] = True
                last_sampled_ind = row['ind']  # 更新上一次抽样的ind
            
            prev_state = row['state']  # 更新上一行的state
        
        # 强制保留分组的第一行(与原逻辑一致)
        sample_mask.iloc[0] = True
        return group[sample_mask]
    
    # 按dtype分组处理,合并结果
    sampled_df = df.groupby('dtype').apply(process_single_group).reset_index(drop=True)
    
    # 转换为与原输出一致的列表格式
    red_a = sampled_df[sampled_df['dtype'] == 'a'].values.tolist()
    red_b = sampled_df[sampled_df['dtype'] == 'b'].values.tolist()
    
    return red_a, red_b

# 执行抽样
red_a_pd, red_b_pd = pandas_resample(df)

# 验证结果是否与预期一致
print("red_a 匹配预期:", red_a_pd == red_a)
print("red_b 匹配预期:", red_b_pd == red_b)

性能与可读性说明

  • 性能提升:相比遍历整个DataFrame的循环,groupby后仅在每个分组内循环,数据量更小;同时结合Pandas的Series标记,比纯Python列表操作更高效;
  • 可读性:代码逻辑清晰,分组处理函数单独封装,每个条件都有明确注释,便于维护;
  • 结果一致性:严格复刻原代码的抽样逻辑,确保输出与预期完全一致。

超大数据量优化建议

如果你的数据量极大(百万行以上),可以尝试用numba库对分组内的循环进行JIT编译,进一步加速:

from numba import jit

# 将process_single_group中的循环部分用numba装饰
@jit(nopython=True)
def numba_process(inds, states, log_interval):
    n = len(inds)
    mask = [False] * n
    last_sampled_ind = -float('inf')
    prev_state = states[0]
    mask[0] = True
    last_sampled_ind = inds[0]
    
    for i in range(1, n):
        condition_interval = inds[i] >= last_sampled_ind + log_interval
        condition_state_change = states[i] != prev_state
        
        if condition_interval or condition_state_change:
            mask[i] = True
            last_sampled_ind = inds[i]
        
        prev_state = states[i]
    
    return mask

# 修改process_single_group函数
def process_single_group(group):
    inds = group['ind'].values
    states = group['state'].values
    mask = numba_process(inds, states, 10)
    return group[mask]

这个优化能让循环部分的速度提升数倍,适合超大规模数据集。


内容的提问来源于stack exchange,提问作者joeDiHare

火山引擎 最新活动