Excel 表格解析处理器,支持多格式输出与结构化数据提取
输入列名 | 说明 |
|---|---|
xlsx_col | 包含 xlsx/xls 文件路径的列 |
struct 数组,字段包括
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
if_save_md_content | bool | True | 是否保存为 markdown 格式,默认 True |
if_save_html_content | bool | False | 是否保存为 html 格式,默认 False 当两者都为 True 时,仅保存 markdown |
output_tos_path | str | 解析内容存储到的 TOS 目录,为空则不上传 |
下面的代码展示了如何使用 pandas(适用于单机)和 ray(适用于分布式)运行算子解析xlsx文档。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.doc import XlsxParse from daft.las.functions.udf import las_udf if __name__ == "__main__": # 更改完解析内容会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_path = f"tos://{TOS_DIR}/doc/parse/xlsx_parse" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用绝对URL,去掉环境变量依赖 tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "xlsx_path": [ f"https://{tos_dir_url}/public/shared_doc_dataset/sample.xlsx" ], } df = daft.from_pydict(samples) constructor_kwargs = { "if_save_md_content": True, "if_save_html_content": False, "output_tos_path": output_tos_path, } # 使用 Daft 进行分布式处理 df = df.with_column( "result", las_udf(XlsxParse, construct_args=constructor_kwargs, concurrency=1)(col("xlsx_path")), ) df = df.with_column("data_item_uri", col("result").struct.get("data_item_uri")) df = df.with_column("text", col("result").struct.get("text")) df = df.with_column("text_by_table", col("result").struct.get("text_by_table")) df.show() # ╭────────────────────────────────┬────────────────────────────────────────────────────────────────────┬────────────────────────────────┬────────────────────────────────────────┬────────────────────────────────────────╮ # │ xlsx_path ┆ result ┆ data_item_uri ┆ text ┆ text_by_table │ # │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ # │ Utf8 ┆ Struct[data_item_uri: Utf8, text: Utf8, text_by_table: List[Utf8]] ┆ Utf8 ┆ Utf8 ┆ List[Utf8] │ # ╞════════════════════════════════╪════════════════════════════════════════════════════════════════════╪════════════════════════════════╪════════════════════════════════════════╪════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ {data_item_uri: tos://tos_bucket/doc/parse/xlsx_parse/sample.md, … ┆ tos://tos_bucket/doc/parse/xls ┆ | 产品ID | 产品名称 | 价格 | … ┆ [| 产品ID | 产品名称 | 价格 | … │ # ╰────────────────────────────────┴────────────────────────────────────────────────────────────────────┴────────────────────────────────┴────────────────────────────────────────┴────────────────────────────────────────╯