算子使用前置条件:开通视觉智能产品-文字识别-智能文档解析服务,产品链接见:https://www.volcengine.com/docs/86081/1804813
输入列名 | 说明 |
|---|---|
data_col | 包含 PDF 文件 URL 或 Base64 编码内容的数组 |
file_name_col | 包含文件名的数组,用于在 TOS 中保存 |
一个结构体数组,包含解析结果。
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
input_type | str | 输入类型,支持 "url"(文件链接)或 "base64"(Base64 编码内容)。 可选值:["url", "base64"] | |
output_tos_path | str | 解析内容存储到的 TOS 目录。 解析内容保存到的 TOS 目录下的 txt 文件夹中, 同时会将PDF中的图片转存到 TOS目录下的 images 文件夹中, 并在保存的 txt 文件中使用图片 TOS 路径。 如果该值设置为空,则不需要将解析内容存储到 TOS 中。 默认值:"" | |
version | str | v3 | 解析服务版本。 可选值:["v3"] 默认值:"v3" |
file_type | str | 文件类型。 可选值:["pdf"] 默认值:"pdf" | |
page_start | int | 0 | 起始解析页码,从 0 开始。 默认值:0 |
page_parsed_num | int | -1 | 解析的页数,-1 表示解析全部。 默认值:-1 |
page_batch | int | 300 | 批量解析的页数。 默认值:300 |
parse_mode | str | auto | 解析模式。 可选值:["auto", "fast", "accurate"] 默认值:"auto" |
table_mode | str | markdown | 表格解析模式。 可选值:["markdown", "html", "excel"] 默认值:"markdown" |
filter_header | str | True | 是否过滤页眉页脚。 可选值:["true", "false"] 默认值:"true" |
timeout | int | 120 | 超时时间,单位为秒。 默认值:120 |
qps | int | 2 | 请求 QPS 限制。 默认值:2 |
max_retries | int | 3 | 最大重试次数。 默认值:3 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子解析pdf文档。
# Copyright (c) Beijing Volcano Engine Technology Ltd. from __future__ import annotations import os import daft from daft import col from daft.las.functions.doc import PDFParse from daft.las.functions.udf import las_udf if __name__ == "__main__": # 更改完解析内容会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_path = f"tos://{TOS_DIR}/doc/parse/pdf_parse" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "input_url": [ f"https://{tos_dir_url}/public/shared_doc_dataset/sample.pdf" ], "filename": ["sample.pdf"], } df = daft.from_pydict(samples) constructor_kwargs = {"input_type": "url", "output_tos_path": output_tos_path, "qps": 1} # 使用 Daft 进行分布式处理 df = df.with_column( "parsed_result", las_udf(PDFParse, construct_args=constructor_kwargs, concurrency=1)(col("input_url"), col("filename")), ) df = df.with_column("parsed_text", col("parsed_result").struct.get("parsed_origin_text")) df.show() # ╭────────────────────────────────┬────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────╮ # │ input_url ┆ filename ┆ parsed_result ┆ parsed_text │ # │ --- ┆ --- ┆ --- ┆ --- │ # │ Utf8 ┆ Utf8 ┆ Struct[parsed_origin_text: Utf8, parsed_plain_text: Utf8, parsed_detail: Utf8, parsed_file_path: Utf8, ┆ Utf8 │ # │ ┆ ┆ parsed_image_filenames: List[Utf8]] ┆ │ # ╞════════════════════════════════╪════════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ sample.pdf ┆ {parsed_origin_text: ![fig_94… ┆ ![fig_94052](https://pdf-buck… │ # ╰────────────────────────────────┴────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────────────────────╯