算子使用前置条件:开通业务风险识别产品-音频风险识别-音频点播服务,产品链接见:https://www.volcengine.com/product/business-security
输入列名 | 说明 |
|---|---|
audio_id_col | 输入音频唯一标志列 |
audio_url_col | 音频内容所在的列,支持可直接访问的 url 地址 |
audio_title_col | (可选) 音频标题列 |
account_id_col | (可选) 音频发送者的ID列 |
operate_time_col | (可选) 用户发送音频的秒级时间戳列 |
一个结构体,包含风险识别结果
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
app_id | int | 在业务风险识别产品中开通的应用ID | |
biztype | str | 在业务风险识别产品中配置的音频风险识别场景 | |
result_type | int | 0 | 切片内容类型 0 表示仅返回违规的音频切片 1 表示返回所有切片检测结果 可选值: [0, 1] |
timeout | int | 120 | 超时时间设置,单位(s) 设置从发送请求到接收到结果的超时设置,如果是离线任务,为了防止数据处理阻塞,可以适当的增加该值 |
poll_interval | int | 10 | 轮询间隔设置,单位(s) 调用主动查询接口时,设置的轮询间隔,请根据主动查询接口QPS以及性能需求来设置 |
num_coroutines | int | 5 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子进行音频内容风险识别。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.audio import AudioRiskRec from daft.las.functions.udf import las_udf if __name__ == "__main__": """ 算子使用前置条件:开通业务风险识别产品-音频风险识别-音频点播服务,产品链接见:https://www.volcengine.com/product/business-security app_id: 在业务风险识别产品中开通的应用ID biztype: 在业务风险识别产品中配置的音频风险识别场景 本示例代码中使用模拟值 """ # 注意:AudioRiskRec需要app_id和biztype参数,这里使用示例值 if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "audio_id": ["1"], "audio_path": [ f"https://{tos_dir_url}/public/shared_audio_dataset/sample.mp3" ], } df = daft.from_pydict(samples) # 实际使用时需要替换为有效的app_id和biztype constructor_kwargs = {"app_id": 123456, "biztype": "test_biztype"} # 使用 Daft 进行分布式处理 df = df.with_column( "parsed_result", las_udf(AudioRiskRec, construct_args=constructor_kwargs, concurrency=1)(col("audio_id"), col("audio_path")), ) df = df.with_column("Decision", col("parsed_result").struct.get("Decision")) df = df.with_column("Message", col("parsed_result").struct.get("Message")) df = df.with_column("risk_result", col("parsed_result").struct.get("risk_result")) df.select("audio_id", "audio_path", "Decision", "Message").show() # ╭──────────┬────────────────────────────────┬──────────┬─────────╮ # │ audio_id ┆ audio_path ┆ Decision ┆ Message │ # │ --- ┆ --- ┆ --- ┆ --- │ # │ Utf8 ┆ Utf8 ┆ Utf8 ┆ Utf8 │ # ╞══════════╪════════════════════════════════╪══════════╪═════════╡ # │ 1 ┆ https://las-cn-beijing-publi-… ┆ PASS ┆ success │ # ╰──────────┴────────────────────────────────┴──────────┴─────────╯