关于将海量股票OHLC CSV数据存入字典优化多进程IO操作的可行性咨询
问题:多进程处理大量CSV文件时,预加载所有数据到字典是否能优化性能?
我有大约1500个包含股票OHLC数据的CSV文件,每个文件有90000-100000行数据。目前用多进程代码迭代处理这些文件,当开16个进程时系统会轻微卡顿,确定是IO设备使用率过高导致的(因为要逐个打开文件)。现在用10个进程运行正常,想咨询:把全部1500个CSV文件的数据存入单个字典后再运行代码是否是好方案?此举能否减少处理时间或缓解系统卡顿?
数据格式说明
每个CSV包含股票OHLC相关数据(原提及的图片无法展示)
当前使用的代码
import numpy as np import pandas as pd import os import multiprocessing import datetime import itertools import time import warnings warnings.filterwarnings('ignore') # bank nifty bn_futures = pd.read_csv('E:\\Tanmay\\Data\\Bank Nifty Index\\BankNifty_Futures GFDL 2011-2020.csv') bn_futures['Date_time'] = bn_futures['Date'] + ' ' + bn_futures['Time'] bn_futures['Date_time'] = pd.to_datetime(bn_futures['Date_time'],format='%Y-%m-%d %H:%M:%S') bn_futures = bn_futures[bn_futures['Date_time'].dt.date > datetime.date(2016,5,26)] req_cols = [x for x in bn_futures.columns if 'Unnamed' not in x] bn_futures = bn_futures[req_cols] bn_futures['straddle'] = round(bn_futures['Close'],-2) bn_futures['straddle'] = bn_futures['straddle'].astype(int) bn_futures['straddle'] = bn_futures['straddle'].astype(str) bn_futures['Date'] = bn_futures['Date_time'].dt.date dates = list(set(bn_futures['Date'].to_list())) dates.sort() option_files1 = os.listdir('E:\\2nd Set\\') option_files = [] for i in option_files1: if datetime.datetime.strptime(i.split('.')[0],'%Y-%m-%d').date() >= datetime.date(2016,5,27): option_files.append(i) def time_loop(start_time,end_time,timeframe): start_datetime = datetime.datetime.combine(datetime.datetime.today().date(),start_time) end_datetime = datetime.datetime.combine(datetime.datetime.today().date(),end_time) difference = int((((end_datetime - start_datetime).total_seconds())/60)/timeframe) final_time_list = [] for i in range(difference): final_time_list.append((start_datetime+datetime.timedelta(minutes=i*timeframe)).time()) return final_time_list entry_time_list = time_loop(datetime.time(9,19),datetime.time(15,19),5) sl_list = np.arange(1.1, 2, 0.1) # sl_list = list(range(1.1,2,0.1)) paramlist = list(itertools.product(entry_time_list,sl_list)) def strategy(main_entry_time,sl): print(main_entry_time,sl) main_dict = {} for file in option_files: date = datetime.datetime.strptime(file.split('.')[0],'%Y-%m-%d').date() try: # reading current date bn futures bn = bn_futures[bn_futures['Date'] == date] # reading main time bn futures b = bn[bn['Date_time'].dt.time == main_entry_time] straddle_value = b['straddle'].iloc[0] df = pd.read_csv('E:\\Tanmay\\Data\\Bank nifty Intraday All expiries\\2nd Set\\'+file) df['Date_time'] = pd.to_datetime(df['Date_time'],format='%Y-%m-%d %H:%M:%S') h = [k for k in df.columns if 'Un' not in k] df = df[h] total_df = df[(df['Ticker'].str.contains(straddle_value)) & (df['Expiry_number'] == 0) & (df['W/M'] == 'W')] option_types = ['CE','PE'] for option in option_types: option_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time == main_entry_time)] entry_price = option_df['Close'].iloc[0] strike = option entry_time = main_entry_time trade_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time > main_entry_time)] trade_df.sort_values(by='Date_time',inplace=True) for t in trade_df.index: if trade_df['Date_time'][t].time() > entry_time: if trade_df['High'][t] > entry_price * sl: exit_price = entry_price * sl exit_time = trade_df['Date_time'][t].time() profit = entry_price - exit_price - 0.02* entry_price main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'SL'} break if trade_df['Date_time'][t].time() >= datetime.time(15,14,0): exit_price = trade_df['Close'][t] exit_time = trade_df['Date_time'][t].time() profit = entry_price - exit_price - 0.02* entry_price main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'EOD'} break except Exception as yy: pass final_dict = dict(main_dict) final_df = pd.DataFrame(final_dict) final_df = final_df.transpose() final_df.to_csv('SL_'+str(sl)+'entry_time_'+str(main_entry_time).replace(':','')+'entry_date_'+str(date)+'.csv') if __name__=='__main__': start_time = time.time() # mgr = multiprocessing.Manager() # main_dict = mgr.dict() total_data = paramlist p = multiprocessing.Pool(processes=10) p.starmap(strategy,total_data) p.close()
回答
直接给结论:预加载所有CSV数据到内存(比如一个以日期为键的字典)是非常可行的优化方案,不仅能彻底缓解IO卡顿,还能显著减少整体处理时间,但需要先评估你的系统内存是否足够,下面展开说细节:
为什么预加载能解决问题?
你当前的代码存在一个核心问题:每个strategy进程都会重复读取同一个CSV文件(因为不同参数组合会处理同一个日期的数据),这导致了大量的重复磁盘IO——1500个文件,每个会被读取N次(N是你的参数组合总数),磁盘根本扛不住高并发的重复读取请求,所以开16个进程就会卡顿。
如果提前把所有CSV文件加载到内存字典里,所有进程都可以直接从内存读取数据,完全避免重复的磁盘IO操作:
- 磁盘IO的瓶颈会彻底消失,你可以放心开到16个甚至更多进程
- 内存读取速度比磁盘快几个数量级,整体处理时间会大幅下降
预加载的实现要点
- 预加载时机:一定要在启动多进程之前完成所有CSV的加载,这样子进程可以直接复用父进程加载好的数据(Linux/macOS下是共享内存,Windows下是拷贝内存,后者会占用更多内存,但依然比重复读磁盘好)。
- 提前预处理:预加载时就完成所有通用的预处理步骤——比如解析
Date_time、过滤Unnamed列,这样每个strategy函数就不用重复做这些工作,进一步节省时间。 - 内存占用评估:每个CSV是9-10万行,按pandas的存储效率估算,每个文件大概占50-100MB,1500个文件总内存占用在75-150GB左右。你需要确认你的系统内存是否足够——如果内存不足,系统会启用虚拟内存(swap),反而会更卡。如果内存不够,可以考虑分批预加载,或者把CSV转成Parquet/Feather这类更省内存的格式。
优化后的核心代码示例
# 新增预加载函数,在启动进程前执行 def load_all_option_data(): date_to_df = {} for file in option_files: date = datetime.datetime.strptime(file.split('.')[0],'%Y-%m-%d').date() df = pd.read_csv('E:\\Tanmay\\Data\\Bank nifty Intraday All expiries\\2nd Set\\'+file) # 提前完成所有通用预处理 df['Date_time'] = pd.to_datetime(df['Date_time'],format='%Y-%m-%d %H:%M:%S') h = [k for k in df.columns if 'Un' not in k] df = df[h] date_to_df[date] = df return date_to_df # 修改strategy函数,接收预加载的字典 def strategy(main_entry_time, sl, date_to_df): print(main_entry_time,sl) main_dict = {} # 直接遍历预加载的日期数据,不用再读文件 for date in date_to_df: try: bn = bn_futures[bn_futures['Date'] == date] b = bn[bn['Date_time'].dt.time == main_entry_time] straddle_value = b['straddle'].iloc[0] # 从内存字典直接取数据,跳过文件读取步骤 df = date_to_df[date] total_df = df[(df['Ticker'].str.contains(straddle_value)) & (df['Expiry_number'] == 0) & (df['W/M'] == 'W')] option_types = ['CE','PE'] for option in option_types: option_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time == main_entry_time)] entry_price = option_df['Close'].iloc[0] strike = option entry_time = main_entry_time trade_df = total_df[(total_df['Ticker'].str.contains(option)) & (total_df['Date_time'].dt.time > main_entry_time)] trade_df.sort_values(by='Date_time',inplace=True) for t in trade_df.index: if trade_df['Date_time'][t].time() > entry_time: if trade_df['High'][t] > entry_price * sl: exit_price = entry_price * sl exit_time = trade_df['Date_time'][t].time() profit = entry_price - exit_price - 0.02* entry_price main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'SL'} break if trade_df['Date_time'][t].time() >= datetime.time(15,14,0): exit_price = trade_df['Close'][t] exit_time = trade_df['Date_time'][t].time() profit = entry_price - exit_price - 0.02* entry_price main_dict['SL_'+str(sl)+'entry_time_'+str(main_entry_time)+'entry_date_'+str(date)+'_'+option] = {'Entry_date':str(date),'Entry_time':entry_time,'Strike':str(straddle_value)+option,'Entry_price':entry_price,'Exit_price':exit_price,'exit_time':exit_time,'profit':profit,'Reason':'EOD'} break except Exception as yy: pass final_dict = dict(main_dict) final_df = pd.DataFrame(final_dict) final_df = final_df.transpose() final_df.to_csv('SL_'+str(sl)+'entry_time_'+str(main_entry_time).replace(':','')+'entry_date_'+str(date)+'.csv') if __name__=='__main__': start_time = time.time() # 先完成预加载 print("开始预加载所有CSV数据...") date_to_df = load_all_option_data() print("预加载完成,启动多进程...") # 构造包含预加载字典的参数列表 total_data = [(entry_time, sl, date_to_df) for entry_time, sl in paramlist] # 现在可以放心开到16个进程 p = multiprocessing.Pool(processes=16) p.starmap(strategy, total_data) p.close() print(f"总耗时: {time.time() - start_time:.2f}秒")
其他补充优化建议
- 共享内存优化(Windows用户):Windows下每个子进程会拷贝一份预加载的数据,内存占用会翻倍。如果内存紧张,可以用
multiprocessing.Manager()或者shared_memory模块来实现数据共享,避免重复拷贝。 - 转换数据格式:如果以后还要频繁处理这些数据,建议把CSV转成Parquet或Feather格式——这类格式读取速度比CSV快3-10倍,且占用的磁盘和内存空间更小。
- 多线程预加载:预加载过程是IO密集型任务,可以用多线程(比如
concurrent.futures.ThreadPoolExecutor)来加速加载速度,减少等待时间。
内容的提问来源于stack exchange,提问作者abhiraj gupta




