音频格式转换处理器
输入列名 | 说明 |
|---|---|
input_col | 包含输入音频路径的数组(支持本地路径、HTTP/HTTPS URL、TOS/S3 URL) |
output_col | 包含输出音频文件路径的数组 |
包含转换结果路径的数组,成功返回输出路径,失败返回None
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
output_format | str | None | 输出音频格式,仅支持 "wav", "mp3", "flac",默认为 "wav" |
timeout | int or None | None | ffmpeg 执行超时时间(秒),默认为 None(无超时) |
extra_params | list or None | None | 额外的 ffmpeg 参数列表,直接拼接到命令中 例如: - 音轨选择: ["-map", "0:a"] # 选择所有音轨 - 采样率: ["-ar", "44100"] - 比特率: ["-b:a", "192k"] # 适用于 MP3 - 压缩级别: ["-compression_level", "8"] # 适用于 FLAC |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子对音频进行格式转换。支持转换为MP3、WAV、FLAC、AAC、OGG等多种格式。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.audio import AudioConvert from daft.las.functions.udf import las_udf if __name__ == "__main__": # 转换后的音频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_TEST_DIR_URL = os.getenv("TOS_TEST_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) samples = { "input_path": [f"https://{TOS_TEST_DIR_URL}/public/archive/audio_convert/sample.wav"], "output_path": [f"tos://{TOS_TEST_DIR}/audio_convert/sample_converted.mp3"], } ds = daft.from_pydict(samples) # Using Daft to convert audio to MP3 format (as an example) # AudioConvert supports multiple formats: mp3, wav, flac, aac, ogg, etc. constructor_kwargs = { "output_format": "mp3", "sample_rate": 44100, "audio_map": "auto", "extra_params": ["-c:a", "libmp3lame", "-b:a", "192k", "-q:a", "2"], } ds = ds.with_column( "convert_result", las_udf( AudioConvert, construct_args=constructor_kwargs, num_cpus=1, concurrency=1, batch_size=1, )(col("input_path"), col("output_path")), ) ds.show() # ╭────────────────────────────────┬────────────────────────────────┬────────────────────────────────╮ # │ input_path ┆ output_path ┆ convert_result │ # │ --- ┆ --- ┆ --- │ # │ String ┆ String ┆ String │ # ╞════════════════════════════════╪════════════════════════════════╪════════════════════════════════╡ # │ https://las-public-data-qa.to… ┆ tos://tos_bucket/audio_conver… ┆ tos://tos_bucket/audio_conver… │ # ╰────────────────────────────────┴────────────────────────────────┴────────────────────────────────╯