You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何通过条件与groupby处理Pandas DataFrame筛选指定TRADE记录

最优实现思路与代码

针对你的需求,我会用Pandas的分组(groupby)结合窗口函数/自定义逻辑来实现,既保证效率(处理数百个Instrument的大数据量也没问题),又逻辑清晰。核心步骤是:先确保每组内按时间排序,定位出目标第一条TRADE,再筛选30分钟窗口内的最晚TRADE。

步骤1:数据预处理(确保时间有序)

首先,必须让每个['Instrument', 'Date']分组内的记录按午夜后毫秒数(假设列名是ms_since_midnight)升序排列,这是后续定位的基础:

df = df.sort_values(by=['Instrument', 'Date', 'ms_since_midnight'])

步骤2:定位每组内的第一条目标TRADE(紧跟Control后的首个TRADE)

我们需要先标记出每组内所有在RecordType='Control'之后的记录,然后取第一个RecordType='TRADE'的条目。这里考虑了每组可能有多个Control的情况,取最晚的Control之后的第一个TRADE,更符合实际场景:

def get_first_target_trade(group):
    # 找到所有Control的索引,取最后一个(最晚的Control)
    control_indices = group[group['RecordType'] == 'Control'].index
    if not control_indices.empty:
        last_control_idx = control_indices[-1]
        # 筛选Control之后的所有TRADE,取第一条
        post_control_trades = group.loc[last_control_idx:, :][group['RecordType'] == 'TRADE']
        if not post_control_trades.empty:
            return post_control_trades.iloc[0]
    return None

# 提取每组的第一条目标TRADE,过滤无结果的分组
first_trades = df.groupby(['Instrument', 'Date']).apply(get_first_target_trade).dropna()

步骤3:定位每组内的第二条目标TRADE(第一条TRADE后30分钟内的最晚TRADE)

有了第一条TRADE的时间,计算30分钟后的时间阈值(30*60*1000=1800000毫秒),然后在每组内筛选出时间在阈值范围内、且晚于第一条TRADE的所有TRADE,取最晚的那条:

def get_second_target_trade(group, first_trade):
    if first_trade is None:
        return None
    # 计算时间阈值
    threshold = first_trade['ms_since_midnight'] + 1800000
    # 筛选符合时间范围的TRADE(排除第一条本身)
    eligible_trades = group[(group['RecordType'] == 'TRADE') & 
                           (group['ms_since_midnight'] > first_trade['ms_since_midnight']) &
                           (group['ms_since_midnight'] <= threshold)]
    if not eligible_trades.empty:
        return eligible_trades.iloc[-1]
    return None

# 遍历分组获取第二条目标TRADE
second_trades = []
for (inst, date), group in df.groupby(['Instrument', 'Date']):
    first_trade = first_trades.get((inst, date), None)
    second_trade = get_second_target_trade(group, first_trade)
    if second_trade is not None:
        second_trades.append(second_trade)
second_trades = pd.DataFrame(second_trades)

步骤4:合并结果并整理

最后把两条目标TRADE合并成最终结果,保持分组内的时间顺序:

final_result = pd.concat([first_trades, second_trades]).sort_values(by=['Instrument', 'Date', 'ms_since_midnight'])

大数据量优化方案

如果你的DataFrame行数特别多(比如百万级),可以用Pandas窗口函数替代自定义apply,大幅提升效率:

  1. 标记每组内所有在Control之后的记录:
df['is_after_control'] = df.groupby(['Instrument', 'Date'])['RecordType'].transform(
    lambda x: (x == 'Control').cumsum() > 0
)
  1. 快速筛选出第一条目标TRADE:
first_trades = df[(df['is_after_control']) & (df['RecordType'] == 'TRADE')].groupby(['Instrument', 'Date']).first()
  1. 合并时间阈值,筛选第二条目标TRADE:
df_with_threshold = df.merge(
    first_trades[['ms_since_midnight']].rename(columns={'ms_since_midnight': 'first_trade_ms'}),
    on=['Instrument', 'Date'],
    how='left'
)
df_with_threshold['threshold'] = df_with_threshold['first_trade_ms'] + 1800000

second_trades = df_with_threshold[(df_with_threshold['RecordType'] == 'TRADE') & 
                                 (df_with_threshold['ms_since_midnight'] > df_with_threshold['first_trade_ms']) &
                                 (df_with_threshold['ms_since_midnight'] <= df_with_threshold['threshold'])].groupby(['Instrument', 'Date']).last()

注意事项

  • 确保ms_since_midnight是数值类型,避免计算错误。
  • 如果某分组没有Control或Control后无TRADE,该分组会被自动过滤,你可以根据需求添加默认值处理。
  • 如果第一条TRADE后30分钟内无其他TRADE,该分组的第二条记录会被过滤,可根据业务调整逻辑(比如保留第一条)。

内容的提问来源于stack exchange,提问作者Shaun Lim

火山引擎 最新活动