音频滤镜处理器,基于 FFmpeg 的灵活音频效果应用。
参考文档: https://ffmpeg.org/ffmpeg-filters.html#Audio-Filters
输入列名 | 说明 |
|---|---|
audio_paths | 包含输入音频路径的数组 默认值:None |
audio_binaries | 包含音频二进制数据的数组 默认值:None |
audio_formats | 包含输入音频格式(如 'mp3'、'wav' 等)的数组,指定 audio_binaries 时可以提供格式信息 默认值:None |
output_basenames | 可选,输出文件的基础名数组(不含扩展名),用于自定义输出文件名 默认值:None |
处理后的结构体字段包括:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
filter_name | str | FFmpeg 音频滤镜名称。 常见滤镜: - volume: 音量调整 - highpass: 高通滤波(去除低频噪声) - lowpass: 低通滤波(去除高频噪声) - bass/lowshelf: 低音增强 - treble/highshelf: 高音增强 - aecho: 回声效果 更多滤镜详见官方文档。 | |
filter_kwargs | dict or None | 滤镜参数字典(因滤镜而异)。 示例: - volume: {"volume": 1.5} - highpass: {"f": 300, "width_type": "h", "width": 0.5} - lowpass: {"f": 3000, "width_type": "h", "width": 0.5} | |
global_args | list or None | FFmpeg 全局参数列表。 常用组合: - 静默模式(仅错误): ["-loglevel", "error", "-hide_banner"] - 调试模式: ["-loglevel", "debug", "-stats"] - 强制覆盖输出: ["-y"] - 性能优化: ["-threads", "4"] | |
output_tos_dir | str | 输出结果的 TOS 目录(为空则不上传)。 | |
output_audio_binary | bool | False | 是否返回处理后的音频二进制。 默认值:False |
output_audio_format | str or None | 指定输出音频格式(如 "mp3"、"wav" 等)。 若为空则沿用输入文件后缀;对于二进制输入则默认 "wav"。 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子对音频应用 FFmpeg 滤镜。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.audio.audio_ffmpeg_wrapped import AudioFFMPEGWrapped from daft.las.functions.udf import las_udf if __name__ == "__main__": # 更改完处理的音频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_dir = f"tos://{TOS_DIR}/audio/audio_ffmpeg_wrapped" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "audio_path": [ f"https://{tos_dir_url}/public/shared_audio_dataset/sample.mp3" ] } ds = daft.from_pydict(samples) splitter = las_udf( AudioFFMPEGWrapped, construct_args={ "filter_name": "volume", "filter_kwargs": {"volume": 1.5}, "global_args": ["-loglevel", "error", "-hide_banner", "-y"], "output_tos_dir": output_tos_dir, "output_audio_binary": False, "output_audio_format": "wav", }, ) ds = ds.with_column("results", splitter(col("audio_path"))) ds.show() # ╭─────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ # │ audio_path ┆ results │ # │ --- ┆ --- │ # │ Utf8 ┆ Struct[processed_audio_path: Utf8, processed_audio_binary: Binary] │ # ╞═════════════════════════════════════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ {processed_audio_path: "tos://tos_bucket/audio/audio_ffmpeg_wrapped/sample_processed.wav", processed_audio_binary: null} │ # ╰─────────────────────────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