You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python下高频逐笔Tick数据流本地持久化的高效方案咨询(低RAM占用支持实时分析)

Python下高频逐笔Tick数据流本地持久化的高效方案咨询(低RAM占用支持实时分析)

兄弟,你的需求我太懂了——40个品种、每秒最高1万笔Tick、持续一周的数据流,既要低RAM占用,又要随时能从本地加载分析已存数据,还不能每次更新都重写整个文件拖慢速度。你之前试的Pandas concat+Pickle和JSON追加确实踩了不少典型坑,我给你捋几个更贴合需求的靠谱方案:

方案一:SQLite(轻量本地数据库,灵活查询首选)

SQLite绝对是这类场景的宝藏工具——它是文件型数据库,不需要额外搭服务器,完全符合你本地存储的需求,而且天生支持增量插入,不用每次重写全量数据,RAM占用极低。

核心优势:

  • 单条/批量插入都高效,不会像Pandas那样复制整个DataFrame吃内存
  • 随时可以用SQL查询任意时间段、任意品种的Tick数据,不用加载全量文件
  • 文件大小比JSON小很多,结构比Pickle更稳定

代码示例:

import sqlite3
from datetime import datetime

# 初始化数据库(第一次运行创建表,之后直接连接)
conn = sqlite3.connect('ticks.db')
cursor = conn.cursor()
# 创建表,提前定义字段类型提升写入效率
cursor.execute('''
CREATE TABLE IF NOT EXISTS ticks (
    time_sent TIMESTAMP,
    time_received TIMESTAMP,
    instrument TEXT,
    bid REAL,
    ofr REAL
)
''')
conn.commit()

# 单条Tick插入函数(建议攒1000条再commit,降低IO开销)
def insert_tick(update):
    time_sent = datetime.fromtimestamp(int(update.get_time())/1000)
    time_received = datetime.now()
    instrument = update.get_instrument()
    bid = update.get_bid_value()
    ofr = update.get_ofr_value()
    
    # 参数化查询,避免SQL注入同时提升写入速度
    cursor.execute('''
    INSERT INTO ticks VALUES (?, ?, ?, ?, ?)
    ''', (time_sent, time_received, instrument, bid, ofr))
    conn.commit()

# 实时分析示例(按需查询,无需加载全量数据)
def query_ticks(instrument, start_time, end_time):
    cursor.execute('''
    SELECT * FROM ticks 
    WHERE instrument = ? AND time_sent BETWEEN ? AND ?
    ''', (instrument, start_time, end_time))
    return cursor.fetchall()

方案二:HDF5(Pandas友好型,大数据分析首选)

如果后续主要用Pandas做数据分析,HDF5绝对是最优解之一。Pandas自带的HDFStore支持增量追加写入,采用分块存储,不会把全量数据加载到内存,写入速度也非常快。

核心优势:

  • 完美适配Pandas生态,写入读取都是DataFrame格式,无需额外转换
  • 支持压缩,文件大小比Pickle更小,远优于JSON
  • 可指定data_columns,后续查询特定列或过滤条件时无需加载全量数据

代码示例:

import pandas as pd
from datetime import datetime

# 增量写入Tick到HDF5
def append_tick_to_hdf(update):
    new_row = pd.DataFrame({
        'Time_sent': [datetime.fromtimestamp(int(update.get_time())/1000)],
        'Time_received': [datetime.now()],
        'Instrument': [update.get_instrument()],
        'Bid': [update.get_bid_value()],
        'Ofr': [update.get_ofr_value()]
    })
    
    # append模式+table格式,支持后续条件查询
    with pd.HDFStore('ticks.h5') as store:
        store.append('ticks', new_row, format='table', data_columns=True, append=True)

# 实时分析示例(按需加载指定条件数据)
def load_ticks_from_hdf(instrument, start_time):
    with pd.HDFStore('ticks.h5') as store:
        df = store.select('ticks', where=['Instrument = ?', 'Time_sent >= ?'], params=[instrument, start_time])
    return df

方案三:Parquet(极致压缩,跨语言兼容首选)

Parquet是列式存储格式,压缩率极高(比JSON小5-10倍很常见),支持增量写入(需用pyarrowfastparquet库),适合长期存储高频Tick数据,还能跨语言读取(Java、R等都支持)。

核心优势:

  • 压缩率拉满,大幅节省磁盘空间
  • 列式存储,分析时仅加载需要的列,进一步降低RAM占用
  • 跨语言兼容,数据复用性强

代码示例(基于pyarrow):

import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime
import os

# 初始化Parquet写入器(首次创建文件,后续追加)
parquet_file = 'ticks.parquet'
schema = pa.schema([
    ('Time_sent', pa.timestamp('ns')),
    ('Time_received', pa.timestamp('ns')),
    ('Instrument', pa.string()),
    ('Bid', pa.float64()),
    ('Ofr', pa.float64())
])

writer = pq.ParquetWriter(parquet_file, schema, append=os.path.exists(parquet_file))

# 批量写入更高效,这里示例单条(建议攒1000条再写入)
def append_tick_to_parquet(update):
    data = {
        'Time_sent': [datetime.fromtimestamp(int(update.get_time())/1000)],
        'Time_received': [datetime.now()],
        'Instrument': [update.get_instrument()],
        'Bid': [update.get_bid_value()],
        'Ofr': [update.get_ofr_value()]
    }
    table = pa.Table.from_pydict(data, schema=schema)
    writer.write_table(table)

# 实时分析示例(过滤读取,转Pandas方便后续处理)
def load_ticks_from_parquet(instrument):
    table = pq.read_table(parquet_file, filters=[('Instrument', '=', instrument)])
    return table.to_pandas()

对比你之前的方案

  • Pandas concat+Pickle:每次concat都会复制整个DataFrame,数据量上升后RAM直接爆掉,且全量重写文件速度极慢,完全不适合高频场景
  • JSON追加:每行独立JSON导致文件体积大,后续分析需全量加载解析,RAM占用高且解析速度慢,仅适合小数据量场景

最终推荐

  • 若需随时灵活查询(按品种、时间范围快速筛选):选SQLite
  • 若后续主要用Pandas做数据分析:选HDF5
  • 若追求极致压缩和跨语言复用:选Parquet

额外小技巧:尽量批量写入(比如攒1000条Tick再写入一次),比单条写入减少IO次数,进一步提升效率并降低磁盘损耗。

备注:内容来源于stack exchange,提问作者Konstantinos

火山引擎 最新活动