视频音频抽取处理器,支持多流分离
输入列名 | 说明 |
|---|---|
video_paths | 视频文件路径数组(本地、TOS、HTTP等) |
video_binaries | 视频二进制数据数组(可选) |
video_formats | 视频格式字符串数组(可选) |
output_basenames | 可选,输出子目录名(文件名)数组 |
结构体数组,包含:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
output_tos_dir | str | 将从视频中抽取出的音频保存到该 TOS 目录中,如果为空,则不保存音频 | |
output_audio_binary | bool | False | 是否返回音频的二进制内容,默认为 False |
output_audio_array | bool | False | 是否返回音频的 numpy array 格式,默认为 False |
stream_indexes | list or None | None | 指定抽取哪些音频流,默认全部。超出范围的索引会被自动忽略 |
return_first_stream | bool | True | 如果为 True,则只返回第一个音频流,否则返回所有流的列表,默认为 True |
start_second | float or None | None | 从视频的第几秒开始抽取音频,None 表示从头开始,单位为秒 |
end_second | float or None | None | 到视频的第几秒结束抽取音频,None 表示到结尾,单位为秒 |
output_format | str | mp3 | 输出音频格式,默认 "mp3" |
output_sample_rate | int | 48000 | 输出音频采样率,默认 48000 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子对视频进行音频抽取。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.video import VideoExtractAudio if __name__ == "__main__": # 提取音频后会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限访问和写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR_URL TOS_TEST_DIR_URL = os.getenv("TOS_TEST_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) samples = { "video_path": [f"https://{TOS_TEST_DIR_URL}/video_extract_audio/sample.mp4"], } ds = daft.from_pydict(samples) # Using Daft to extract audio from video output_tos_dir = f"tos://{TOS_TEST_DIR}/video_extract_audio" constructor_kwargs = { "output_tos_dir": output_tos_dir, "output_audio_binary": True, "output_sampling_rate": 16000, "output_audio_format": "mp3", } ds = ds.with_column( "extract_results", las_udf( VideoExtractAudio, construct_args=constructor_kwargs, num_cpus=1, concurrency=1, batch_size=1, )(col("video_path")), ) ds.show() # ╭────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ # │ video_path ┆ extract_results │ # │ --- ┆ --- │ # │ Utf8 ┆ Struct[audio_paths: List[Utf8], audio_arrays: List[List[Float32]], binaries: List[Binary], original_audio_sampling_rates: List[Float64]] │ # ╞════════════════════════════════╪══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╡ # │ tos://tos_bucket/video_extrac… ┆ {audio_paths: [tos://las-ai-c… │ # ╰────────────────────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