You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
文本安全识别
文本内容风险识别
复制全文
文本内容风险识别

算子介绍

前置条件

算子使用前置条件:开通业务风险识别产品-文本风险识别,产品链接见:https://www.volcengine.com/product/business-security

Daft 调用

算子参数

输入

输入列名

说明

text_col

输入文本列

account_id_col

(可选) 文本发送者的ID列

operate_time_col

(可选) 用户发送文本的秒级时间戳列

nick_name_col

(可选) 文本发送者的用户昵称列

signature_col

(可选) 用户的个性签名列

text_type_col

(可选) 文本内容的类型列 (例如 "prompt", "response")

session_id_col

(可选) AIGC对话场景下的对话轮次ID列

输出

一个结构体数组,包含风险识别结果

  • FinalLabel: 最终风险标签
  • Decision: 判定结果 (PASS, BLOCK, REVIEW)
  • Message: 服务端返回信息
  • risk_result: 完整的JSON格式检测结果

参数

如参数没有默认值,则为必填参数

参数名称

类型

默认值

描述

app_id

int

在业务风险识别产品中开通的应用ID

biztype

str

在业务风险识别产品中配置的场景

timeout

int

120

超时时间设置,单位(s) 设置从发送请求到接收到结果的超时设置,如果是离线任务,为了防止数据处理阻塞,可以适当的增加该值

调用示例

下面的代码展示了如何使用 Daft(适用于分布式)运行算子进行文本内容风险识别。

from __future__ import annotations

import os

import daft
from daft import col
from daft.las.functions.text import ContentRiskRec
from daft.las.functions.udf import las_udf

if __name__ == "__main__":

    """
    算子使用前置条件:开通业务风险识别产品-文本风险识别,产品链接见:https://www.volcengine.com/product/business-security
    app_id: 在业务风险识别产品中开通的应用ID
    biztype: 在业务风险识别产品中配置的场景
    本示例代码中使用模拟值
    """

    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import logging

        import ray

        def configure_logging():
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S.%s".format(),
            )
            logging.getLogger("tracing.span").setLevel(logging.WARNING)
            logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
            logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

        ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging})
        daft.context.set_runner_ray()
    daft.set_execution_config(actor_udf_ready_timeout=600)
    daft.set_execution_config(min_cpu_per_task=0)

    samples = {
        "id": [1, 2],
        "text": [
            "今天天气真好,我们一起去公园玩吧!",
            "出售枪支,联系电话123456789",
        ],
        "account_id": ["user1", "user2"],
        "nickname": ["用户1", "用户2"],
    }
    df = daft.from_pydict(samples)

    # 实际使用时需要替换为有效的app_id和biztype
    constructor_kwargs = {"app_id": 123456, "biztype": "test_biztype"}

    # 使用 Daft 进行分布式处理
    df = df.with_column(
        "parsed_result",
        las_udf(ContentRiskRec, construct_args=constructor_kwargs, concurrency=1)(
            col("text"), account_id_col=col("account_id"), nick_name_col=col("nickname")
        ),
    )
    df = df.with_column("FinalLabel", col("parsed_result").struct.get("FinalLabel"))
    df = df.with_column("Decision", col("parsed_result").struct.get("Decision"))

    df.select("text", "account_id", "nickname", "FinalLabel", "Decision").show()

    # ╭─────────────────────────────────────┬────────────┬──────────┬────────────┬──────────╮
    # │ text                                ┆ account_id ┆ nickname ┆ FinalLabel ┆ Decision │
    # │ ---                                 ┆ ---        ┆ ---      ┆ ---        ┆ ---      │
    # │ Utf8                                ┆ Utf8       ┆ Utf8     ┆ Utf8       ┆ Utf8     │
    # ╞═════════════════════════════════════╪════════════╪══════════╪════════════╪══════════╡
    # │ 今天天气真好,我们一起去公园玩吧!…       ┆ user1      ┆ 用户1    ┆            ┆ PASS     │
    # ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤
    # │ 出售枪支,联系电话123456789…           ┆ user2      ┆ 用户2     ┆ 106        ┆ BLOCK    │
    # ╰─────────────────────────────────────┴────────────┴──────────┴────────────┴──────────╯
最近更新时间:2026.01.08 19:14:23
这个页面对您有帮助吗?
有用
有用
无用
无用