算子ID:daft.las.functions.video.video_adaptive_keyframes_sampling.VideoAdaptiveKeyframesSampling
视频自适应关键帧抽取处理器,基于CLIP和自适应算法进行智能抽帧。
细分项 | 注意与前提 |
|---|---|
费用 | 调用算子前,您需先了解使用算子时的模型调用费用,详情请参见大模型调用计费。 |
鉴权(API Key) | 调用算子前,您需要先生成算子调用的API Key,并建议将API Key配置为环境变量,便于更安全地调用算子,详情请参见获取 API Key 并配置。 |
BaseURL | 调用算子前,您需要先根据您当前使用的LAS服务所在地域,了解算子调用的BaseURL,用于配置算子调用路径参数取值。 |
输入列名 | 说明 |
|---|---|
videos | 输入视频列,类型依据 video_src_type(url/base64/binary) |
包含关键帧信息的结构体数组,每个元素包含:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
text | str | "" | 用于计算语义相似度的文本描述。 |
model_path | str | "/opt/las/models" | 模型基础路径。如果为空,则从Hugging Face Hub下载模型。 |
clip_model_name | str | "openai/clip-vit-base-patch32" | CLIP模型名称/子目录。 |
dtype | str | "float16" | 模型推理精度选择:
可选值:["float16", "float32"] |
video_src_type | str | "video_url" | 输入视频的格式类型,支持:
可选值: ["video_url", "video_base64", "video_binary"] |
fps | float | 1.0 | 候选帧采样率(按fps均匀采样)。 |
max_num_frames | int | 32 | 最终要选出的关键帧数。 |
t1 | float | 0.8 | 判断是否有明显峰值的阈值(mean_top - mean)。 |
t2 | float | -100 | 判断是否继续分割的std阈值。 |
all_depth | int | 5 | 最大递归深度。 |
img_type | str | ".jpg" | 输出关键帧图片格式,支持 ".jpg"、".png"。 |
output_tos_dir | str | "" | 保存关键帧图片到 TOS 的目标路径,若为空字符串则不上传。 |
return_keyframes | bool | true | 是否返回关键帧图片的array数据。 |
batch_size | int | 16 | 批量推理的帧数量。 |
rank | int | 0 | GPU编号。 |
video_format | str | "mp4" | 二进制/base64输入的视频格式(如mp4、mov)。 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子进行视频自适应关键帧抽取。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.video.video_adaptive_keyframes_sampling import VideoAdaptiveKeyframesSampling if __name__ == "__main__": # 抽取的关键帧会保存到指定的 TOS 路径下,因此,需要设置好环境变量以保证有权限写入 TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "your-bucket") output_tos_dir = f"tos://{TOS_TEST_DIR}/video_adaptive_keyframes_sampling" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建 URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "video": [ f"https://{tos_dir_url}/public/shared_video_dataset/cooking.mp4" ] } ds = daft.from_pydict(samples) keyframe_sampler = las_udf( VideoAdaptiveKeyframesSampling, construct_args={ "text": "spoon", "output_tos_dir": output_tos_dir, "max_num_frames": 4, "fps": 2.0, "batch_size": 16, "rank": 0, "return_keyframes": True, "img_type": ".jpg", }, num_gpus=1, concurrency=1, batch_size=1, ) # 使用 Daft 进行分布式处理 ds = ds.with_column("keyframes", keyframe_sampler(col("video"))) ds.show() # ╭────────────────────────────────┬───────────────────────────────────────────────────────────────────────────────╮ # │ video ┆ keyframes │ # │ --- ┆ --- │ # │ Utf8 ┆ Struct[tos_paths: List[Utf8], frame_ids: List[Int64], scores: List[Float32]] │ # ╞════════════════════════════════╪═══════════════════════════════════════════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ {tos_paths: [las-cn-beijing…, ...], frame_ids: [123, ...], scores: [...]} │ # ╰────────────────────────────────┴───────────────────────────────────────────────────────────────────────────────╯