AI 数据湖服务提供多模态数据预处理功能,通过对原始音频数据进行一系列深度处理,帮助用户从源自真实世界(in-the-wild)的海量音频数据中自动提取出高质量的语音-文本对(Speech-Text Pairs),将原始音频数据转化为符合多模态大模型训练标准的、高价值的结构化数据集。本文为您介绍语音-文本对构建方案的优势特性、工作原理,并指导您如何构建完整的音频处理任务。
AI 的前沿正在向通用化、多模态化的方向飞速演进。在音频领域,技术浪潮已经从单一的语音识别(ASR)或语音合成(TTS)任务,转向了通用语音大模型(Universal Speech Models)、高表现力 TTS(Expressive TTS)、对话式 AI 以及音乐和声音特效生成。这些模型无论是需要理解数百种语言和方言的语音基础模型(Foundation Models for Speech),还是旨在复刻人类情感与风格的**声音克隆(Voice Cloning)**应用,其能力都直接取决于背后训练数据的质量。然而,源自真实世界(in-the-wild)的海量音频数据,在成为可用的“模型养料”之前,必须克服三大严峻挑战:
“Garbage In, Garbage Out”的原则在模型训练中被无限放大。这些混杂、无序的“脏”数据,轻则导致模型性能不彰,合成语音机械化;重则会使模型学到错误的偏见,甚至在关键应用中彻底失效。语音-文本对(Speech-Text Pairs)构建作为音频数据预处理的关键环节,能够将这些混杂、无序的原始音频转化为干净、规整、带有精确文本标注的数据对,为后续的模型训练和应用奠定坚实基础。
OPENSPEECH_APPID 、OPENSPEECH_TOKEN和OPENSPEECH_ENDPOINT。语音-文本对构建解决方案采用多阶段流水线架构,包含音频标准化、音源分离、语言识别、说话人分离、音频切分、语音识别等核心环节。
阶段 | 功能描述 | 详解 | 算子列表 |
|---|---|---|---|
音频标准化 | 统一音频格式,标准化采样率、声道数和响度 | 将不同格式的音频文件统一标准化为一致的音频参数,采样率重采样至16kHz,声道转换为单声道,响度-20dBFS,为后续处理提供标准化输入 |
|
音源分离 | 从混合音频中分离出人声,去除背景音乐和噪声 | 使用htdemucs深度学习模型从混合音频中分离出纯净的人声信号,有效去除背景音乐、环境噪声和其他非人声干扰,为后续说话人分离和语音识别提供高质量的人声输入 |
|
语言识别 | 识别音频中的语言类型,为后续ASR分支提供依据 | 基于Whisper-large模型对人声识别音频语言,支持多种语言识别,为ASR阶段提升识别准确率 |
|
说话人分离 | 识别不同说话人的语音段落,生成时间戳信息 | 使用深度学习模型自动识别音频中的不同说话人,生成包含开始时间、结束时间和说话人标识的分段信息 |
|
音频切分 | 根据说话人分离结果切分音频片段 | 根据说话人分离的时间戳精确切分音频文件,为每个说话人生成独立的音频片段 |
|
语音识别 | 按语言分支进行语音转文字,生成最终文本内容 | 根据语言识别结果,选择合适的ASR算子进行语音转文字,对每个说话人切分后的音频片段进行精确的语音识别
|
|
阶段 | 算子名称 | 功能介绍 | 所属代码函数 |
|---|---|---|---|
1. 音频标准化 | AudioStandardization | 对输入的任意音频文件进行标准化处理,统一转换为 16kHz 采样率、单声道、-20dBFS 响度的格式。这是确保后续所有模型处理效果一致性的关键预处理步骤。 |
|
2. 音源分离 | AudioSourceSeparation | 采用深度学习模型(如 |
|
3. 语言识别 | AudioLidWhisper | 使用 Whisper 模型对音频内容进行语言识别(Language Identification, LID)。它能判断出音频中的主要语言(如中文 |
|
4. 说话人分离 | AudioSpeakerDiarization | 分析音频,以区分出不同的说话人,并为每个说话人的发言片段打上时间戳(开始时间、结束时间)和身份标签(如 |
|
4.5. 数据展开 | BuildTimestamps | 这是一个在代码中自定义的辅助算子。它的功能是将分离的 |
|
5. 音频切分 | AudioSplitByTimestamps | 根据上一步说话人分离得到的时间戳信息,将完整的音频文件精确地切分成多个独立的、属于不同说话人的音频小片段。 |
|
6. 语音识别 | PreSignUrlForTos | 这是一个工具类算子,主要用于中文ASR分支。它将火山对象存储(TOS)的音频文件路径转换为一个有时间限制的公开访问URL。这是因为豆包 ASR 服务需要通过公网URL来拉取音频文件。 |
|
AudioAsrDoubao | [中文ASR分支] 调用火山引擎豆包语音识别服务。它接收经过预签名的音频 URL,并返回高质量的中文识别文本结果。 |
| |
AudioAsrWhisper | [英文ASR分支] 使用 OpenAI 的 Whisper 模型(代码中为 |
|
经过完整的 Pipeline 处理后,原始的、可能包含多种声音的音频文件被转化成了一系列以(音频片段,文本)形式存在的、干净的数据对。
原始音频:本文使用由希尔贝壳开源的 AISHELL-4 数据集作为 demo 演示会议场景语音转文本。
示例:
"asr_text":"我觉得咱们选址可以选在七里河附近,这边挨着水,这样的话咱们的水上设施供水能力就比较强,不用想,相对来说"{"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":4.773,"end":6.241,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"有多大?"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":31.081,"end":43.973,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"但是但是你要考虑的话,就是像我们后勤部需要招大量的保安,他们可以都有当地的,咱们可以,这样的话可以给当地的居民带"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_03","start":16.754,"end":22.778,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"然后那边空地也比较多,咱们这样的话,空间比较大。"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_02","start":26.153,"end":28.786,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"那边也存在欺负或者"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":7.017,"end":11.438,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"对,这有最起码一个这个,然后才知道里面去买什么设施。"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":15.927,"end":21.007,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"咱们可以建这个水上游乐园,咱们可以是分成几个区化。"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":12.248,"end":14.965,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"出现管道的费用要少一些。"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":4.435,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"咱一会再说这个,刚才说,说到选址方面的问题。"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":5.819,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"先说一下这个游乐场。"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":9.143,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"想听一下各位对这个创业,这个未来创业有什么想法?"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":21.58,"end":25.63,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"一部分是基础基础于这个,就比如说"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":11.438,"end":14.932,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"咱们可以建,现在夏天了。"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":0.031,"end":11.573,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"我觉得咱们选址可以选在七里河附近,这边挨着水,这样的话咱们的水上设施供水能力就比较强,不用想,相对来说"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_00","start":0.031,"end":5.87,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"这游乐场多大?有没有首先一个定义,游乐场咱们要建大概多大?"} {"audio_path":"tos://xxx/xxx.wav","language_code":"zh","speaker":"SPEAKER_01","start":44.193,"end":45.475,"speaker_audio_url":"tos://xxx/xxx.wav","asr_text":"就这个机会。"}
开发机是由 AI 数据湖服务为算法开发者量身打造的专业开发环境,具备高效便捷的特性,开发者可借助其快速开启数据处理任务的编写、调试及运行流程。开发者在 LAS 开发机 上,使用 Daft 分布式计算框架编写一个完整的文本清洗管道。
您可以在本地或者其他设备上使用终端或者 VSCode 等 IDE 远程使用 SSH 直连的方式连接开发机,快速开始数据处理任务编写、调试和运行,操作指导详见远程连接开发机。
开发者在 LAS 开发机上,使用 Daft 框架编写完整的音频处理 Pipeline。该 Pipeline 逻辑清晰地串联了从音频标准化到最终语音识别的全部算子。
以下是一个完整的代码示例,展示了如何构建该 Pipeline:
import logging from pathlib import Path import torch import daft import os from daft import col, lit from daft.las.functions.audio.audio_standardization import AudioStandardization from daft.las.functions.audio.audio_source_separation import AudioSourceSeparation from daft.las.functions.audio.audio_speaker_diarization import AudioSpeakerDiarization from daft.las.functions.audio.audio_split_by_timestamps import AudioSplitByTimestamps from daft.las.functions.audio.audio_lid_whisper import AudioLidWhisper from daft.las.functions.audio.audio_asr_doubao import AudioAsrDoubao from daft.las.functions.audio.audio_asr_whisper import AudioAsrWhisper from daft.las.functions.text.pre_sign_url_for_tos import PreSignUrlForTos from daft.las.functions.udf import las_udf from daft.las.io.tos import TOSConfig from io import StringIO log_capture = StringIO() log_handler = logging.StreamHandler(log_capture) log_handler.setLevel(logging.ERROR) log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") log_handler.setFormatter(log_formatter) logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) logger.addHandler(log_handler) logging.getLogger("daft").setLevel(logging.ERROR) logging.getLogger("pyannote").setLevel(logging.ERROR) logging.getLogger("speechbrain").setLevel(logging.ERROR) def stage_1_audio_standardization(df: daft.DataFrame) -> daft.DataFrame: """ 阶段1: 音频标准化 对音频进行标准化处理,统一采样率、声道数和响度。 使用AudioStandardization算子,目标16kHz单声道-20dBFS。 Args: df: 包含audio_path列的DataFrame Returns: 增加standardized_audio列的DataFrame """ logger.info("阶段1: 音频标准化") df = df.with_column( "standardized_audio", las_udf( AudioStandardization, construct_args={ "target_sr": 16000, "target_channels": 1, "target_dbfs": -20.0, "target_gain_range": [-3.0, 3.0], "num_coroutines": 2, }, num_gpus=0, batch_size=1, concurrency=1, )(col("audio_path")), ) df.collect() return df def stage_2_audio_source_separation(df: daft.DataFrame, model_path: str) -> daft.DataFrame: """ 阶段2: 音源分离 从混合音频中分离出人声,去除背景音乐和噪声。 使用AudioSourceSeparation算子和htdemucs模型进行GPU加速处理。 Args: df: 包含standardized_audio列的DataFrame Returns: 增加vocal_audio列的DataFrame """ logger.info("阶段2: 音源分离") df = df.with_column( "vocal_audio", las_udf( AudioSourceSeparation, construct_args={ "model_path": model_path, "chunk_batch_size": 4, "device": "cuda" if torch.cuda.is_available() else "cpu", "num_coroutines": 1, }, num_gpus=1 if torch.cuda.is_available() else 0, batch_size=1, concurrency=1, )(col("standardized_audio")), ) df.collect() return df def stage_3_audio_language_identification(df: daft.DataFrame, model_path: str) -> daft.DataFrame: """ 阶段3: 语言识别 使用Whisper模型识别音频中的语言类型。 为后续按语言分支的ASR处理提供依据。 Args: df: 包含vocal_audio列的DataFrame model_path: 模型路径 Returns: 增加language_code列和language_name列的DataFrame """ logger.info("阶段3: 语言识别") df = df.with_column( "language_result", las_udf( AudioLidWhisper, construct_args={ "model_path": model_path, "model_name": "iic/speech_whisper-large_lid_multilingual_pytorch", "model_version": "v2.0.4", "rank": 0, }, num_gpus=1, batch_size=1, concurrency=1, )(col("vocal_audio")), ) df = df.with_column("language_code", col("language_result").struct.get("language_code")) df = df.with_column("language_name", col("language_result").struct.get("language_code_full_name")) df.collect() return df def stage_4_audio_speaker_diarization(df: daft.DataFrame, model_path: str) -> daft.DataFrame: """ 阶段4: 说话人分离 使用深度学习模型进行多说话人语音分离。 识别不同说话人的语音段落和时间范围。 Args: df: 包含vocal_audio列的DataFrame model_path: 模型路径 Returns: 增加speaker_segments列的DataFrame """ logger.info("阶段4: 说话人分离") df = df.with_column( "speaker_segments", las_udf( AudioSpeakerDiarization, construct_args={ "model_path": model_path, "rank": 0, }, num_gpus=1 if torch.cuda.is_available() else 0, batch_size=1, concurrency=1, )(col("vocal_audio")), ) df.collect() return df def stage_4_5_expand_speaker_segments(df: daft.DataFrame) -> daft.DataFrame: """ 阶段4.5: 说话人分段数据展开 将说话人分段的结构化数据展开为单独的行。 使用explode操作和自定义BuildTimestamps算子生成时间戳格式。 Args: df: 包含speaker_segments列的DataFrame Returns: 展开后的DataFrame,增加speaker、start、end、timestamps列 """ import pyarrow as pa from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.types import Operator class BuildTimestamps(Operator): """自定义 Operator,将start和end合成 [[start, end]] 形式""" def transform(self, start: pa.Array, end: pa.Array) -> pa.Array: result = [] for s, e in zip(start.to_pylist(), end.to_pylist()): result.append([[s, e]]) return pa.array(result, type=self.__return_column_type__()) @staticmethod def __return_column_type__() -> pa.DataType: return pa.list_(pa.list_(pa.float64())) build_timestamps_udf = las_udf(BuildTimestamps) df = df.explode("speaker_segments") df = df.with_column("speaker", col("speaker_segments").struct.get("speaker")) df = df.with_column("start", col("speaker_segments").struct.get("start")) df = df.with_column("end", col("speaker_segments").struct.get("end")) df = df.where((col("end") - col("start")) >= 1.0) df = df.with_column("timestamps", build_timestamps_udf(col("start"), col("end"))) df.collect() return df def stage_5_audio_split_by_speaker(df: daft.DataFrame, output_dir: str) -> daft.DataFrame: """ 阶段5: 按说话人切分音频 根据说话人分离的时间戳精确切分音频文件。 使用AudioSplitByTimestamps算子进行精确切分。 Args: df: 包含audio_path和timestamps列的DataFrame output_dir: 输出目录 Returns: 增加speaker_audio_splits列的DataFrame """ logger.info("阶段4: 按说话人切分音频") df = df.with_column("dummy_tos_uri", lit(None)) df = df.with_column("audio_format", lit("wav")) df = df.with_column( "speaker_audio_splits", las_udf( AudioSplitByTimestamps, construct_args={ "output_tos_dir": output_dir, "output_segments_binary": True, "output_audio_format": True, }, num_gpus=0, batch_size=1, concurrency=1, )(col("timestamps"), col("dummy_tos_uri") ,col("vocal_audio"),col("audio_format")), ) df.collect() return df def stage_6_audio_asr_by_language(df: daft.DataFrame, model_path: str) -> daft.DataFrame: """ 阶段6: 基于语言分支的语音识别 根据语言识别结果选择合适的ASR算子进行语音转文字。 中文使用豆包ASR,英文使用Whisper ASR。 Args: df: 包含speaker_audio_splits和language_code列的DataFrame model_path: 模型路径 Returns: 增加asr_text列的DataFrame """ logger.info("阶段6: 基于语言分支的语音识别") df_chinese = df.where(col("language_code") == "zh") df_english = df.where(col("language_code") == "en") if len(df_chinese.collect()) > 0: logger.info(f"处理 {len(df_chinese.collect())} 条中文音频数据") df_chinese = df_chinese.with_column( "speaker_audio_url", col("speaker_audio_splits").struct.get("segments").list.get(0) ) df_chinese = df_chinese.with_column( "speaker_audio_presigned_url", las_udf( PreSignUrlForTos, construct_args={ "expire_seconds": 3600, }, )(col("speaker_audio_url")), ) logger.info("开始豆包ASR识别...") df_chinese = df_chinese.with_column( "asr_result_zh", las_udf( AudioAsrDoubao, construct_args={ "appid": os.getenv("OPENSPEECH_APPID"), "token": os.getenv("OPENSPEECH_TOKEN"), "uid": "speech_text_pairs_pipeline", "enable_punc": True, "enable_ddc": True, "enable_speaker_info": False, "enable_itn": True, "poll_interval": 15, "num_coroutines": 1, }, num_gpus=0, batch_size=4, concurrency=1, )(col("speaker_audio_presigned_url")), ) df_chinese = df_chinese.with_column( "asr_text", col("asr_result_zh").struct.get("asr_result_text") ) df_chinese = df_chinese.exclude("asr_result_zh", "speaker_audio_presigned_url") logger.info("豆包ASR识别完成") if len(df_english.collect()) > 0: logger.info(f"处理 {len(df_english.collect())} 条英文音频数据") df_english = df_english.with_column( "speaker_audio_url", col("speaker_audio_splits").struct.get("segments").list.get(0) ) df_english = df_english.with_column( "speaker_audio_binary", col("speaker_audio_splits").struct.get("binaries").list.get(0) ) logger.info("开始Whisper ASR识别...") df_english = df_english.with_column( "asr_result_en", las_udf( AudioAsrWhisper, construct_args={ "audio_src_type": "audio_binary", "model_path": model_path, "model_name": "openai/whisper-large-v3", "dtype": "bfloat16", "source_language": "english", "translate_to_english": False, "condition_on_prev_tokens": True, "compression_ratio_threshold": 1.35, "temperature": 0.5, "logprob_threshold": -1.0, "batch_size": 10, "rank": 0, }, num_gpus=1, batch_size=1, concurrency=1, )(col("speaker_audio_binary")), ) df_english = df_english.with_column( "asr_text", col("asr_result_en").struct.get("asr_result") ) df_english = df_english.exclude("asr_result_en", "speaker_audio_binary") logger.info("Whisper ASR识别完成") if len(df_chinese.collect()) > 0 and len(df_english.collect()) > 0: df = df_chinese.concat(df_english) elif len(df_chinese.collect()) > 0: df = df_chinese elif len(df_english.collect()) > 0: df = df_english else: df = df.with_column("asr_text", lit(None)) df.collect() return df def run_speech_text_pairs_pipeline(input_dir: str, output_dir: str) -> None: """主函数 - 按阶段执行语音-文本对构建解决方案demo""" try: input_s3_dir = input_dir.replace("tos://", "s3://", 1) tos_config = TOSConfig.from_env() IO_CONFIG = daft.io.IOConfig(s3=tos_config.to_s3_config()) audio_extensions = ["*.wav", "*.mp3", "*.flac", "*.m4a", "*.aac"] dfs = [] for ext in audio_extensions: try: df_ext = daft.from_glob_path(f"{input_s3_dir}/{ext}", io_config=IO_CONFIG) if len(df_ext.collect()) > 0: dfs.append(df_ext) except: continue if not dfs: logger.warning(f"未找到音频文件: {input_dir}") return df = dfs[0] for df_ext in dfs[1:]: df = df.concat(df_ext) df = df.with_column( "audio_path", col("path").str.replace("s3://", "tos://") ) df = df.select("audio_path") logger.info(f"找到音频文件: {len(df.collect())} 个") logger.info(f"初始数据: {len(df.collect())} 条记录") model_path = "/opt/las/models" # 阶段1:音频标准化 - 统一采样率、声道数和响度 df = stage_1_audio_standardization(df) # 阶段2:音源分离 - 从混合音频中提取人声 df = stage_2_audio_source_separation(df, model_path) # 阶段3:语言识别 - 识别音频中的语言类型 df = stage_3_audio_language_identification(df, model_path) # 阶段4:说话人分离 - 识别不同说话人的语音段落 df = stage_4_audio_speaker_diarization(df, model_path) # 阶段4.5:说话人分段数据展开 - 将结构化数据展开为单独的行 df = stage_4_5_expand_speaker_segments(df) # 阶段5:按说话人切分音频 - 根据说话人分离结果切分音频 df = stage_5_audio_split_by_speaker(df, output_dir) # 阶段6:语音识别 - 按语言分支进行语音转文字 df = stage_6_audio_asr_by_language(df, model_path) df = df.exclude("standardized_audio","vocal_audio","language_result","language_name","speaker_segments","dummy_tos_uri","timestamps","audio_format","speaker_audio_splits") df.show() logger.info("Pipeline 阶段1-6执行完成!") except Exception as e: logger.error(f"Pipeline执行失败: {e}") raise if __name__ == "__main__": input_dir = "tos://tos_bucket/speech_text_pairs/test_audio" output_dir = "tos://tos_bucket/speech_text_pairs/audio_split_by_timestamps/" run_speech_text_pairs_pipeline(input_dir, output_dir)
以下是处理结果的示例表格:
当处理 Pipeline 在开发机上用样本数据验证无误后,将其固化为自定义镜像。在 LAS 任务管理中创建并配置一个批量任务,该任务将使用此镜像,自动、高效地处理大规模的原始音频数据,完成最终高质量语音-文本对的构建。
通过本解决方案,您可以构建一个高效、可靠的大语言模型预训练数据清洗pipeline,为模型训练提供高质量的语料支持。如需了解更多技术细节或定制化需求,请联系技术支持团队。