You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
文本清洗
特定字符替换
复制全文
特定字符替换

算子介绍

描述

正则表达式批量替换处理器

核心功能

  • 双模式替换机制:
    • 精确字符串匹配替换
    • 正则表达式模式匹配替换
  • 批量处理能力:
    • 支持多组 pattern-replacement 对并行处理
  • 容错机制:
    • 异常模式跳过并记录详细日志

Daft 调用

算子参数

输入

输入列名

说明

texts

包含原始文本内容的数组,元素类型为字符串

输出

替换后的文本内容数组,元素类型为字符串

参数

如参数没有默认值,则为必填参数

参数名称

类型

默认值

描述

patterns

list

需要替换的特定内容列表 * 支持正则表达式和特定字符串 * 多元素列表会逐个替换文本内容 * 示例: [r"\d+", "http://"]

replacements

list

被替换成的内容列表 * 若仅一个元素,则以该元素替换所有patterns内容 * 多元素时与patterns一一对应替换 * 示例: ["NUM", "URL"]

调用示例

下面的代码展示了如何使用 daft 运行算子对特定表达式进行替换。

from __future__ import annotations

import os

import daft
from daft import col
from daft.las.functions.text.regex_replacement import RegexReplacer
from daft.las.functions.udf import las_udf

if __name__ == "__main__":

    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import logging

        import ray

        def configure_logging():
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S.%s".format(),
            )
            logging.getLogger("tracing.span").setLevel(logging.WARNING)
            logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
            logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

        ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging})
        daft.context.set_runner_ray()
    daft.set_execution_config(actor_udf_ready_timeout=600)
    daft.set_execution_config(min_cpu_per_task=0)

    samples = {
        "text": [
            "<Query></Query><Title>提升党的领导力,推进国家治理体系和治理能力现代化</Title><Url>http://news.cnr.cn/native/gd/20191212/t20191212_524895559.shtml</Url>",
            None,
        ]
    }

    patterns = [r"<.*?>", "国家"]
    replacements = ["/replace_tag", "中国"]
    ds = daft.from_pydict(samples)
    ds = ds.with_column(
        "replaced_text",
        las_udf(
            RegexReplacer,
            construct_args={"patterns": patterns, "replacements": replacements},
        )(col("text")),
    )
    ds.show()

    # ╭───────────────────────────────────────┬────────────────────────────────╮
    # │ text                                  ┆ replaced_text                  │
    # │ ---                                   ┆ ---                            │
    # │ Utf8                                  ┆ Utf8                           │
    # ╞═══════════════════════════════════════╪════════════════════════════════╡
    # │ <Query></Query><Title>提升党的领导力…    ┆ /replace_tag/replace_tag/repl… │
    # ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
    # │ None                                  ┆ None                           │
    # ╰───────────────────────────────────────┴────────────────────────────────╯
最近更新时间:2026.01.08 19:14:22
这个页面对您有帮助吗?
有用
有用
无用
无用