算子使用前置条件:开通业务风险识别产品-文本风险识别,产品链接见:https://www.volcengine.com/product/business-security
输入列名 | 说明 |
|---|---|
text_col | 输入文本列 |
account_id_col | (可选) 文本发送者的ID列 |
operate_time_col | (可选) 用户发送文本的秒级时间戳列 |
nick_name_col | (可选) 文本发送者的用户昵称列 |
signature_col | (可选) 用户的个性签名列 |
text_type_col | (可选) 文本内容的类型列 (例如 "prompt", "response") |
session_id_col | (可选) AIGC对话场景下的对话轮次ID列 |
一个结构体数组,包含风险识别结果
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
app_id | int | 在业务风险识别产品中开通的应用ID | |
biztype | str | 在业务风险识别产品中配置的场景 | |
timeout | int | 120 | 超时时间设置,单位(s) 设置从发送请求到接收到结果的超时设置,如果是离线任务,为了防止数据处理阻塞,可以适当的增加该值 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子进行文本内容风险识别。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.text import ContentRiskRec from daft.las.functions.udf import las_udf if __name__ == "__main__": """ 算子使用前置条件:开通业务风险识别产品-文本风险识别,产品链接见:https://www.volcengine.com/product/business-security app_id: 在业务风险识别产品中开通的应用ID biztype: 在业务风险识别产品中配置的场景 本示例代码中使用模拟值 """ if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) samples = { "id": [1, 2], "text": [ "今天天气真好,我们一起去公园玩吧!", "出售枪支,联系电话123456789", ], "account_id": ["user1", "user2"], "nickname": ["用户1", "用户2"], } df = daft.from_pydict(samples) # 实际使用时需要替换为有效的app_id和biztype constructor_kwargs = {"app_id": 123456, "biztype": "test_biztype"} # 使用 Daft 进行分布式处理 df = df.with_column( "parsed_result", las_udf(ContentRiskRec, construct_args=constructor_kwargs, concurrency=1)( col("text"), account_id_col=col("account_id"), nick_name_col=col("nickname") ), ) df = df.with_column("FinalLabel", col("parsed_result").struct.get("FinalLabel")) df = df.with_column("Decision", col("parsed_result").struct.get("Decision")) df.select("text", "account_id", "nickname", "FinalLabel", "Decision").show() # ╭─────────────────────────────────────┬────────────┬──────────┬────────────┬──────────╮ # │ text ┆ account_id ┆ nickname ┆ FinalLabel ┆ Decision │ # │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ # │ Utf8 ┆ Utf8 ┆ Utf8 ┆ Utf8 ┆ Utf8 │ # ╞═════════════════════════════════════╪════════════╪══════════╪════════════╪══════════╡ # │ 今天天气真好,我们一起去公园玩吧!… ┆ user1 ┆ 用户1 ┆ ┆ PASS │ # ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌┤ # │ 出售枪支,联系电话123456789… ┆ user2 ┆ 用户2 ┆ 106 ┆ BLOCK │ # ╰─────────────────────────────────────┴────────────┴──────────┴────────────┴──────────╯