You need to enable JavaScript to run this app.
导航
高质量语音-文本对构建
最近更新时间:2025.09.22 11:13:41首次发布时间:2025.08.20 11:19:14
复制全文
我的收藏
有用
有用
无用
无用

AI 数据湖服务提供多模态数据预处理功能,通过对原始音频数据进行一系列深度处理,帮助用户从源自真实世界(in-the-wild)的海量音频数据中自动提取出高质量的语音-文本对(Speech-Text Pairs),将原始音频数据转化为符合多模态大模型训练标准的、高价值的结构化数据集。本文为您介绍语音-文本对构建方案的优势特性、工作原理,并指导您如何构建完整的音频处理任务。

背景信息

AI 的前沿正在向通用化、多模态化的方向飞速演进。在音频领域,技术浪潮已经从单一的语音识别(ASR)或语音合成(TTS)任务,转向了通用语音大模型(Universal Speech Models)、高表现力 TTS(Expressive TTS)、对话式 AI 以及音乐和声音特效生成。这些模型无论是需要理解数百种语言和方言的语音基础模型(Foundation Models for Speech),还是旨在复刻人类情感与风格的**声音克隆(Voice Cloning)**应用,其能力都直接取决于背后训练数据的质量。然而,源自真实世界(in-the-wild)的海量音频数据,在成为可用的“模型养料”之前,必须克服三大严峻挑战:

  • 环境复杂:音频中可能夹杂着包含主讲人声、背景人声、环境噪声、背景音乐等多种声学事件。对于先进模型而言,挑战不再仅仅是“降噪”,而是要实现对声源的精确“解耦(Disentanglement)”,在完整保留目标人声的韵律和情感细节的同时,有效剥离或理解其他干扰元素
  • 内容混杂:在多人对话、会议或访谈音频中,不同说话人的声音相互重叠、交织。对于需要进行个性化语音合成声音复刻的模型来说,训练数据中哪怕混入几秒钟其他人的声音,都可能导致最终合成的声线“串味”、发音模糊。因此,保证每个训练片段的**“说话人纯净度”(Speaker Purity)**,并进行精确到毫秒级的时间戳切分,已成为不可或缺的前提。
  • 格式各异:不同来源的音频文件在格式、采样率、声道数和响度上千差万别。

“Garbage In, Garbage Out”的原则在模型训练中被无限放大。这些混杂、无序的“脏”数据,轻则导致模型性能不彰,合成语音机械化;重则会使模型学到错误的偏见,甚至在关键应用中彻底失效。语音-文本对(Speech-Text Pairs)构建作为音频数据预处理的关键环节,能够将这些混杂、无序的原始音频转化为干净、规整、带有精确文本标注的数据对,为后续的模型训练和应用奠定坚实基础。

应用场景

  • 多模态大模型训练数据准备:适用于需要构建大规模、高质量语音-文本对训练语料的场景,为语音合成(TTS)、语音识别(ASR)、音视频理解等多模态大模型提供优质的训练数据。
  • 语音内容理解与转录:适用于需要从海量音频中批量提取文本内容的场景,例如播客转录、会议纪要生成、视频字幕制作、在线课程内容提取等。
  • 说话人分析与内容整理:适用于需要识别和分离不同说话人语音的场景,例如多人访谈内容整理、客服对话质检、影视剧本对白提取等。

