视频时间戳切分处理器,支持按指定时间范围分割
输入列名 | 说明 |
|---|---|
video_paths | 包含输入视频路径的数组 默认值:None |
video_binaries | 包含视频二进制数据的数组 默认值:None |
video_formats | 包含输入视频格式(如 'mp4'、'avi' 等)的数组,指定 video_binaries 时可以提供格式信息 默认值:None |
timestamp_ranges | 每行对应的时间戳范围列表,支持格式: - [(start,end), (start,end)] 或 [[start,end], [start,end]] - 单个 (start,end) 或 [start,end] |
output_basenames | 可选,输出子目录名(文件名)数组 |
处理后的结构体字段包括:
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
output_tos_dir | str | 保存视频片段的TOS路径,若为空字符串则不上传 默认值:"" | |
output_segments_binary | bool | False | 是否输出视频片段的二进制数据 默认值:False |
output_video_format | str or None | 全局指定输出视频格式(如"mp4"、"avi"等),优先级高于输入文件后缀和video_format列 |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子按时间戳切分视频。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.video import VideoSplitByTimestamps if __name__ == "__main__": # 更改完切分的视频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") output_tos_dir = f"tos://{TOS_DIR}/video/video_split_by_timestamps" if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) # 使用环境变量构建URL tos_dir_url = os.getenv("TOS_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") samples = { "video_path": [ f"https://{tos_dir_url}/public/shared_video_dataset/sample.mp4" ], "timestamp_ranges": [[(0.0, 2.0), (2.0, 4.0)]], } ds = daft.from_pydict(samples) splitter = las_udf( VideoSplitByTimestamps, construct_args={ "output_tos_dir": output_tos_dir, "output_segments_binary": False, "output_video_format": "mp4", }, ) # 使用 Daft 进行分布式处理 ds = ds.with_column("results", splitter(col("video_path"), None, None, col("timestamp_ranges"))) ds.show() # ╭────────────────────────────────┬────────────────────────────────────────┬─────────────────────────────────────────────────────────────╮ # │ video_path ┆ timestamp_ranges ┆ results │ # │ --- ┆ --- ┆ --- │ # │ Utf8 ┆ List[Struct[_0: Float64, _1: Float64]] ┆ Struct[segments: List[Utf8], segments_binary: List[Binary]] │ # ╞════════════════════════════════╪════════════════════════════════════════╪═════════════════════════════════════════════════════════════╡ # │ https://las-cn-beijing-publi-… ┆ [{_0: 0, ┆ {segments: ["tos://tos_bucket/video/video_split_b… │ # │ ┆ }, {_0: 2, ┆ │ # │ ┆ _1… ┆ │ # ╰────────────────────────────────┴────────────────────────────────────────┴─────────────────────────────────────────────────────────────╯