视频关键帧抽取处理器,支持多算法动态检测。
输入列名 | 说明 |
|---|---|
video_paths | 输入视频路径列,类型为数组。 默认值:None |
video_binaries | 输入视频二进制数据列,类型为数组。 默认值:None |
video_formats | 输入视频格式列(如 'mp4'、'avi' 等),类型为数组。 默认值:None |
处理后的数组,结构体字段包括:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
method | str | I_frame | 抽取关键帧的方法,支持 "difference"(像素差分法)、"optical_flow"(光流法)、"histogram"(直方图法)、"I_frame"(I型关键帧标识)。 可选值:["difference", "optical_flow", "histogram", "I_frame"] 默认值:"I_frame" |
img_type | str | .jpg | 输出关键帧图片格式,支持 ".jpg"、".png"。 可选值:[".jpg", ".png"] 默认值:".jpg" |
threshold | float | 0 | 用于判断关键帧的阈值。difference 推荐 2000000,histogram 推荐 0.01,optical_flow 推荐 2.0。 默认值:0 |
keyframes_cnt | int | 10 | 指定抽取关键帧的数量,-1 表示不限制数量。 默认值:10 |
seconds_per_frame | int | -1 | 抽帧间隔,单位为秒,-1 表示不指定间隔。 默认值:-1 |
output_tos_dir | str | 保存关键帧图片到 TOS 的目标路径,若为空字符串则不上传。 默认值:"" |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子抽取视频关键帧。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.video import VideoKeyframes if __name__ == "__main__": # 更改完抽取的关键帧会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_dir = f"tos://{TOS_DIR}/video/video_keyframes" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "video_path": [ f"https://{tos_dir_url}/public/shared_video_dataset/sample.mp4" ] } ds = daft.from_pydict(samples) extractor = las_udf( VideoKeyframes, construct_args={ "method": "I_frame", "keyframes_cnt": 5, "output_tos_dir": output_tos_dir, }, ) # 使用 Daft 进行分布式处理 ds = ds.with_column("results", extractor(col("video_path"))) ds.show() # ╭────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ # │ video_path ┆ results │ # │ --- ┆ --- │ # │ Utf8 ┆ Struct[keyframes: List[List[List[List[Int64]]]], base64: List[Utf8], timestamps: List[Float64], tos_paths: │ # │ ┆ List[Utf8]] │ # ╞════════════════════════════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ {keyframes: [[[[9, 17, 16], [… │ # ╰────────────────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────╯