优势特性

  • 全流程覆盖:支持从原始音频到最终语音-文本对的端到端处理,覆盖音频标准化、人声分离、语言识别、说话人分离、音频切分、语音识别(ASR)等所有关键阶段。
  • 多模态处理能力:深度集成超过 6 种核心音频处理算子,对音频数据进行全方位、多维度的处理与保障。
  • 智能语言识别:基于 Whisper 模型的高精度音频语言识别能力(Audio LID),为后续针对不同语种配置独立的 ASR 处理链路提供精准依据。
  • 高效说话人分离:采用先进的说话人分离(Diarization)算法,能精确识别并切分不同说话人的语音段落,有效处理多人对话场景。
  • 双语言 ASR 优化:内置针对中文和英文音频的独立优化 ASR 链路,中文使用性能卓越的豆包 ASR,英文使用 Whisper ASR,确保各类语料的最佳识别效果。
  • 分布式处理能力:依托 Daft 框架的并行与分布式调度机制,可实现 TB 级音频数据的高性能处理与转换。
  • 灵活算子组合:所有音频处理步骤均基于 LAS 的可组合算子实现,支持灵活调整、替换、扩展处理模块,并可通过用户自定义函数(UDF)进行功能增强。
  • GPU 加速支持:关键的计算密集型算子(如音源分离、ASR)支持 GPU 加速,显著提升大规模音频数据的处理效率。

前提条件

  • 已注册火山引擎账号。若为首次登录,请先完成实名认证。详细操作请参考:账号注册流程
  • 已开通AI 数据湖服务服务。
  • 已创建通用队列计算队列。详细操作请参考:队列管理
  • 已创建开发机,详细操作请参考:创建开发机。并配置好相关的环境变量,特别是豆包 ASR 服务所需的 OPENSPEECH_APPIDOPENSPEECH_TOKENOPENSPEECH_ENDPOINT

工作原理

语音-文本对构建解决方案采用多阶段流水线架构,包含音频标准化、音源分离、语言识别、说话人分离、音频切分、语音识别等核心环节。
Image

核心处理流程

阶段

功能描述

详解

算子列表

音频标准化

统一音频格式,标准化采样率、声道数和响度

将不同格式的音频文件统一标准化为一致的音频参数,采样率重采样至16kHz,声道转换为单声道,响度-20dBFS,为后续处理提供标准化输入

  • AudioStandardization

音源分离

从混合音频中分离出人声,去除背景音乐和噪声

使用htdemucs深度学习模型从混合音频中分离出纯净的人声信号,有效去除背景音乐、环境噪声和其他非人声干扰,为后续说话人分离和语音识别提供高质量的人声输入

  • AudioSourceSeparation

语言识别

识别音频中的语言类型,为后续ASR分支提供依据

基于Whisper-large模型对人声识别音频语言,支持多种语言识别,为ASR阶段提升识别准确率

  • AudioLidWhisper

说话人分离

识别不同说话人的语音段落,生成时间戳信息

使用深度学习模型自动识别音频中的不同说话人,生成包含开始时间、结束时间和说话人标识的分段信息

  • AudioSpeakerDiarization

音频切分

根据说话人分离结果切分音频片段

根据说话人分离的时间戳精确切分音频文件,为每个说话人生成独立的音频片段

  • AudioSplitByTimestamps

语音识别

按语言分支进行语音转文字,生成最终文本内容

根据语言识别结果,选择合适的ASR算子进行语音转文字,对每个说话人切分后的音频片段进行精确的语音识别

  • 中文音频使用豆包ASR,
  • 英文音频使用Whisper ASR
  • AudioAsrDoubao
  • AudioAsrWhisper

算子列表

阶段

算子名称

功能介绍

所属代码函数

1. 音频标准化

AudioStandardization

对输入的任意音频文件进行标准化处理,统一转换为 16kHz 采样率、单声道、-20dBFS 响度的格式。这是确保后续所有模型处理效果一致性的关键预处理步骤。

  • stage_1_audio_standardization

2. 音源分离

AudioSourceSeparation

采用深度学习模型(如 htdemucs)将音频中的人声与背景音(音乐、噪声等)分离开。该算子输出纯净的人声音频,极大地提升了后续语音识别和说话人分离的准确性。

  • stage_2_audio_source_separation

3. 语言识别

AudioLidWhisper

使用 Whisper 模型对音频内容进行语言识别(Language Identification, LID)。它能判断出音频中的主要语言(如中文 zh、英文 en),为后续选择最合适的 ASR (语音识别) 模型提供决策依据。

  • stage_3_audio_language_identification

4. 说话人分离

