CommonCrawl web page content extractor, supporting multiple parsing strategies.
| Input column | Description |
|---|---|
| warc_files | Column containing the WARC data, in one of the following formats:<br>- warc_base64: base64-encoded WARC string<br>- warc_url: WARC file path or TOS link<br>- warc_binary: raw WARC binary data |
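For the warc_base64 format, the column value is just the base64 encoding of the WARC bytes (typically gzip-compressed, as CommonCrawl files are). A minimal stdlib sketch of preparing such a value; the record content here is a synthetic placeholder, not real CommonCrawl data:

```python
import base64
import gzip

# Synthesize a tiny gzip-compressed WARC payload in place of a real
# CommonCrawl file (the record content is a placeholder).
warc_bytes = gzip.compress(b"WARC/1.0\r\nWARC-Type: warcinfo\r\n\r\n")

# Encode to an ASCII string suitable for a warc_base64 column value.
warc_base64 = base64.b64encode(warc_bytes).decode("ascii")

# Decoding recovers the original compressed bytes.
assert base64.b64decode(warc_base64) == warc_bytes
```

The same string can then be placed in the input column with `warc_src_type="warc_base64"`.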
Output: a list of extraction results; each element contains the fields `url`, `content`, `warc_file`, and `extractor`.
Parameters without a default value are required.
| Parameter | Type | Default | Description |
|---|---|---|---|
| warc_src_type | str | (required) | Type of the WARC data source. Supported formats:<br>- warc_binary: raw binary data<br>- warc_base64: Base64-encoded data<br>- warc_url: file path or TOS storage link<br>Allowed values: ["warc_binary", "warc_url", "warc_base64"] |
| extractor_type | str | "trafilatura" | Which web content extractor to use. Allowed values: ["trafilatura", "justext", "goose3"] |
| max_records | int or None | None | Limit on the number of WARC records to process; None means no limit |
The following code shows how to run the operator with daft to extract page body text from a CommonCrawl WARC file. File paths, binary data, and base64-encoded input are all supported.
```python
from __future__ import annotations

import os

import daft
from daft import col
from daft.las.functions.text.commoncrawl_content_extractor import CommonCrawlContentExtractor
from daft.las.functions.udf import las_udf

if __name__ == "__main__":
    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import logging

        import ray

        def configure_logging():
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
            logging.getLogger("tracing.span").setLevel(logging.WARNING)
            logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
            logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

        ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging})
        daft.context.set_runner_ray()
        daft.set_execution_config(actor_udf_ready_timeout=600)
        daft.set_execution_config(min_cpu_per_task=0)

    tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com")
    samples = {
        "warc_data": [
            f"https://{tos_dir_url}/public/shared_file_dataset/sample.warc.gz"
        ]
    }
    extractor_type = "trafilatura"
    max_records = 5

    df = daft.from_pydict(samples)
    df = df.with_column(
        "extracted_content",
        las_udf(
            CommonCrawlContentExtractor,
            construct_args={
                "warc_src_type": "warc_url",
                "extractor_type": extractor_type,
                "max_records": max_records,
            },
            num_gpus=0,
            batch_size=1,
            concurrency=1,
        )(col("warc_data")),
    )
    df.show()
    # ╭──────────────────────────────────┬─────────────────────────────────────────────────────────────╮
    # │ warc_data                        ┆ extracted_content                                           │
    # │ ---                              ┆ ---                                                         │
    # │ Utf8                             ┆ List[Struct[url: Utf8, content: Utf8, warc_file: Utf8,      │
    # │                                  ┆ extractor: Utf8]]                                           │
    # ╞══════════════════════════════════╪═════════════════════════════════════════════════════════════╡
    # │ https://las-public-data-qa.tos…  ┆ [{url: http://00852imports.com/detail/5389084.html,        │
    # │                                  ┆ content: 随着互联网的发展,人们对网络速度的要求也越来越高…,        │
    # │                                  ┆ warc_file: sample.warc.gz, extractor: trafilatura},        │
    # │                                  ┆ {url: http://02y3tcpv.gd9.cc/?penglaibexdkcl224396.html,   │
    # │                                  ┆ content: 查看更多相关内容\n\n取消关注在如今的数字时代…,         │
    # │                                  ┆ warc_file: sample.warc.gz, extractor: trafilatura}]        │
    # ╰──────────────────────────────────┴─────────────────────────────────────────────────────────────╯
```
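The sample.warc.gz file above is a standard WARC archive: each record the operator iterates over is a header block, a blank line, and a payload. A minimal stdlib sketch of that record layout (the URI and HTML payload are placeholders, not taken from the sample file):

```python
# A minimal WARC/1.0 response record, synthesized for illustration.
payload = b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n<html><body>hello</body></html>"
record = (
    b"WARC/1.0\r\n"
    b"WARC-Type: response\r\n"
    b"WARC-Target-URI: http://example.com/\r\n"
    b"Content-Length: " + str(len(payload)).encode() + b"\r\n"
    b"\r\n" + payload + b"\r\n\r\n"
)

# Split the header block from the payload at the first blank line,
# the way a WARC record iterator would.
head, _, body = record.partition(b"\r\n\r\n")
headers = dict(
    line.split(b": ", 1) for line in head.split(b"\r\n")[1:]  # skip the version line
)
target_uri = headers[b"WARC-Target-URI"].decode()
print(target_uri)  # http://example.com/
```

The `url` field in the output above corresponds to each record's WARC-Target-URI header, while `content` is the body text the chosen extractor pulls out of the HTTP payload.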