算子ID:daft.las.functions.audio.audio_concat_fast.AudioConcatFast
音频快速拼接处理器(同源音频)
输入列名 | 说明 |
|---|---|
audio_paths_list | 包含音频文件路径列表的列,每个元素是一个字符串列表 |
output_col | 包含输出音频文件路径的数组 |
包含拼接后音频文件路径的数组,成功返回输出路径,失败返回None
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
output_format | str | mp3 | 输出音频格式(仅用于确定文件扩展名),默认为 "mp3" 注意:由于使用 concat demuxer,输出格式应与输入音频格式一致 |
timeout | int or None | None | ffmpeg执行超时时间(秒),默认为None(无超时) |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子快速拼接同源音频文件(无需重编码)。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.audio import AudioConcatFast from daft.las.functions.udf import las_udf if __name__ == "__main__": # 转换后的音频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_TEST_DIR_URL = os.getenv("TOS_TEST_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) samples = { "audio_paths": [ [ f"https://{TOS_TEST_DIR_URL}/public/archive/audio_concat_fast/sample_a.wav", f"https://{TOS_TEST_DIR_URL}/public/archive/audio_concat_fast/sample_b.wav", ], ], "output_path": [f"tos://{TOS_TEST_DIR}/audio_concat_fast/output/concatenated_audio_fast.wav"], } ds = daft.from_pydict(samples) # Using Daft to concatenate audio files (fast, no re-encoding) constructor_kwargs = { "output_format": "wav", # 与输入格式一致 } ds = ds.with_column( "output_path", las_udf( AudioConcatFast, construct_args=constructor_kwargs, num_cpus=1, concurrency=1, batch_size=1, )(col("audio_paths"), col("output_path")), ) ds.show() # ╭───────────────────┬───────────────────╮ # │ audio_paths ┆ output_path │ # │ --- ┆ --- │ # │ List[String] ┆ String │ # ╞═══════════════════╪═══════════════════╡ # │ [https://las-public-data-qa.t… ┆ tos://tos_bucket/audio_concat… │ # ╰───────────────────┴───────────────────╯