Python写入大数据流到Parquet遇内存、压缩及字典参数问题求助
解决PyArrow写入超大Parquet流的三大问题
我来帮你逐一解决遇到的问题,同时给出更高效的流式写入方案:
问题1:内存累积,数据未实时落地
你的代码每次只写入一行数据,虽然PyArrow的ParquetWriter理论上会分批刷盘,但单条写入的效率极低,再加上生成器是无限循环(while True),导致进程一直不终止,看起来数据始终存在内存里。
核心解决思路是设置合理的批次大小,攒够一定数量的数据再批量写入,既减少IO开销,也能确保数据及时刷入磁盘。另外实际场景中一定要给生成器添加终止条件(比如生成指定条数后停止)。
问题2:gzip压缩效果不明显
gzip的压缩效果依赖于数据的冗余度,单条数据的列块几乎没有冗余,所以压缩后体积和原数据差异很小。当你攒够足够多的批次数据后,Parquet的列存压缩特性会发挥作用,gzip的压缩效果会显著提升,和7zip的差距会缩小很多。
问题3:use_dictionary参数报错
这个报错是因为旧版本PyArrow的use_dictionary参数要求字段名是字节串(bytes),而你传入的是字符串(str)。有两种解决方式:
- 把字段名转为字节串,比如
[b'c1', b'c2'] - 升级PyArrow到较新版本(推荐),新版本已经支持直接传入字符串类型的字段名
优化后的完整代码
下面的代码彻底解决了所有问题,同时兼顾了效率和内存控制:
import pandas as pd import random import pyarrow as pa import pyarrow.parquet as pq def data_generator(total_records=10000): # 添加终止条件,生成指定数量的数据(实际场景可替换为业务终止逻辑) options = ['op1', 'op2', 'op3', 'op4'] for _ in range(total_records): dd = {'c1': random.randint(1, 10), 'c2': random.choice(options)} yield dd result_file_address = 'example.parquet' batch_size = 1000 # 每1000条数据批量写入一次,可根据内存情况调整 batch_data = [] # 先获取一条数据生成Schema(也可以手动定义更严谨的Schema) first_data = next(data_generator(total_records=1)) batch_data.append(first_data) pa_schema = pa.Schema.from_pandas(pd.DataFrame(batch_data).schema) # 初始化ParquetWriter,适配旧版本PyArrow的字节串字段名要求 with pq.ParquetWriter( result_file_address, pa_schema, compression='gzip', use_dictionary=[b'c1', b'c2'] ) as writer: # 读取生成器数据,攒批次写入 for dic_data in data_generator(total_records=10000): batch_data.append(dic_data) if len(batch_data) >= batch_size: # 批量转换为PyArrow Table并写入 df = pd.DataFrame(batch_data) table = pa.Table.from_pandas(df, schema=pa_schema) writer.write_table(table) batch_data = [] # 清空批次,释放内存 # 写入剩余的不足一批的数据 if batch_data: df = pd.DataFrame(batch_data) table = pa.Table.from_pandas(df, schema=pa_schema) writer.write_table(table) print("数据写入完成!")
额外优化建议
- 手动定义Schema:避免依赖Pandas自动推断,尤其是数据类型有严格要求时,手动定义更可靠:
pa_schema = pa.schema([ ('c1', pa.int64()), ('c2', pa.string()) ]) - 跳过Pandas直接用PyArrow:如果想进一步减少内存开销,可以直接用PyArrow的
RecordBatch构建数据,省去Pandas转换环节:from pyarrow import RecordBatch # 批量转换为PyArrow数组 c1_array = pa.array([d['c1'] for d in batch_data]) c2_array = pa.array([d['c2'] for d in batch_data]) batch = RecordBatch.from_arrays([c1_array, c2_array], schema=pa_schema) writer.write_batch(batch) - 调整压缩级别:gzip支持0-9的压缩级别(默认6),可以通过
compression_level参数调整:pq.ParquetWriter(..., compression='gzip', compression_level=9)
内容的提问来源于stack exchange,提问作者Mohsen Laali




