从视频中采样图像帧处理器,支持多种采样模式及时间范围控制。
输入列名 | 说明 |
|---|---|
video_paths | 输入视频路径列(本地或远程URI),类型 string,默认 None |
video_binaries | 输入视频二进制列,类型 binary,默认 None |
video_formats | 对应二进制输入的格式(如"mp4","mov"),类型 string,默认 None |
video_durations | 视频时长列(秒),类型 float,默认 None |
结构体数组,字段包括:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
sample_mode | str | by_count_uniform | 采样模式 可选值: ["by_count_uniform", "by_interval_time", "by_interval_frames", "by_fps", "by_timestamps"] |
start_time_sec | float | 0.0 | 起始时间(秒) |
end_time_sec | float or None | 结束时间(秒),None 表示视频末尾 | |
count_k | int or None | 均匀/随机采样的帧数(by_count_uniform使用) | |
interval_sec | float or None | 时间间隔Δt(秒,by_interval_time使用) | |
interval_frames | int or None | 解码帧间隔N(by_interval_frames使用) | |
target_fps | float or None | 目标FPS(by_fps使用) | |
timestamps_sec | list or None | 目标时间戳列表(秒,by_timestamps使用) | |
img_type | str | .jpg | 输出图片格式(用于base64与可选TOS落盘),可选[".jpg", ".png", ".webp"],默认 ".jpg" |
output_tos_dir | str | 若非空则将采样帧落盘本地并上传至 TOS 的目标目录(目录下每个视频单独子目录) | |
max_frames | int or None | 返回帧上限(防御性限制),None 表示不限制 | |
seed | int | 42 | 当无法提前获知视频总时长时,算子会采用 reservoir sampling(水塘抽样)算法,从视频流中均匀随机采样指定数量的帧。此参数用于设置随机数种子,保证采样结果可复现,默认值为 42。 |
output_frames | bool | True | 是否输出原始帧数组(较大,默认 True)。注意:即使设置为 False,返回结果仍包含 frames 字段,但内容为空列表。 |
output_base64 | bool | True | 是否输出 base64 编码(较大,默认 True)。注意:即使设置为 False,返回结果仍包含 base64 字段,但内容为空列表。 |
下面的代码展示了如何使用 Daft(适用于分布式)从视频中按帧间隔采样若干图像帧。本示例设置 interval_frames=100,实际产出 6 张图片。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.video.video_frame_sampler import VideoFrameSampler if __name__ == "__main__": # 更改完采样的帧会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_dir = f"tos://{TOS_DIR}/video/video_frame_sampler" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "video_path": [ f"https://{tos_dir_url}/public/shared_video_dataset/sample.mp4" ] } ds = daft.from_pydict(samples) sampler = las_udf( VideoFrameSampler, construct_args={ "sample_mode": "by_interval_frames", "interval_frames": 100, "output_tos_dir": output_tos_dir, "img_type": ".jpg", }, ) # 使用 Daft 进行分布式处理 ds = ds.with_column("results", sampler(col("video_path"))) ds = ds.select( "video_path", col("results").struct.get("frames").alias("frames"), col("results").struct.get("base64").alias("base64"), col("results").struct.get("timestamps").alias("timestamps"), col("results").struct.get("frame_indices").alias("frame_indices"), col("results").struct.get("tos_paths").alias("tos_paths"), ) ds.show() # ╭────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ # │ video_path ┆ results │ # │ --- ┆ --- │ # │ Utf8 ┆ Struct[frames: List[Binary], base64: List[Utf8], timestamps: List[Float64], frame_indices: List[Int64], tos_paths: List[Utf8]] │ # ╞════════════════════════════════╪══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ {frames: […], base64: […], timestamps: […], frame_indices: […], tos_paths: ["tos://tos_bucket/video/video_frame_sampler/frame_0.jpg", …]} │ # ╰────────────────────────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