AudioSpeakerDiarization

分析音频,以区分出不同的说话人,并为每个说话人的发言片段打上时间戳(开始时间、结束时间)和身份标签(如 SPEAKER_00, SPEAKER_01)。这个过程也称为“说话人日志”。

  • stage_4_audio_speaker_diarization

4.5. 数据展开

BuildTimestamps

这是一个在代码中自定义的辅助算子。它的功能是将分离的 startend 时间戳列,合并成后续 AudioSplitByTimestamps 算子所要求的特定嵌套列表格式 [[start, end]]

  • stage_4_5_expand_speaker_segments

5. 音频切分

AudioSplitByTimestamps

根据上一步说话人分离得到的时间戳信息,将完整的音频文件精确地切分成多个独立的、属于不同说话人的音频小片段。

  • stage_5_audio_split_by_speaker

6. 语音识别

PreSignUrlForTos

这是一个工具类算子,主要用于中文ASR分支。它将火山对象存储(TOS)的音频文件路径转换为一个有时间限制的公开访问URL。这是因为豆包 ASR 服务需要通过公网URL来拉取音频文件。

  • stage_6_audio_asr_by_language

AudioAsrDoubao

[中文ASR分支] 调用火山引擎豆包语音识别服务。它接收经过预签名的音频 URL,并返回高质量的中文识别文本结果。

  • stage_6_audio_asr_by_language

AudioAsrWhisper

[英文ASR分支] 使用 OpenAI 的 Whisper 模型(代码中为 whisper-large-v3)进行语音识别。它直接处理切分后的音频二进制数据,将其转换为英文文本。

  • stage_6_audio_asr_by_language

案例展示

经过完整的 Pipeline 处理后,原始的、可能包含多种声音的音频文件被转化成了一系列以(音频片段,文本)形式存在的、干净的数据对。
原始音频:本文使用由希尔贝壳开源的 AISHELL-4 数据集作为 demo 演示会议场景语音转文本。

示例:

000000016.wav
未知大小

