AI 数据湖服务提供先进的多模态数据处理能力。其视频字幕同步性检查解决方案,基于 Daft 分布式计算框架和 LAS 多模态处理算子,通过一个多阶段、智能化的视频分析 pipeline,帮助用户自动检测视频字幕与语音内容的同步性。该方案能够将原始视频数据,转化为带有精确同步性分析结果的结构化数据集,为视频内容的质量评估与优化提供可靠依据。
在视频成为信息传播主流媒介的今天,字幕的质量直接影响着观众的体验、信息获取的准确性以及内容的可访问性(Accessibility)。一个常见的质量问题是字幕与语音的“音画不同步”,即字幕的出现和消失时间与对应的语音时间轴存在延迟或提前。
传统上,这类问题的检测依赖于人工审核。然而,面对海量的视频数据,人工审核不仅效率低下、成本高昂,而且容易因主观判断和疲劳而出错。特别是在在线教育、专业培训、影视剧集和新闻播报等对信息准确性要求极高的场景中,自动化的同步性检测能力显得至关重要。因此,构建一个能够精准、高效、批量化分析视频字幕与语音同步性的自动化解决方案,已成为提升视频内容质量和生产效率的关键技术环节。
OPENSPEECH_APPID 、OPENSPEECH_TOKEN和OPENSPEECH_ENDPOINT。
本解决方案采用多阶段流水线架构,通过多模态算子的协同工作,实现对视频字幕同步性的精确检测。
阶段 | 功能描述 | 详解 | 算子列表 |
|---|---|---|---|
音频提取 | 从视频文件中提取音频流。 | 从视频中分离音频轨道,并将其标准化为适合后续 ASR 处理的格式(如 16kHz mp3)。 | VideoExtractAudio |
语音识别 | 对提取的音频进行语音识别和说话人分离。 | 基于豆包 ASR 服务进行高精度语音识别,输出包含每个语音片段的起止时间、说话人 ID 和文本内容的结构化数据。 | AudioAsrDoubao |
ASR 结果解析 | 将 ASR 的原始输出解析为结构化的字幕列表。 | 通过自定义算子,将 ASR 返回的文本解析成一个包含说话人、开始时间、结束时间和文本内容的 JSON 数组。 | 自定义 AsrTextParser |
提示词生成 | 为视觉语言模型(VLM)生成分析任务的提示词。 | 将结构化的 ASR 结果嵌入到一个预设的 Prompt 模板中,指示 VLM 对视频进行字幕同步性分析。 | 自定义 VlmPromptGenerator |
视频理解与同步性分析 | 使用 VLM 对视频进行多模态分析,检测同步性。 | VLM 接收视频文件和生成的提示词,通过分析视频帧内容,判断画面中的字幕出现时机是否与 ASR 提供的语音时间戳一致,并输出结构化的 JSON 判断结果。 | ArkLLMThinkingVision |
原始视频:本文使用由 MME-VideoOCR 开源的数据集中部分挑选后的视频(CC-BY-4.0)
处理结果:您可以从中选取文件并将其存入 TOS 桶,完成本文配置即可得到 demo 结果。
详情参考本文后续的处理结果部分。
开发机是由 AI 数据湖服务为算法开发者量身打造的专业开发环境,具备高效便捷的特性,开发者可借助其快速开启数据处理任务的编写、调试及运行流程。
您可以在本地或者其他设备上使用终端或者 VSCode 等 IDE 远程使用 SSH 直连的方式连接开发机,快速开始数据处理任务编写、调试和运行,操作指导详见远程连接开发机。
以下是完整的 Pipeline 代码,包括数据加载、各阶段处理算子的调用以及自定义的解析与提示词生成逻辑。
from __future__ import annotations import logging import os import re from datetime import datetime from typing import List, Dict, Any import daft from daft import col from daft.las.functions.video.video_extract_audio import VideoExtractAudio from daft.las.functions.audio.audio_asr_doubao import AudioAsrDoubao from daft.las.functions.text.pre_sign_url_for_tos import PreSignUrlForTos from daft.las.functions.ark_llm.ark_llm_thinking_vision import ArkLLMThinkingVision from daft.las.functions.udf import las_udf from daft.las.io.tos import TOSConfig from daft.las.functions.types import Operator from daft.dependencies import pa logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) class AsrTextParser(Operator): def parse_asr_segments(self, asr_text_simple: str) -> List[Dict[str, Any]]: if not asr_text_simple or asr_text_simple.strip() == "None": return [] segments = [] speaker_blocks = asr_text_simple.strip().split('\n\n') for block in speaker_blocks: lines = block.strip().split('\n') if len(lines) < 2: continue header_line = lines[0] text_content = '\n'.join(lines[1:]).strip() pattern = r'说话人\s+(\d+)\s+(\d+:\d+:\d+)\s+(\d+:\d+:\d+)' match = re.match(pattern, header_line) if match: speaker_id = match.group(1) start_time = match.group(2) end_time = match.group(3) segments.append({ "speaker": f"说话人 {speaker_id}", "start_time": start_time, "end_time": end_time, "text": text_content }) return segments def transform(self, asr_texts: pa.Array) -> pa.Array: results = [] for asr_text in asr_texts.to_pylist(): parsed_segments = self.parse_asr_segments(asr_text) results.append(parsed_segments) return pa.array(results, type=self.__return_column_type__()) @staticmethod def __return_column_type__() -> pa.DataType: return pa.list_(pa.struct([ pa.field("speaker", pa.string()), pa.field("start_time", pa.string()), pa.field("end_time", pa.string()), pa.field("text", pa.string()) ])) class VlmPromptGenerator(Operator): def time_str_to_seconds(self, time_str: str) -> float: parts = time_str.split(':') hours = int(parts[0]) minutes = int(parts[1]) seconds = int(parts[2]) return hours * 3600 + minutes * 60 + seconds def generate_vlm_prompt(self, asr_segments: List[Dict[str, Any]]) -> str: subtitles = [] timestamps = [] for segment in asr_segments: subtitles.append(segment['text']) start_seconds = self.time_str_to_seconds(segment['start_time']) end_seconds = self.time_str_to_seconds(segment['end_time']) timestamps.append([start_seconds, end_seconds]) original_prompt = """你是一名视频字幕核对助手,负责核对视频声音和字幕是否同步。具体任务是结合ASR提取的一一对应的视频字幕以及时间戳列表,对比分析视频画面中的字幕时间是否和结构化视频字幕的开始时间和结束时间同步,并记录分析结果。 如下ASR 字幕示例,字幕 "很高兴你能过来" 开始时间是 4.8 秒,结束的时间是 7.46 秒。 <ASR 字幕示例> [……,"很高兴你能过来",……] </ASR 字幕示例> <ASR 字幕时间戳示例> [……,[4.8,7.46],……] </ASR 字幕时间戳示例> 首先,请仔细阅读以下ASR提取的视频字幕列表、字幕起始和终止时间戳列表: <ASR_list_subtitles> {{ASR_LIST_SUBTITLES}} </ASR_list_subtitles> <ASR_list_timestamps> {{ASR_LIST_TIMESTAMPS}} </ASR_list_timestamps> 接下来,请仔细分析视频画面中的字幕,按照以下步骤进行: 1. 逐一对视频画面中的字幕与结构化的语音字幕逐一进行匹配。 2. 对比每一条匹配字幕的开始时间和结束时间。 3. 若开始时间和结束时间一致或在可接受的误差范围内(如±0.5秒),则判定为同步;否则判定为不同步。 4. 记录每一条字幕的核对结果,如果不同步,在结果 "notes" 字段说明视频画面字幕的延迟出现,还是提前出现,并记录具体的时间,。 请结合结构化的视频字幕和时间戳列表仔细分析视频,然后逐条给出视频字幕和语音字幕匹配检测的结果,结果格式为 JSON,不做额外解释。 其中,start_time为语音字幕开始时间,end_time为语音字幕结束时间,text为语音字幕,vlm为检索结果,如下为输出示例: [……, { "start_time": 4.8, "end_time": 7.46, "text": "很高兴你能过来", "vlm": {"是否同步":"<是/否>", "notes":"<视频画面字幕的延迟出现/提前出现> X 秒"} }, …… ]""" prompt = original_prompt.replace("{{ASR_LIST_SUBTITLES}}", str(subtitles)) prompt = prompt.replace("{{ASR_LIST_TIMESTAMPS}}", str(timestamps)) return prompt def transform(self, asr_segments_list: pa.Array) -> pa.Array: results = [] for asr_segments in asr_segments_list.to_pylist(): prompt = self.generate_vlm_prompt(asr_segments) results.append(prompt) return pa.array(results, type=self.__return_column_type__()) @staticmethod def __return_column_type__() -> pa.DataType: return pa.string() def run_pipeline(input_tos_dir: str, audio_output_tos_dir: str): input_s3_dir = input_tos_dir.replace("tos://", "s3://", 1) tos_config = TOSConfig.from_env() IO_CONFIG = daft.io.IOConfig(s3=tos_config.to_s3_config()) logger.info("视频字幕同步检查pipeline开始运行...") df = daft.from_glob_path( f"{input_s3_dir}/*.mp4", io_config=IO_CONFIG, ) df = df.with_column( "video_tos_path", col("path").str.replace("s3://", "tos://") ) df = df.with_column( "video_id", col("path").str.split("/").list.get(-1).str.replace(".mp4", "") ) df = df.select("video_tos_path", "video_id") logger.info("视频文件加载完成: %d 条记录", df.count_rows()) logger.info("开始视频音频抽取...") df = df.with_column( "audio_extraction_result", las_udf( VideoExtractAudio, construct_args={ "output_tos_dir": audio_output_tos_dir, "output_audio_binary": True, "output_audio_array": False, "return_first_stream": True, "output_format": "mp3", "output_sample_rate": 16000, }, num_gpus=0, batch_size=2, concurrency=1, )(col("video_tos_path")), ) df.collect() logger.info("视频音频抽取完成: %d 条记录", df.count_rows()) df = df.with_columns({ "audio_tos_path": col("audio_extraction_result").struct.get("audio_paths").list.get(0), "audio_binary": col("audio_extraction_result").struct.get("binaries").list.get(0), "original_sample_rate": col("audio_extraction_result").struct.get("original_audio_sampling_rates").list.get(0) }) logger.info("开始音频URL预签名...") df = df.with_column( "audio_presigned_url", las_udf( PreSignUrlForTos, construct_args={"expire_seconds": 3600}, )(col("audio_tos_path")), ) df.collect() logger.info("音频URL预签名完成: %d 条记录", df.count_rows()) logger.info("开始音频ASR识别...") appid = os.getenv("OPENSPEECH_APPID") token = os.getenv("OPENSPEECH_TOKEN") if not appid or not token: logger.error("缺少OPENSPEECH_APPID或OPENSPEECH_TOKEN环境变量") return df = df.with_column( "asr_result", las_udf( AudioAsrDoubao, construct_args={ "appid": appid, "token": token, "uid": "video_subtitle_sync", "enable_speaker_info": True, "enable_punc": True, "enable_ddc": True, "poll_interval": 10, "num_coroutines": 1, }, num_gpus=0, batch_size=1, concurrency=1, )(col("audio_presigned_url")), ) df.collect() logger.info("音频ASR识别完成: %d 条记录", df.count_rows()) df = df.with_columns({ "asr_text_simple": col("asr_result").struct.get("asr_result_simple"), "asr_text_raw": col("asr_result").struct.get("asr_result_raw"), "asr_text_content": col("asr_result").struct.get("asr_result_text") }) logger.info("开始ASR文本结构化解析...") df = df.with_column( "asr_segments_structured", las_udf(AsrTextParser)(col("asr_text_simple")), ) df.collect() logger.info("ASR文本结构化解析完成: %d 条记录", df.count_rows()) logger.info("开始生成VLM提示词...") df = df.with_column( "vlm_prompt", las_udf(VlmPromptGenerator)(col("asr_segments_structured")), ) df.collect() logger.info("VLM提示词生成完成: %d 条记录", df.count_rows()) logger.info("开始视频字幕同步分析...") df = df.with_column( "vlm_analysis_struct", las_udf( ArkLLMThinkingVision, construct_args={ "model": "doubao-1.5-thinking-vision-pro", "version":"250428", "multimodal_type": "video", "inference_type": "online", "source_type": "url", "video_format": "mp4", "video_fps": 1.0, "max_tokens": 4000, "temperature": 0.1, }, num_gpus=0, batch_size=1, concurrency=1, )(col("video_tos_path"), col("vlm_prompt")), ) df.collect() logger.info("视频字幕同步分析完成: %d 条记录", df.count_rows()) df = df.with_column( "vlm_analysis_result", col("vlm_analysis_struct").struct.get("llm_result") ) df_clean = df.exclude("audio_extraction_result", "audio_binary", "asr_result", "asr_text_raw","audio_presigned_url","original_sample_rate","asr_segments_structured","vlm_prompt","vlm_analysis_struct") df_clean.show() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_tos_dir = "tos://tos_bucket/video_subtitle_sync/results/" output_s3_dir = output_tos_dir.replace("tos://", "s3://", 1) tos_config = TOSConfig.from_env() io_config = daft.io.IOConfig(s3=tos_config.to_s3_config()) parquet_s3_path = f"{output_s3_dir}video_subtitle_sync_parquet_{timestamp}" df_clean.write_parquet(parquet_s3_path, io_config=io_config) logger.info("视频字幕同步结果已保存到TOS:") logger.info("Parquet: %s", output_tos_dir + f"video_subtitle_sync_parquet_{timestamp}") if __name__ == "__main__": input_tos_dir = "tos://tos_bucket/video_subtitle_sync/test_video" audio_output_tos_dir = "tos://tos_bucket/video_subtitle_sync/video_audio_extracted/" run_pipeline(input_tos_dir, audio_output_tos_dir)
说明
代码物料:完整的 Pipeline 脚本可在此处下载:video_subtitle_sync_pipeline_show.py。
执行 Pipeline 后,您将得到一个结构化的 Parquet 文件,其中包含了对每个视频的同步性分析结果。
示例日志输出:
2025-08-19 05:58:10,INFO - 视频字幕同步检查 pipeline 开始运行... 2025-08-19 05:58:12,INFO - 视频文件加载完成: 3 条记录 2025-08-19 05:58:13,INFO - 开始视频音频抽取... 2025-08-19 05:58:45,INFO - 视频音频抽取完成。 2025-08-19 05:58:46,INFO - 开始音频ASR识别... 2025-08-19 06:00:10,INFO - 音频ASR识别完成。 2025-08-19 06:00:11,INFO - 开始ASR文本结构化解析... 2025-08-19 06:00:12,INFO - ASR文本结构化解析完成。 2025-08-19 06:00:13,INFO - 开始生成VLM提示词... 2025-08-19 06:00:14,INFO - VLM提示词生成完成。 2025-08-19 06:00:15,INFO - 开始视频字幕同步分析... 2025-08-19 06:05:20,INFO - 视频字幕同步分析完成。 2025-08-19 06:05:25,INFO - Pipeline 执行完毕!结果已保存至: tos://your-bucket/sync_results/sync_results_20250819_060525
执行 Pipeline 后,除了音频抽取结果,您还将得到一个结构化的 Parquet 文件,其中包含了对每个视频的同步性分析结果。以下是部分脱敏后的示例数据:
video_id | video_tos_path | asr_text_simple (语音识别文本及时间戳) | vlm_analysis_result (同步性分析摘要) |
|---|---|---|---|
751 | tos://.../test_video/751.mp4 | 说话人 1 0:00:00 0:00:03 | 部分不同步 (5个片段中有1个) |
1160 | tos://.../test_video/1160.mp4 | 说话人 1 0:00:00 0:00:02 | 全部同步 |
1172 | tos://.../test_video/1172.mp4 | 说话人 1 0:00:00 0:00:01 | 全部同步 |
770 | tos://.../test_video/770.mp4 | 说话人 1 0:00:00 0:00:01 | 大量不同步 (6个片段中有4个) |
703 | tos://.../test_video/703.mp4 | 说话人 2 0:00:01 0:00:07 | 严重不同步 (13个片段中有11个) |
710 | tos://.../test_video/710.mp4 | 说话人 2 0:00:01 0:00:03 | 严重不同步 (12个片段中有10个) |
当处理管道在开发机上验证无误后,可将其固化为自定义镜像,并在任务管理中创建批量任务。该任务将使用此镜像,自动、高效地对海量视频数据进行字幕同步性检查。