视频格式转换处理器,将各种视频格式转换为MP4
输入列名 | 说明 |
|---|---|
input_col | 包含输入视频路径的数组(支持本地路径、HTTP/HTTPS URL、TOS/S3 URL) |
output_col | 包含输出MP4文件路径的数组 |
包含转换结果路径的数组,成功返回输出路径,失败返回None
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
video_codec | str | libx264 | 视频编码器,支持libx264、libx265等 默认值:"libx264" |
crf | int | 23 | 视频质量控制,取值范围0-51,越小质量越好 默认值:23 |
preset | str | medium | 编码速度预设,支持ultrafast、superfast、veryfast、faster、fast、medium、slow、slower、veryslow 默认值:"medium" |
max_height | int or None | None | 视频最大高度限制,超过时自动缩放,为None时不限制 默认值:None |
audio_codec | str | aac | 音频编码器,支持aac等 默认值:"aac" |
audio_bitrate | str | 192k | 音频码率,如"192k"、"128k" 默认值:"192k" |
audio_sample_rate | int or None | None | 音频采样率,如44100。48000,为None时保持原始采样率 默认值:None |
extra_params | list or None | None | 额外的ffmpeg参数列表,如["-movflags", "+faststart"] 默认值:None |
timeout | int or None | None | 单个视频处理超时时间(秒),为None时不限制 默认值:None |
下面的代码展示了如何使用 Daft(适用于分布式)运行算子对视频进行MP4格式转换。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.udf import las_udf from daft.las.functions.video import VideoConvertToMp4 if __name__ == "__main__": # 转换后的视频会保存到指定的TOS路径下,因此,需要设置好环境变量以保证有权限写入TOS,包括:ACCESS_KEY,SECRET_KEY,TOS_ENDPOINT,TOS_REGION,TOS_TEST_DIR TOS_TEST_DIR_URL = os.getenv("TOS_TEST_DIR_URL", "las-cn-beijing-public-online.tos-cn-beijing.volces.com") TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) samples = { "input_path": [f"https://{TOS_TEST_DIR_URL}/public/archive/video_convert_to_mp4/sample.mp4"], "output_path": [f"tos://{TOS_TEST_DIR}/video_convert_to_mp4/sample_converted.mp4"], } ds = daft.from_pydict(samples) # Using Daft to convert video to MP4 format constructor_kwargs = { "video_codec": "libx264", "crf": 23, "preset": "medium", "max_height": 240, "audio_codec": "aac", "audio_bitrate": "192k", "select_audio": "auto", "extra_params": ["-movflags", "+faststart"], } ds = ds.with_column( "convert_result", las_udf( VideoConvertToMp4, construct_args=constructor_kwargs, num_cpus=1, concurrency=1, batch_size=1, )(col("input_path"), col("output_path")), ) ds.show() # ╭────────────────────────────────┬────────────────────────────────┬────────────────────────────────╮ # │ input_path ┆ output_path ┆ convert_result │ # │ --- ┆ --- ┆ --- │ # │ String ┆ String ┆ String │ # ╞════════════════════════════════╪════════════════════════════════╪════════════════════════════════╡ # │ https://las-public-data-qa.to… ┆ tos://tos_bucket/video_conver… ┆ tos://tos_bucket/video_conver… │ # ╰────────────────────────────────┴────────────────────────────────┴────────────────────────────────╯