视频关键帧切分处理器,支持智能片段分割。
输入列名 | 说明 |
|---|---|
video_paths | 包含输入视频路径的数组 默认值:None |
video_binaries | 包含视频二进制数据的数组 默认值:None |
video_formats | 包含输入视频格式(如 'mp4'、'avi' 等)的数组,指定 video_binaries 时可以提供格式信息 默认值:None |
output_basenames | 可选,输出子目录名(文件名)数组 |
处理后的结构体字段包括:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
method | str | I_frame | 抽取关键帧的方法,支持 "difference"(像素差分法)、"histogram"(直方图法)、"I_frame"(I型关键帧标识)。 注意: 非I-frame方法可能需要较长时间处理视频。 可选值:["difference", "histogram", "I_frame"] 默认值:"I_frame" |
threshold | float | 0 | 用于判断关键帧的阈值。difference 推荐 2000000,histogram 推荐 0.01。 默认值:0 |
keyframes_cnt | int | 10 | 用于指定抽取视频关键帧的数量。-1表示使用所有检测到的关键帧;0表示无效参数(不会切分视频); 如果设置的数量大于实际关键帧数,则使用所有关键帧;否则会从所有关键帧中均匀地选取指定数量的关键帧, 避免关键帧集中分布在某个时间段内。 默认值:10 |
seconds_per_frame | int | -1 | 抽帧间隔,单位为秒,-1 表示不指定间隔。 默认值:-1 |
output_tos_dir | str | 保存视频片段的TOS路径,若为空字符串则不上传。 默认值:"" | |
output_segments_binary | bool | False | 是否输出视频片段的二进制数据。 默认值:False |
output_video_format | str or None | 全局指定输出视频格式(如"mp4"、"avi"等),优先级高于输入文件后缀和video_format列。 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子按关键帧切分视频。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.video import VideoSplitByKeyframes if __name__ == "__main__": # 更改完切分的视频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_dir = f"tos://{TOS_DIR}/video/video_split_by_keyframes" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "video_path": [ f"https://{tos_dir_url}/public/shared_video_dataset/sample.mp4" ] } ds = daft.from_pydict(samples) splitter = las_udf( VideoSplitByKeyframes, construct_args={ "method": "I_frame", "keyframes_cnt": 2, "output_tos_dir": output_tos_dir, }, ) # 使用 Daft 进行分布式处理 ds = ds.with_column("results", splitter(col("video_path"))) ds.show() # ╭──────────────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ # │ video_path ┆ results │ # │ --- ┆ --- │ # │ Utf8 ┆ Struct[segments: List[Utf8], segments_binary: List[Binary], video_format: List[Utf8]] │ # ╞══════════════════════════════════════════╪══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ {segments: ["tos://tos_bucket/video/video_split_by_keyframes/sample/segment_1.mp4", "tos://tos_bucket/video/video_split_by_keyf… │ # ╰──────────────────────────────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