You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

关于将海量股票OHLC CSV数据存入字典优化多进程IO操作的可行性咨询

问题:多进程处理大量CSV文件时,预加载所有数据到字典是否能优化性能?

我有大约1500个包含股票OHLC数据的CSV文件,每个文件有90000-100000行数据。目前用多进程代码迭代处理这些文件,当开16个进程时系统会轻微卡顿,确定是IO设备使用率过高导致的(因为要逐个打开文件)。现在用10个进程运行正常,想咨询:把全部1500个CSV文件的数据存入单个字典后再运行代码是否是好方案?此举能否减少处理时间或缓解系统卡顿?

数据格式说明

每个CSV包含股票OHLC相关数据(原提及的图片无法展示)

当前使用的代码

import numpy as np
import pandas as pd
import os
import multiprocessing
import datetime
import itertools
import time
import warnings
warnings.filterwarnings('ignore')

# bank nifty
bn_futures = pd.read_csv('E:\\Tanmay\\Data\\Bank Nifty Index\\BankNifty_Futures GFDL 2011-2020.csv')
bn_futures['Date_time'] = bn_futures['Date'] + ' ' + bn_futures['Time']
bn_futures['Date_time'] = pd.to_datetime(bn_futures['Date_time'],format='%Y-%m-%d %H:%M:%S')
bn_futures = bn_futures[bn_futures['Date_time'].dt.date > datetime.date(2016,5,26)]
req_cols = [x for x in bn_futures.columns if 'Unnamed' not in x]
bn_futures = bn_futures[req_cols]
bn_futures['straddle'] = round(bn_futures['Close'],-2)
bn_futures['straddle'] = bn_futures['straddle'].astype(int)
bn_futures['straddle'] = bn_futures['straddle'].astype(str)
bn_futures['Date'] = bn_futures['Date_time'].dt.date
dates = list(set(bn_futures['Date'].to_list()))
dates.sort()

option_files1 = os.listdir('E:\\2nd Set\\')
option_files = []
for i in option_files1:
    if datetime.datetime.strptime(i.split('.')[0],'%Y-%m-%d').date() >= datetime.date(2016,5,27):
        option_files.append(i)

def time_loop(start_time,end_time,timeframe):
    start_datetime = datetime.datetime.combine(datetime.datetime.today().date(),start_time)
    end_datetime = datetime.datetime.combine(datetime.datetime.today().date(),end_time)
    difference = int((((end_datetime - start_datetime).total_seconds())/60)/timeframe)
    final_time_list = []
    for i in range(difference):
        final_time_list.append((start_datetime+datetime.timedelta(minutes=i*timeframe)).time())
    return final_time_list

entry_time_list = time_loop(datetime.time(9,19),datetime.time(15,19),5)
sl_list = np.arange(1.1, 2, 0.1)
# sl_list = list(range(1.1,2,0.1))
paramlist = list(itertools.product(entry_time_list,sl_list))

def strategy(main_entry_time,sl):
    print(main_entry_time,sl)
    main_dict = {}
    for file in option_files:
        date = datetime.datetime.strptime(file.split('.')[0],'%Y-%m-%d').date()
        try:
            # reading current date bn futures
            bn = bn_futures[bn_futures['Date'] == date]
            # reading main time bn futures
            b = bn[bn['Date_time'].dt.time == main_entry_time]
            straddle_value = b['straddle'].iloc[0]

            df = pd.read_csv('E:\\Tanmay\\Data\\Bank nifty Intraday All expiries\\2nd Set\\'+file)
            df['Date_time'] = pd.to_datetime(df['Date_time'],format='%Y-%m-%d %H:%M:%S')
            h = [k for k in df.columns if 'Un' not in k]
            df = df[h]

            total_df = df[(df['Ticker'].str.contains(straddle_value)) & (df['Expiry_number'] == 0) & (df['W/M'] == 'W')]
            option_types = ['CE','PE']
            for option in option_types:
                option_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time == main_entry_time)]
                entry_price = option_df['Close'].iloc[0]
                strike = option
                entry_time = main_entry_time

                trade_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time > main_entry_time)]
                trade_df.sort_values(by='Date_time',inplace=True)
                for t in trade_df.index:
                    if trade_df['Date_time'][t].time() > entry_time:
                        if trade_df['High'][t] > entry_price * sl:
                            exit_price = entry_price * sl
                            exit_time = trade_df['Date_time'][t].time()
                            profit = entry_price - exit_price - 0.02* entry_price
                            main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'SL'}
                            break
                        if trade_df['Date_time'][t].time() >= datetime.time(15,14,0):
                            exit_price = trade_df['Close'][t]
                            exit_time = trade_df['Date_time'][t].time()
                            profit = entry_price - exit_price - 0.02* entry_price
                            main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'EOD'}
                            break
        except Exception as yy:
            pass
    final_dict = dict(main_dict)
    final_df = pd.DataFrame(final_dict)
    final_df = final_df.transpose()
    final_df.to_csv('SL_'+str(sl)+'entry_time_'+str(main_entry_time).replace(':','')+'entry_date_'+str(date)+'.csv')

if __name__=='__main__':
    start_time = time.time()
    # mgr = multiprocessing.Manager()
    # main_dict = mgr.dict()
    total_data = paramlist
    p = multiprocessing.Pool(processes=10)
    p.starmap(strategy,total_data)
    p.close()

回答

直接给结论:预加载所有CSV数据到内存(比如一个以日期为键的字典)是非常可行的优化方案,不仅能彻底缓解IO卡顿,还能显著减少整体处理时间,但需要先评估你的系统内存是否足够,下面展开说细节:

