You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
音频处理
音频拼接
复制全文
音频拼接

算子ID:daft.las.functions.audio.audio_concat.AudioConcat

算子介绍

描述

音频拼接处理器,支持将多个音频文件拼接成一段音频。

核心功能

  • 支持多渠道输入:本地文件路径、TOS/S3存储路径、HTTP/HTTPS路径
  • 支持输出到指定路径
  • 自动处理不同采样率和声道的音频;自动根据输出格式选择合适的编码器

格式支持

  • WAV (.wav) - 使用 pcm_s16le 编码器
  • MP3 (.mp3) - 使用 libmp3lame 编码器
  • FLAC (.flac) - 使用 flac 编码器

Daft 调用

算子参数

输入

输入列名

说明

audio_paths_list

包含音频文件路径列表的列,每个元素是一个字符串列表

output_col

包含输出音频文件路径的数组

输出

包含拼接后音频文件路径的数组,成功返回输出路径,失败返回None

参数

注意

如参数没有默认值,则为必填参数。

参数名称

类型

默认值

描述

output_format

str

mp3

输出音频格式,仅支持 "wav", "mp3", "flac",默认为 "mp3"

sample_rate

int

16000

输出音频采样率,默认为 16000 注意:由于使用 concat filter 进行拼接时必须重编码,所以采样率是必选参数 常用采样率:8000, 16000, 22050, 44100, 48000

timeout

int or None

None

ffmpeg执行超时时间(秒),默认为None(无超时)

extra_params

list or None

None

额外的ffmpeg参数列表,直接拼接到命令中 例如: - 比特率: ["-b:a", "192k"] # 适用于 MP3 - 压缩级别: ["-compression_level", "8"] # 适用于 FLAC 默认值:None

调用示例

下面的代码展示了如何使用 Daft(适用于分布式)运行算子拼接多个音频文件。

from __future__ import annotations

import os

import daft
from daft import col
from daft.las.functions.audio import AudioConcat
from daft.las.functions.udf import las_udf

if __name__ == "__main__":
    # 转换后的音频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR
    TOS_TEST_DIR_URL = os.getenv("TOS_TEST_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com")
    TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket")

    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import logging

        import ray

        def configure_logging():
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
            logging.getLogger("tracing.span").setLevel(logging.WARNING)
            logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
            logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

        ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging})
        daft.context.set_runner_ray()

    daft.set_execution_config(actor_udf_ready_timeout=600)
    daft.set_execution_config(min_cpu_per_task=0)

    samples = {
        "audio_paths": [
            [
                f"https://{TOS_TEST_DIR_URL}/public/archive/audio_concat/sample_a.mp3",
                f"https://{TOS_TEST_DIR_URL}/public/archive/audio_concat/sample_b.wav",
            ],
        ],
        "output_path": [f"tos://{TOS_TEST_DIR}/audio_concat/output/concatenated_audio.mp3"],
    }
    ds = daft.from_pydict(samples)

    # Using Daft to concatenate audio files
    constructor_kwargs = {
        "output_format": "mp3",
        "sample_rate": 16000,
        "extra_params": ["-b:a", "192k"],
    }

    ds = ds.with_column(
        "output_path",
        las_udf(
            AudioConcat,
            construct_args=constructor_kwargs,
            num_cpus=1,
            concurrency=1,
            batch_size=1,
        )(col("audio_paths"), col("output_path")),
    )

    ds.show()
    # ╭───────────────────┬────────────────────╮
    # │ audio_paths                    ┆ output_path                    │
    # │ ---                            ┆ ---                            │
    # │ List[String]                   ┆ String                         │
    # ╞═══════════════════╪════════════════════╡
    # │ [https://las-public-data-qa.t… ┆ tos://tos_bucket/audio_concat… │
    # ╰───────────────────┴──────────────────═─╯
最近更新时间:2026.01.08 19:15:08
这个页面对您有帮助吗?
有用
有用
无用
无用