You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
音频处理
音频快速拼接(同源)
复制全文
音频快速拼接(同源)

算子ID:daft.las.functions.audio.audio_concat_fast.AudioConcatFast

算子介绍

描述

音频快速拼接处理器(同源音频)

  • 使用 concat demuxer 快速拼接同源音频(无需重编码)
  • 适用于格式、编码、采样率完全相同的音频文件
  • 拼接同一录音分段
  • 拼接相同格式的音频片段
  • 需要快速拼接且无需格式转换

核心功能

  • 速度快,无质量损失
  • 支持多输入格式:本地文件路径、TOS/S3存储路径、HTTP/HTTPS路径
  • 支持输出到指定路径

格式支持与要求

  • 支持常见音频格式:mp3, wav, flac, aac, m4a等
  • 要求:所有输入音频必须具有相同的格式、编码、采样率、声道数

与 音频拼接(AudioConcat)的区别

  • AudioConcat: 使用 concat filter,支持不同格式音频,需要重编码
  • AudioConcatFast: 使用 concat demuxer,仅支持同源音频,无需重编码,速度更快

Daft 调用

算子参数

输入

输入列名

说明

audio_paths_list

包含音频文件路径列表的列,每个元素是一个字符串列表

output_col

包含输出音频文件路径的数组

输出

包含拼接后音频文件路径的数组,成功返回输出路径,失败返回None

参数

如参数没有默认值,则为必填参数

参数名称

类型

默认值

描述

output_format

str

mp3

输出音频格式(仅用于确定文件扩展名),默认为 "mp3" 注意:由于使用 concat demuxer,输出格式应与输入音频格式一致

timeout

int or None

None

ffmpeg执行超时时间(秒),默认为None(无超时)

调用示例

下面的代码展示了如何使用 Daft(适用于分布式)运行算子快速拼接同源音频文件(无需重编码)。

from __future__ import annotations

import os

import daft
from daft import col
from daft.las.functions.audio import AudioConcatFast
from daft.las.functions.udf import las_udf

if __name__ == "__main__":
    # 转换后的音频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR
    TOS_TEST_DIR_URL = os.getenv("TOS_TEST_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com")
    TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket")

    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import logging

        import ray

        def configure_logging():
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
            logging.getLogger("tracing.span").setLevel(logging.WARNING)
            logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
            logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

        ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging})
        daft.context.set_runner_ray()

    daft.set_execution_config(actor_udf_ready_timeout=600)
    daft.set_execution_config(min_cpu_per_task=0)

    samples = {
        "audio_paths": [
            [
                f"https://{TOS_TEST_DIR_URL}/public/archive/audio_concat_fast/sample_a.wav",
                f"https://{TOS_TEST_DIR_URL}/public/archive/audio_concat_fast/sample_b.wav",
            ],
        ],
        "output_path": [f"tos://{TOS_TEST_DIR}/audio_concat_fast/output/concatenated_audio_fast.wav"],
    }
    ds = daft.from_pydict(samples)

    # Using Daft to concatenate audio files (fast, no re-encoding)
    constructor_kwargs = {
        "output_format": "wav",  # 与输入格式一致
    }

    ds = ds.with_column(
        "output_path",
        las_udf(
            AudioConcatFast,
            construct_args=constructor_kwargs,
            num_cpus=1,
            concurrency=1,
            batch_size=1,
        )(col("audio_paths"), col("output_path")),
    )

    ds.show()
    # ╭───────────────────┬───────────────────╮
    # │ audio_paths                    ┆ output_path                    │
    # │ ---                            ┆ ---                            │
    # │ List[String]                   ┆ String                         │
    # ╞═══════════════════╪═══════════════════╡
    # │ [https://las-public-data-qa.t… ┆ tos://tos_bucket/audio_concat… │
    # ╰───────────────────┴───────────────────╯
最近更新时间:2026.01.08 19:15:08
这个页面对您有帮助吗?
有用
有用
无用
无用