You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python读取压缩流时,如何获取已读压缩字节数以实现进度指示?

跟踪压缩流中已读取的原始压缩字节数

要获取从压缩流中读取的原始压缩字节数,而不是解压后的数据量,你需要在压缩库读取原始数据的底层环节做跟踪——因为像gzip.GzipFile这类工具的tell()方法返回的是解压后的字节位置,而且用迭代器(比如for line in gz)时还会禁用位置查询导致报错。

这里有个简单可靠的方案:自定义一个流包装类,用来统计每次读取的压缩字节数,再把这个包装后的流传给压缩库使用。

步骤1:实现字节跟踪的包装类

这个类会包裹原始的文件/流对象,代理所有读取操作并累加读取的字节数:

class TrackingReader:
    def __init__(self, fileobj):
        self.fileobj = fileobj
        self.compressed_bytes_read = 0  # 记录已读取的压缩字节数

    def read(self, size=-1):
        data = self.fileobj.read(size)
        self.compressed_bytes_read += len(data)
        return data

    def readinto(self, b):
        # 处理压缩库内部可能用到的readinto方法(更高效的字节读取)
        bytes_read = self.fileobj.readinto(b)
        self.compressed_bytes_read += bytes_read
        return bytes_read

    # 代理原始流的其他方法(比如close、seek等)
    def __getattr__(self, name):
        return getattr(self.fileobj, name)

步骤2:结合压缩库使用

gzip为例,我们可以这样读取文件并跟踪压缩字节的读取进度:

import gzip
import os

# 示例:读取本地gzip文件
compressed_file = "your_file.gz"
# 获取压缩文件的总大小(用于计算进度百分比)
total_compressed_size = os.path.getsize(compressed_file)

with open(compressed_file, "rb") as raw_file:
    # 用TrackingReader包裹原始文件对象
    tracker = TrackingReader(raw_file)
    with gzip.GzipFile(fileobj=tracker) as gz_stream:
        # 读取解压后的数据(这里以逐行为例)
        for line in gz_stream:
            # 这里写你的业务逻辑,比如处理每行数据
            # process_line(line)
            
            # 打印进度(用\r实现单行刷新)
            progress_percent = (tracker.compressed_bytes_read / total_compressed_size) * 100
            print(f"已读取压缩字节: {tracker.compressed_bytes_read}/{total_compressed_size} ({progress_percent:.1f}%)", end="\r")

扩展场景:网络压缩流

如果是从网络获取压缩流(比如requests的流式响应),同样可以用这个方法:

import requests
import gzip

url = "https://example.com/large_compressed_data.gz"
response = requests.get(url, stream=True)

# 获取总压缩大小(依赖服务器返回Content-Length头)
total_compressed_size = int(response.headers.get("Content-Length", 0))

# 用TrackingReader包裹响应的原始流
tracker = TrackingReader(response.raw)
with gzip.GzipFile(fileobj=tracker) as gz_stream:
    for chunk in gz_stream.iter_chunked(8192):
        # 处理数据块
        # process_chunk(chunk)
        
        if total_compressed_size > 0:
            progress = (tracker.compressed_bytes_read / total_compressed_size) * 100
            print(f"已读取压缩字节: {tracker.compressed_bytes_read}/{total_compressed_size} ({progress:.1f}%)", end="\r")
        else:
            # 如果没有总大小,只显示已读取的字节数
            print(f"已读取压缩字节: {tracker.compressed_bytes_read}", end="\r")

为什么这个方法有效?

压缩库(比如gzip)内部会从传入的fileobj读取原始的压缩字节,我们的TrackingReader会在每次读取时累加字节数,所以compressed_bytes_read完全对应你实际从压缩流中读取的原始数据量,和解压后的数据毫无关系。

而你之前遇到的问题:

  • 直接用GzipFile.tell():返回的是解压后的字节位置,不是压缩的。
  • readline()+跟踪读取长度:得到的是解压后的行长度,也不是压缩字节数。

这个方案完美避开了这些问题,因为我们直接在压缩数据的读取源头做统计。

内容的提问来源于stack exchange,提问作者m000

火山引擎 最新活动