音频时间戳切分处理器,支持精准片段提取。
输入列名 | 说明 |
|---|---|
timestamp_ranges | 必需,包含切分用的时间戳区间的列 |
audio_paths | 可选,包含音频文件路径的列 |
audio_binaries | 可选,包含音频二进制数据的列 |
audio_formats | 可选,包含音频格式字符串的列,指定 audio_binaries 时必须指定该列 |
output_basenames | 可选,包含指定文件名的列 |
一个结构体,包含音频切分结果:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
output_tos_dir | str | 切分后的音频片段保存到 TOS 的路径。 | |
output_segments_binary | bool | False | 是否返回切分后音频片段的二进制数据。 |
output_format | str or None | 全局指定输出音频格式(如"mp3"、"wav"等),优先级高于输入文件后缀和 audio_format 列。 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子对音频按时间戳切分。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.audio import AudioSplitByTimestamps from daft.las.functions.udf import las_udf if __name__ == "__main__": # 更改完切分的音频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_dir = f"tos://{TOS_DIR}/audio/audio_split_by_timestamps" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "audio_path": [ f"https://{tos_dir_url}/public/shared_audio_dataset/sample.mp3" ], "timestamps": [[(0.0, 5.0), (5.0, 10.0), (10.0, 15.0)]], } ds = daft.from_pydict(samples) constructor_kwargs = { "output_tos_dir": output_tos_dir, "output_segments_binary": True, "output_audio_format": True, } # 使用 Daft 进行分布式处理 ds = ds.with_column( "split_results", las_udf(AudioSplitByTimestamps, construct_args=constructor_kwargs)(col("timestamps"), col("audio_path")), ) ds.show() # ╭────────────────────────────────┬─────────────────────────────┬───────────────────────────────────────────────────────────────────────────╮ # │ audio_path ┆ timestamps ┆ split_results │ # │ --- ┆ --- ┆ --- │ # │ Utf8 ┆ List[List[Float64]] ┆ Struct[segments: List[Utf8], binaries: List[Binary], formats: List[Utf8]] │ # ╞════════════════════════════════╪═════════════════════════════╪═══════════════════════════════════════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ [[0, 5], [5, 10], [10, 15]] ┆ {segments: ["tos://tos_bucket/audio/audio_split_by_timestamps/segment_1.… │ # ╰────────────────────────────────┴─────────────────────────────┴───────────────────────────────────────────────────────────────────────────╯