处理结果:您可以从中选取文件并将其存入 TOS 桶,完成本文配置即可得到 demo 结果。如示例文件对应 line 13 处 :"asr_text":"我觉得咱们选址可以选在七里河附近,这边挨着水,这样的话咱们的水上设施供水能力就比较强,不用想,相对来说"

  • 构建结果:
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":4.773,"end":6.241,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"有多大?"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":31.081,"end":43.973,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"但是但是你要考虑的话,就是像我们后勤部需要招大量的保安,他们可以都有当地的,咱们可以,这样的话可以给当地的居民带"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_03","start":16.754,"end":22.778,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"然后那边空地也比较多,咱们这样的话,空间比较大。"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_02","start":26.153,"end":28.786,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"那边也存在欺负或者"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":7.017,"end":11.438,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"对,这有最起码一个这个,然后才知道里面去买什么设施。"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":15.927,"end":21.007,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"咱们可以建这个水上游乐园,咱们可以是分成几个区化。"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":12.248,"end":14.965,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"出现管道的费用要少一些。"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":4.435,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"咱一会再说这个,刚才说,说到选址方面的问题。"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":5.819,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"先说一下这个游乐场。"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":9.143,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"想听一下各位对这个创业,这个未来创业有什么想法?"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":21.58,"end":25.63,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"一部分是基础基础于这个,就比如说"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":11.438,"end":14.932,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"咱们可以建,现在夏天了。"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":0.031,"end":11.573,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"我觉得咱们选址可以选在七里河附近,这边挨着水,这样的话咱们的水上设施供水能力就比较强,不用想,相对来说"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":5.87,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"这游乐场多大?有没有首先一个定义,游乐场咱们要建大概多大?"}
{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":44.193,"end":45.475,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"就这个机会。"}

开发调试

开发机是由 AI 数据湖服务为算法开发者量身打造的专业开发环境,具备高效便捷的特性,开发者可借助其快速开启数据处理任务的编写、调试及运行流程。开发者在 LAS 开发机 上,使用 Daft 分布式计算框架编写一个完整的文本清洗管道。
您可以在本地或者其他设备上使用终端或者 VSCode 等 IDE 远程使用 SSH 直连的方式连接开发机,快速开始数据处理任务编写、调试和运行,操作指导详见远程连接开发机

代码参考

开发者在 LAS 开发机上,使用 Daft 框架编写完整的音频处理 Pipeline。该 Pipeline 逻辑清晰地串联了从音频标准化到最终语音识别的全部算子。
以下是一个完整的代码示例,展示了如何构建该 Pipeline:

import logging
from pathlib import Path

import torch
import daft
import os
from daft import col, lit

from daft.las.functions.audio.audio_standardization import AudioStandardization
from daft.las.functions.audio.audio_source_separation import AudioSourceSeparation
from daft.las.functions.audio.audio_speaker_diarization import AudioSpeakerDiarization
from daft.las.functions.audio.audio_split_by_timestamps import AudioSplitByTimestamps
from daft.las.functions.audio.audio_lid_whisper import AudioLidWhisper
from daft.las.functions.audio.audio_asr_doubao import AudioAsrDoubao
from daft.las.functions.audio.audio_asr_whisper import AudioAsrWhisper
from daft.las.functions.text.pre_sign_url_for_tos import PreSignUrlForTos
from daft.las.functions.udf import las_udf
from daft.las.io.tos import TOSConfig
from io import StringIO

log_capture = StringIO()
log_handler = logging.StreamHandler(log_capture)
log_handler.setLevel(logging.ERROR)
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
log_handler.setFormatter(log_formatter)

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
logger.addHandler(log_handler)

logging.getLogger("daft").setLevel(logging.ERROR)
logging.getLogger("pyannote").setLevel(logging.ERROR)
logging.getLogger("speechbrain").setLevel(logging.ERROR)

def stage_1_audio_standardization(df: daft.DataFrame) -> daft.DataFrame:
    """
    阶段1: 音频标准化
    
    对音频进行标准化处理,统一采样率、声道数和响度。
    使用AudioStandardization算子,目标16kHz单声道-20dBFS。
    
    Args:
        df: 包含audio_path列的DataFrame
        
    Returns:
        增加standardized_audio列的DataFrame
    """
    logger.info("阶段1: 音频标准化")
    
    df = df.with_column(
        "standardized_audio",
        las_udf(
            AudioStandardization,
            construct_args={
                "target_sr": 16000,
                "target_channels": 1,
                "target_dbfs": -20.0,
                "target_gain_range": [-3.0, 3.0],
                "num_coroutines": 2,
            },
            num_gpus=0,
            batch_size=1,
            concurrency=1,
        )(col("audio_path")),
    )
    df.collect()

    return df

def stage_2_audio_source_separation(df: daft.DataFrame, model_path: str) -> daft.DataFrame:
    """
    阶段2: 音源分离
    
    从混合音频中分离出人声,去除背景音乐和噪声。
    使用AudioSourceSeparation算子和htdemucs模型进行GPU加速处理。
    
    Args:
        df: 包含standardized_audio列的DataFrame
        
    Returns:
        增加vocal_audio列的DataFrame
    """
    logger.info("阶段2: 音源分离")
    
    df = df.with_column(
        "vocal_audio",
        las_udf(
            AudioSourceSeparation,
            construct_args={
                "model_path": model_path,
                "chunk_batch_size": 4,
                "device": "cuda" if torch.cuda.is_available() else "cpu",
                "num_coroutines": 1,
            },
            num_gpus=1 if torch.cuda.is_available() else 0,
            batch_size=1,
            concurrency=1,
        )(col("standardized_audio")),
    )
    df.collect()

    return df

def stage_3_audio_language_identification(df: daft.DataFrame, model_path: str) -> daft.DataFrame:
    """
    阶段3: 语言识别
    
    使用Whisper模型识别音频中的语言类型。
    为后续按语言分支的ASR处理提供依据。
    
    Args:
        df: 包含vocal_audio列的DataFrame
        model_path: 模型路径
        
    Returns:
        增加language_code列和language_name列的DataFrame
    """
    logger.info("阶段3: 语言识别")
    
    df = df.with_column(
        "language_result",
        las_udf(
            AudioLidWhisper,
            construct_args={
                "model_path": model_path,
                "model_name": "iic/speech_whisper-large_lid_multilingual_pytorch",
                "model_version": "v2.0.4",
                "rank": 0,
            },
            num_gpus=1,
            batch_size=1,
            concurrency=1,
        )(col("vocal_audio")),
    )
    
    df = df.with_column("language_code", col("language_result").struct.get("language_code"))
    df = df.with_column("language_name", col("language_result").struct.get("language_code_full_name"))
    df.collect()

    return df

def stage_4_audio_speaker_diarization(df: daft.DataFrame, model_path: str) -> daft.DataFrame:
    """
    阶段4: 说话人分离
    
    使用深度学习模型进行多说话人语音分离。
    识别不同说话人的语音段落和时间范围。
    
    Args:
        df: 包含vocal_audio列的DataFrame
        model_path: 模型路径
        
    Returns:
        增加speaker_segments列的DataFrame
    """
    logger.info("阶段4: 说话人分离")
    
    df = df.with_column(
        "speaker_segments",
        las_udf(
            AudioSpeakerDiarization,
            construct_args={
                "model_path": model_path,
                "rank": 0,
            },
            num_gpus=1 if torch.cuda.is_available() else 0,
            batch_size=1,
            concurrency=1,
        )(col("vocal_audio")),
    )
    df.collect()
    
    return df

def stage_4_5_expand_speaker_segments(df: daft.DataFrame) -> daft.DataFrame:
    """
    阶段4.5: 说话人分段数据展开
    
    将说话人分段的结构化数据展开为单独的行。
    使用explode操作和自定义BuildTimestamps算子生成时间戳格式。
    
    Args:
        df: 包含speaker_segments列的DataFrame
        
    Returns:
        展开后的DataFrame,增加speaker、start、end、timestamps列
    """
    import pyarrow as pa
    from daft import col
    from daft.las.functions.udf import las_udf
    from daft.las.functions.types import Operator

    class BuildTimestamps(Operator):
        """自定义 Operator,将start和end合成 [[start, end]] 形式"""

        def transform(self, start: pa.Array, end: pa.Array) -> pa.Array:
            result = []
            for s, e in zip(start.to_pylist(), end.to_pylist()):
                result.append([[s, e]])
            return pa.array(result, type=self.__return_column_type__())

        @staticmethod
        def __return_column_type__() -> pa.DataType:
            return pa.list_(pa.list_(pa.float64()))

    build_timestamps_udf = las_udf(BuildTimestamps)

    df = df.explode("speaker_segments")
    df = df.with_column("speaker", col("speaker_segments").struct.get("speaker"))
    df = df.with_column("start", col("speaker_segments").struct.get("start"))
    df = df.with_column("end", col("speaker_segments").struct.get("end"))

    df = df.where((col("end") - col("start")) >= 1.0)

    df = df.with_column("timestamps", build_timestamps_udf(col("start"), col("end")))
    df.collect()

    return df

def stage_5_audio_split_by_speaker(df: daft.DataFrame, output_dir: str) -> daft.DataFrame:
    """
    阶段5: 按说话人切分音频
    
    根据说话人分离的时间戳精确切分音频文件。
    使用AudioSplitByTimestamps算子进行精确切分。
    
    Args:
        df: 包含audio_path和timestamps列的DataFrame
        output_dir: 输出目录
        
    Returns:
        增加speaker_audio_splits列的DataFrame
    """
    logger.info("阶段4: 按说话人切分音频")

    df = df.with_column("dummy_tos_uri", lit(None))
    df = df.with_column("audio_format", lit("wav"))
    
    df = df.with_column(
        "speaker_audio_splits",
        las_udf(
            AudioSplitByTimestamps,
            construct_args={
                "output_tos_dir": output_dir,
                "output_segments_binary": True,
                "output_audio_format": True,
            },
            num_gpus=0,
            batch_size=1,
            concurrency=1,
        )(col("timestamps"), col("dummy_tos_uri") ,col("vocal_audio"),col("audio_format")),
    )
    df.collect()

    return df

def stage_6_audio_asr_by_language(df: daft.DataFrame, model_path: str) -> daft.DataFrame:
    """
    阶段6: 基于语言分支的语音识别
    
    根据语言识别结果选择合适的ASR算子进行语音转文字。
    中文使用豆包ASR,英文使用Whisper ASR。
    
    Args:
        df: 包含speaker_audio_splits和language_code列的DataFrame
        model_path: 模型路径
        
    Returns:
        增加asr_text列的DataFrame
    """
    logger.info("阶段6: 基于语言分支的语音识别")
    
    df_chinese = df.where(col("language_code") == "zh")
    df_english = df.where(col("language_code") == "en")
    
    if len(df_chinese.collect()) > 0:
        logger.info(f"处理 {len(df_chinese.collect())} 条中文音频数据")
        
        df_chinese = df_chinese.with_column(
            "speaker_audio_url",
            col("speaker_audio_splits").struct.get("segments").list.get(0)
        )
        
        df_chinese = df_chinese.with_column(
            "speaker_audio_presigned_url",
            las_udf(
                PreSignUrlForTos,
                construct_args={
                    "expire_seconds": 3600,
                },
            )(col("speaker_audio_url")),
        )
        
        logger.info("开始豆包ASR识别...")
        df_chinese = df_chinese.with_column(
            "asr_result_zh",
            las_udf(
                AudioAsrDoubao,
                construct_args={
                    "appid": os.getenv("OPENSPEECH_APPID"),
                    "token": os.getenv("OPENSPEECH_TOKEN"),
                    "uid": "speech_text_pairs_pipeline",
                    "enable_punc": True,
                    "enable_ddc": True,
                    "enable_speaker_info": False,
                    "enable_itn": True,
                    "poll_interval": 15,
                    "num_coroutines": 1,
                },
                num_gpus=0,
                batch_size=4,
                concurrency=1,
            )(col("speaker_audio_presigned_url")),
        )
        
        df_chinese = df_chinese.with_column(
            "asr_text", 
            col("asr_result_zh").struct.get("asr_result_text")
        )
        df_chinese = df_chinese.exclude("asr_result_zh", "speaker_audio_presigned_url")
        logger.info("豆包ASR识别完成")
    
    if len(df_english.collect()) > 0:
        logger.info(f"处理 {len(df_english.collect())} 条英文音频数据")
        
        df_english = df_english.with_column(
            "speaker_audio_url",
            col("speaker_audio_splits").struct.get("segments").list.get(0)
        )
        
        df_english = df_english.with_column(
            "speaker_audio_binary",
            col("speaker_audio_splits").struct.get("binaries").list.get(0)
        )
        
        logger.info("开始Whisper ASR识别...")
        df_english = df_english.with_column(
            "asr_result_en",
            las_udf(
                AudioAsrWhisper,
                construct_args={
                    "audio_src_type": "audio_binary",
                    "model_path": model_path,
                    "model_name": "openai/whisper-large-v3",
                    "dtype": "bfloat16",
                    "source_language": "english",
                    "translate_to_english": False,
                    "condition_on_prev_tokens": True,
                    "compression_ratio_threshold": 1.35,
                    "temperature": 0.5,
                    "logprob_threshold": -1.0,
                    "batch_size": 10,
                    "rank": 0,
                },
                num_gpus=1,
                batch_size=1,
                concurrency=1,
            )(col("speaker_audio_binary")),
        )
        
        df_english = df_english.with_column(
            "asr_text", 
            col("asr_result_en").struct.get("asr_result")
        )
        df_english = df_english.exclude("asr_result_en", "speaker_audio_binary")
        logger.info("Whisper ASR识别完成")
    
    if len(df_chinese.collect()) > 0 and len(df_english.collect()) > 0:
        df = df_chinese.concat(df_english)
    elif len(df_chinese.collect()) > 0:
        df = df_chinese
    elif len(df_english.collect()) > 0:
        df = df_english
    else:
        df = df.with_column("asr_text", lit(None))
    
    df.collect()
    return df

def run_speech_text_pairs_pipeline(input_dir: str, output_dir: str) -> None:
    """主函数 - 按阶段执行语音-文本对构建解决方案demo"""
    
    try:
        input_s3_dir = input_dir.replace("tos://", "s3://", 1)
        tos_config = TOSConfig.from_env()
        IO_CONFIG = daft.io.IOConfig(s3=tos_config.to_s3_config())
        
        audio_extensions = ["*.wav", "*.mp3", "*.flac", "*.m4a", "*.aac"]
        dfs = []
        
        for ext in audio_extensions:
            try:
                df_ext = daft.from_glob_path(f"{input_s3_dir}/{ext}", io_config=IO_CONFIG)
                if len(df_ext.collect()) > 0:
                    dfs.append(df_ext)
            except:
                continue
        
        if not dfs:
            logger.warning(f"未找到音频文件: {input_dir}")
            return
        
        df = dfs[0]
        for df_ext in dfs[1:]:
            df = df.concat(df_ext)
        
        df = df.with_column(
            "audio_path",
            col("path").str.replace("s3://", "tos://")
        )
        df = df.select("audio_path")
        
        logger.info(f"找到音频文件: {len(df.collect())} 个")
        logger.info(f"初始数据: {len(df.collect())} 条记录")
        
        model_path = "/opt/las/models"
        
        # 阶段1:音频标准化 - 统一采样率、声道数和响度
        df = stage_1_audio_standardization(df)
        
        # 阶段2:音源分离 - 从混合音频中提取人声
        df = stage_2_audio_source_separation(df, model_path)
        
        # 阶段3:语言识别 - 识别音频中的语言类型
        df = stage_3_audio_language_identification(df, model_path)
        
        # 阶段4:说话人分离 - 识别不同说话人的语音段落
        df = stage_4_audio_speaker_diarization(df, model_path)
        
        # 阶段4.5:说话人分段数据展开 - 将结构化数据展开为单独的行
        df = stage_4_5_expand_speaker_segments(df)
        
        # 阶段5:按说话人切分音频 - 根据说话人分离结果切分音频
        df = stage_5_audio_split_by_speaker(df, output_dir)
        
        # 阶段6:语音识别 - 按语言分支进行语音转文字
        df = stage_6_audio_asr_by_language(df, model_path)
        df = df.exclude("standardized_audio","vocal_audio","language_result","language_name","speaker_segments","dummy_tos_uri","timestamps","audio_format","speaker_audio_splits")
        df.show()
        
        logger.info("Pipeline 阶段1-6执行完成!")
        
    except Exception as e:
        logger.error(f"Pipeline执行失败: {e}")
        raise

if __name__ == "__main__":
    input_dir = "tos://tos_bucket/speech_text_pairs/test_audio"
    output_dir = "tos://tos_bucket/speech_text_pairs/audio_split_by_timestamps/"
    
    run_speech_text_pairs_pipeline(input_dir, output_dir)

处理结果

以下是处理结果的示例表格:

全量音频处理

当处理 Pipeline 在开发机上用样本数据验证无误后,将其固化为自定义镜像。在 LAS 任务管理中创建并配置一个批量任务,该任务将使用此镜像,自动、高效地处理大规模的原始音频数据,完成最终高质量语音-文本对的构建。
通过本解决方案,您可以构建一个高效、可靠的大语言模型预训练数据清洗pipeline,为模型训练提供高质量的语料支持。如需了解更多技术细节或定制化需求,请联系技术支持团队。