TOS 预签名 URL 生成处理器
输入列名 | 说明 |
|---|---|
urls | 输入的 TOS 文件路径 |
生成的签名 URL
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
expires | int | 3600 | 签名 URL 的过期时间,单位秒。默认值 3600 |
下面的代码展示了如何使用 daft 运行算子,生成TOS的普通预签名。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.text.pre_sign_url_for_tos import PreSignUrlForTos from daft.las.functions.udf import las_udf if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) if __name__ == "__main__": # 样例数据 TOS_TEST_DIR = os.getenv("TOS_TEST_DIR", "tos_bucket") samples = {"url": [f"tos://{TOS_TEST_DIR}/sample.mp4"]} # 使用提供的算子生成签名URL ds = daft.from_pydict(samples) ds = ds.with_column( "signed_url", las_udf( PreSignUrlForTos, construct_args={ "expires": 3600, }, )(col("url")), ) ds.show() # 输出内容,以实际情况为准 # ╭─────────────────────────────┬────────────────────────────────╮ # │ url ┆ signed_url │ # │ --- ┆ --- │ # │ Utf8 ┆ Utf8 │ # ╞═════════════════════════════╪════════════════════════════════╡ # │ tos://tos_bucket/sample.mp4 ┆ https://tos_bucket.tos-cn-bei… │ # ╰─────────────────────────────┴────────────────────────────────╯