You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用Python与pyxlsb查询xlsb数据时的性能优化问题

使用Python与pyxlsb查询xlsb数据时的性能优化问题

嗨,我看你现在被大XLSB文件的处理速度搞头疼了——3万行300列的规模用逐行循环确实会慢到让人崩溃,咱们来把这个方案彻底优化一下:

先聊聊原代码的瓶颈

你当前的代码是用pyxlsb逐行遍历工作表,每一行还要做类型检查、条件判断,这种Python层面的循环在大数据量下效率极低。毕竟Python循环本身就慢,再加上每个单元格都要单独读取、处理,3万行下来光是循环开销就够大的了,更别说还要处理异常,难怪只能用缩减数据集测试。

优化方案:用Pandas的矢量化操作替代逐行循环

Pandas的底层是用C实现的批量操作(矢量化),比纯Python循环快几十甚至上百倍,而且配合pyxlsb引擎可以直接读取XLSB文件,完美适配你的场景。下面是修改后的完整代码:

import pandas as pd
import datetime

print("Start", datetime.datetime.now())
# 读取CSV里的参数(这部分逻辑可以保留)
csv_file = r"L:\Projects\P1563 V4\Data\Summary\ANR Sweeps\Python\ANR Sweeps_Python_simple.csv"
csv_data = pd.read_csv(csv_file, encoding='ISO-8859-1', header=None)

xlsb_path = csv_data.iloc[2, 0]
xlsb_file = csv_data.iloc[2, 1]
sheet_name = csv_data.iloc[0, 4]
column_header = csv_data.iloc[1, 4]
start_time = float(csv_data.iloc[2, 2])
end_time = float(csv_data.iloc[2, 3])

print("CSV open", datetime.datetime.now())

# 直接用Pandas读取整个工作表到DataFrame,用pyxlsb作为引擎
xlsb_full_path = f"{xlsb_path}{xlsb_file}.xlsb"
df = pd.read_excel(xlsb_full_path, sheet_name=sheet_name, engine='pyxlsb')

print("XLSB loaded", datetime.datetime.now())

# 确保时间列是数值类型(第一列是时间,用df.columns[0]获取列名)
time_col_name = df.columns[0]
df[time_col_name] = pd.to_numeric(df[time_col_name], errors='coerce')

# 筛选时间范围内的有效数据(自动跳过空值)
time_mask = (df[time_col_name] >= start_time) & (df[time_col_name] <= end_time)
filtered_values = df.loc[time_mask, column_header].dropna()

# 计算并输出平均值
if not filtered_values.empty:
    average = filtered_values.mean()
    print(f"Average of '{column_header}' between {start_time} and {end_time}: {average}")
else:
    print(f"No valid data found in column '{column_header}' between {start_time} and {end_time}.")

print("Done", datetime.datetime.now())

进一步优化:只读取需要的列(省内存+更快)

如果担心全表加载占内存(300列3万行其实也就几十MB,一般没问题),可以只读取时间列目标数据列,进一步减少读取时间和内存占用:

import pandas as pd
from pyxlsb import open_workbook
import datetime

print("Start", datetime.datetime.now())
# 读取CSV参数
csv_file = r"L:\Projects\P1563 V4\Data\Summary\ANR Sweeps\Python\ANR Sweeps_Python_simple.csv"
csv_data = pd.read_csv(csv_file, encoding='ISO-8859-1', header=None)

xlsb_path = csv_data.iloc[2, 0]
xlsb_file = csv_data.iloc[2, 1]
sheet_name = csv_data.iloc[0, 4]
column_header = csv_data.iloc[1, 4]
start_time = float(csv_data.iloc[2, 2])
end_time = float(csv_data.iloc[2, 3])

print("CSV open", datetime.datetime.now())

# 先获取工作表的列名,确定要读取的列索引
xlsb_full_path = f"{xlsb_path}{xlsb_file}.xlsb"
with open_workbook(xlsb_full_path) as wb:
    with wb.get_sheet(sheet_name) as sheet:
        headers = [cell.v for cell in next(sheet.rows())]
time_col_idx = 0
target_col_idx = headers.index(column_header)

# 只读取时间列和目标列
df = pd.read_excel(
    xlsb_full_path,
    sheet_name=sheet_name,
    engine='pyxlsb',
    usecols=[time_col_idx, target_col_idx]
)
# 重命名列方便后续操作
df.columns = ['time', column_header]

print("XLSB loaded", datetime.datetime.now())

# 处理时间列并筛选
df['time'] = pd.to_numeric(df['time'], errors='coerce')
time_mask = (df['time'] >= start_time) & (df['time'] <= end_time)
filtered_values = df.loc[time_mask, column_header].dropna()

# 计算平均值
if not filtered_values.empty:
    average = filtered_values.mean()
    print(f"Average of '{column_header}' between {start_time} and {end_time}: {average}")
else:
    print(f"No valid data found in column '{column_header}' between {start_time} and {end_time}.")

print("Done", datetime.datetime.now())

为什么这个方案快?

  1. 批量读取:Pandas用底层优化的代码读取整个工作表,比逐行读取快得多;
  2. 矢量化操作:筛选、计算平均值都是批量处理,避免了Python循环的开销;
  3. 自动处理空值dropna()pd.to_numeric(errors='coerce')可以自动跳过无效数据,不用手动写异常判断。

这个方案应该能让你在几秒内处理完3万行的数据,不用再依赖缩减数据集测试啦。

备注:内容来源于stack exchange,提问作者James

火山引擎 最新活动