You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
音频安全识别
音频风险识别
复制全文
音频风险识别

算子介绍

前置条件

算子使用前置条件:开通业务风险识别产品-音频风险识别-音频点播服务,产品链接见:https://www.volcengine.com/product/business-security

Daft 调用

算子参数

输入

输入列名

说明

audio_id_col

输入音频唯一标志列

audio_url_col

音频内容所在的列,支持可直接访问的 url 地址

audio_title_col

(可选) 音频标题列

account_id_col

(可选) 音频发送者的ID列

operate_time_col

(可选) 用户发送音频的秒级时间戳列

输出

一个结构体,包含风险识别结果

  • Decision: 判定结果 (PASS, BLOCK, REVIEW)
  • Message: 服务端返回信息
  • risk_result: 完整的 JSON 格式检测结果

参数

如参数没有默认值,则为必填参数

参数名称

类型

默认值

描述

app_id

int

在业务风险识别产品中开通的应用ID

biztype

str

在业务风险识别产品中配置的音频风险识别场景

result_type

int

0

切片内容类型 0 表示仅返回违规的音频切片 1 表示返回所有切片检测结果 可选值: [0, 1]

timeout

int

120

超时时间设置,单位(s) 设置从发送请求到接收到结果的超时设置,如果是离线任务,为了防止数据处理阻塞,可以适当的增加该值

poll_interval

int

10

轮询间隔设置,单位(s) 调用主动查询接口时,设置的轮询间隔,请根据主动查询接口QPS以及性能需求来设置

num_coroutines

int

5

调用示例

下面的代码展示了如何使用 Daft(适用于分布式)运行算子进行音频内容风险识别。

from __future__ import annotations

import os

import daft
from daft import col
from daft.las.functions.audio import AudioRiskRec
from daft.las.functions.udf import las_udf

if __name__ == "__main__":

    """
    算子使用前置条件:开通业务风险识别产品-音频风险识别-音频点播服务,产品链接见:https://www.volcengine.com/product/business-security
    app_id: 在业务风险识别产品中开通的应用ID
    biztype: 在业务风险识别产品中配置的音频风险识别场景
    本示例代码中使用模拟值
    """
    
    # 注意:AudioRiskRec需要app_id和biztype参数,这里使用示例值
    
    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import logging

        import ray

        def configure_logging():
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S.%s".format(),
            )
            logging.getLogger("tracing.span").setLevel(logging.WARNING)
            logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
            logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

        ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging})
        daft.context.set_runner_ray()
    daft.set_execution_config(actor_udf_ready_timeout=600)
    daft.set_execution_config(min_cpu_per_task=0)

    # 使用环境变量构建URL
    tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com")
    samples = {
        "audio_id": ["1"],
        "audio_path": [
            f"https://{tos_dir_url}/public/shared_audio_dataset/sample.mp3"
        ],
    }
    df = daft.from_pydict(samples)

    # 实际使用时需要替换为有效的app_id和biztype
    constructor_kwargs = {"app_id": 123456, "biztype": "test_biztype"}

    # 使用 Daft 进行分布式处理
    df = df.with_column(
        "parsed_result",
        las_udf(AudioRiskRec, construct_args=constructor_kwargs, concurrency=1)(col("audio_id"), col("audio_path")),
    )
    df = df.with_column("Decision", col("parsed_result").struct.get("Decision"))
    df = df.with_column("Message", col("parsed_result").struct.get("Message"))
    df = df.with_column("risk_result", col("parsed_result").struct.get("risk_result"))

    df.select("audio_id", "audio_path", "Decision", "Message").show()
    # ╭──────────┬────────────────────────────────┬──────────┬─────────╮
    # │ audio_id ┆ audio_path                     ┆ Decision ┆ Message │
    # │ ---      ┆ ---                            ┆ ---      ┆ ---     │
    # │ Utf8     ┆ Utf8                           ┆ Utf8     ┆ Utf8    │
    # ╞══════════╪════════════════════════════════╪══════════╪═════════╡
    # │ 1        ┆ https://las-cn-beijing-publi-… ┆ PASS     ┆ success │
    # ╰──────────┴────────────────────────────────┴──────────┴─────────╯
最近更新时间:2026.01.08 19:15:08
这个页面对您有帮助吗?
有用
有用
无用
无用