为什么预加载能解决问题?

你当前的代码存在一个核心问题:每个strategy进程都会重复读取同一个CSV文件(因为不同参数组合会处理同一个日期的数据),这导致了大量的重复磁盘IO——1500个文件,每个会被读取N次(N是你的参数组合总数),磁盘根本扛不住高并发的重复读取请求,所以开16个进程就会卡顿。

如果提前把所有CSV文件加载到内存字典里,所有进程都可以直接从内存读取数据,完全避免重复的磁盘IO操作:

  • 磁盘IO的瓶颈会彻底消失,你可以放心开到16个甚至更多进程
  • 内存读取速度比磁盘快几个数量级,整体处理时间会大幅下降

预加载的实现要点

  1. 预加载时机:一定要在启动多进程之前完成所有CSV的加载,这样子进程可以直接复用父进程加载好的数据(Linux/macOS下是共享内存,Windows下是拷贝内存,后者会占用更多内存,但依然比重复读磁盘好)。
  2. 提前预处理:预加载时就完成所有通用的预处理步骤——比如解析Date_time、过滤Unnamed列,这样每个strategy函数就不用重复做这些工作,进一步节省时间。
  3. 内存占用评估:每个CSV是9-10万行,按pandas的存储效率估算,每个文件大概占50-100MB,1500个文件总内存占用在75-150GB左右。你需要确认你的系统内存是否足够——如果内存不足,系统会启用虚拟内存(swap),反而会更卡。如果内存不够,可以考虑分批预加载,或者把CSV转成Parquet/Feather这类更省内存的格式。

优化后的核心代码示例

# 新增预加载函数,在启动进程前执行
def load_all_option_data():
    date_to_df = {}
    for file in option_files:
        date = datetime.datetime.strptime(file.split('.')[0],'%Y-%m-%d').date()
        df = pd.read_csv('E:\\Tanmay\\Data\\Bank nifty Intraday All expiries\\2nd Set\\'+file)
        # 提前完成所有通用预处理
        df['Date_time'] = pd.to_datetime(df['Date_time'],format='%Y-%m-%d %H:%M:%S')
        h = [k for k in df.columns if 'Un' not in k]
        df = df[h]
        date_to_df[date] = df
    return date_to_df

# 修改strategy函数,接收预加载的字典
def strategy(main_entry_time, sl, date_to_df):
    print(main_entry_time,sl)
    main_dict = {}
    # 直接遍历预加载的日期数据,不用再读文件
    for date in date_to_df:
        try:
            bn = bn_futures[bn_futures['Date'] == date]
            b = bn[bn['Date_time'].dt.time == main_entry_time]
            straddle_value = b['straddle'].iloc[0]

            # 从内存字典直接取数据,跳过文件读取步骤
            df = date_to_df[date]

            total_df = df[(df['Ticker'].str.contains(straddle_value)) & (df['Expiry_number'] == 0) & (df['W/M'] == 'W')]
            option_types = ['CE','PE']
            for option in option_types:
                option_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time == main_entry_time)]
                entry_price = option_df['Close'].iloc[0]
                strike = option
                entry_time = main_entry_time

                trade_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time > main_entry_time)]
                trade_df.sort_values(by='Date_time',inplace=True)
                for t in trade_df.index:
                    if trade_df['Date_time'][t].time() > entry_time:
                        if trade_df['High'][t] > entry_price * sl:
                            exit_price = entry_price * sl
                            exit_time = trade_df['Date_time'][t].time()
                            profit = entry_price - exit_price - 0.02* entry_price
                            main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'SL'}
                            break
                        if trade_df['Date_time'][t].time() >= datetime.time(15,14,0):
                            exit_price = trade_df['Close'][t]
                            exit_time = trade_df['Date_time'][t].time()
                            profit = entry_price - exit_price - 0.02* entry_price
                            main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'EOD'}
                            break
        except Exception as yy:
            pass
    final_dict = dict(main_dict)
    final_df = pd.DataFrame(final_dict)
    final_df = final_df.transpose()
    final_df.to_csv('SL_'+str(sl)+'entry_time_'+str(main_entry_time).replace(':','')+'entry_date_'+str(date)+'.csv')

if __name__=='__main__':
    start_time = time.time()
    # 先完成预加载
    print("开始预加载所有CSV数据...")
    date_to_df = load_all_option_data()
    print("预加载完成,启动多进程...")
    # 构造包含预加载字典的参数列表
    total_data = [(entry_time, sl, date_to_df) for entry_time, sl in paramlist]
    # 现在可以放心开到16个进程
    p = multiprocessing.Pool(processes=16)
    p.starmap(strategy, total_data)
    p.close()
    print(f"总耗时: {time.time() - start_time:.2f}秒")

其他补充优化建议

  • 共享内存优化(Windows用户):Windows下每个子进程会拷贝一份预加载的数据,内存占用会翻倍。如果内存紧张,可以用multiprocessing.Manager()或者shared_memory模块来实现数据共享,避免重复拷贝。
  • 转换数据格式:如果以后还要频繁处理这些数据,建议把CSV转成Parquet或Feather格式——这类格式读取速度比CSV快3-10倍,且占用的磁盘和内存空间更小。
  • 多线程预加载:预加载过程是IO密集型任务,可以用多线程(比如concurrent.futures.ThreadPoolExecutor)来加速加载速度,减少等待时间。

内容的提问来源于stack exchange,提问作者abhiraj gupta

火山引擎 最新活动