You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python写入大数据流到Parquet遇内存、压缩及字典参数问题求助

解决PyArrow写入超大Parquet流的三大问题

我来帮你逐一解决遇到的问题,同时给出更高效的流式写入方案:

问题1:内存累积,数据未实时落地

你的代码每次只写入一行数据,虽然PyArrow的ParquetWriter理论上会分批刷盘,但单条写入的效率极低,再加上生成器是无限循环(while True),导致进程一直不终止,看起来数据始终存在内存里。

核心解决思路是设置合理的批次大小,攒够一定数量的数据再批量写入,既减少IO开销,也能确保数据及时刷入磁盘。另外实际场景中一定要给生成器添加终止条件(比如生成指定条数后停止)。

问题2:gzip压缩效果不明显

gzip的压缩效果依赖于数据的冗余度,单条数据的列块几乎没有冗余,所以压缩后体积和原数据差异很小。当你攒够足够多的批次数据后,Parquet的列存压缩特性会发挥作用,gzip的压缩效果会显著提升,和7zip的差距会缩小很多。

问题3:use_dictionary参数报错

这个报错是因为旧版本PyArrow的use_dictionary参数要求字段名是字节串(bytes),而你传入的是字符串(str)。有两种解决方式:

  • 把字段名转为字节串,比如[b'c1', b'c2']
  • 升级PyArrow到较新版本(推荐),新版本已经支持直接传入字符串类型的字段名

优化后的完整代码

下面的代码彻底解决了所有问题,同时兼顾了效率和内存控制:

import pandas as pd
import random
import pyarrow as pa
import pyarrow.parquet as pq

def data_generator(total_records=10000):
    # 添加终止条件,生成指定数量的数据(实际场景可替换为业务终止逻辑)
    options = ['op1', 'op2', 'op3', 'op4']
    for _ in range(total_records):
        dd = {'c1': random.randint(1, 10), 'c2': random.choice(options)}
        yield dd

result_file_address = 'example.parquet'
batch_size = 1000  # 每1000条数据批量写入一次,可根据内存情况调整
batch_data = []

# 先获取一条数据生成Schema(也可以手动定义更严谨的Schema)
first_data = next(data_generator(total_records=1))
batch_data.append(first_data)
pa_schema = pa.Schema.from_pandas(pd.DataFrame(batch_data).schema)

# 初始化ParquetWriter,适配旧版本PyArrow的字节串字段名要求
with pq.ParquetWriter(
    result_file_address,
    pa_schema,
    compression='gzip',
    use_dictionary=[b'c1', b'c2']
) as writer:
    # 读取生成器数据,攒批次写入
    for dic_data in data_generator(total_records=10000):
        batch_data.append(dic_data)
        if len(batch_data) >= batch_size:
            # 批量转换为PyArrow Table并写入
            df = pd.DataFrame(batch_data)
            table = pa.Table.from_pandas(df, schema=pa_schema)
            writer.write_table(table)
            batch_data = []  # 清空批次,释放内存
    
    # 写入剩余的不足一批的数据
    if batch_data:
        df = pd.DataFrame(batch_data)
        table = pa.Table.from_pandas(df, schema=pa_schema)
        writer.write_table(table)

print("数据写入完成!")

额外优化建议

  1. 手动定义Schema:避免依赖Pandas自动推断,尤其是数据类型有严格要求时,手动定义更可靠:
    pa_schema = pa.schema([
        ('c1', pa.int64()),
        ('c2', pa.string())
    ])
    
  2. 跳过Pandas直接用PyArrow:如果想进一步减少内存开销,可以直接用PyArrow的RecordBatch构建数据,省去Pandas转换环节:
    from pyarrow import RecordBatch
    # 批量转换为PyArrow数组
    c1_array = pa.array([d['c1'] for d in batch_data])
    c2_array = pa.array([d['c2'] for d in batch_data])
    batch = RecordBatch.from_arrays([c1_array, c2_array], schema=pa_schema)
    writer.write_batch(batch)
    
  3. 调整压缩级别:gzip支持0-9的压缩级别(默认6),可以通过compression_level参数调整:
    pq.ParquetWriter(..., compression='gzip', compression_level=9)
    

内容的提问来源于stack exchange,提问作者Mohsen Laali

火山引擎 最新活动