时间戳合并算子 - 秒级区间的规范化、合并与切分
输入列名 | 说明 |
|---|---|
timestamps | 其中每个元素为二维浮点数列表(单位:秒),形如 [[start, end], ...]。 |
每个元素为合并后的二维浮点数列表(单位:秒),类型为 List[List[float32]];非法行返回 None。
如参数没有默认值,则为必填参数
参数名称 | 类型 | 默认值 | 描述 |
|---|---|---|---|
start_time | float | 0.0 | 应用于每个时间戳的起始偏移(秒)。 默认值:0 |
pre_merge_gap_seconds | float | 0.0 | 微小间隙预合并的阈值(秒)。 当相邻片段间隔小于等于此值时,会合并为一个更大的片段。 默认值:0 |
max_span_seconds | float | 20.0 | 最大允许的片段合并跨度(秒)。 默认值:20 |
enforce_chunking | bool | False | 是否强制对超长片段进行连续切块。 当为 True 时,会按 max_span_seconds 长度进行连续切块。 默认值:False |
下面的代码展示了如何使用 daft 运行算子对时间戳列表进行合并拆分等规范化操作。
from __future__ import annotations import os import daft from daft import col from daft.las.functions.other.timestamps_merge import TimestampsMerge from daft.las.functions.udf import las_udf if __name__ == "__main__": if os.getenv("DAFT_RUNNER", "native") == "ray": import logging import ray def configure_logging(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S.%s".format(), ) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging}) daft.context.set_runner_ray() daft.set_execution_config(actor_udf_ready_timeout=600) daft.set_execution_config(min_cpu_per_task=0) samples = {"timestamps": [[[0.0, 4.34], [5.50, 7.12], [8.10, 8.34], [8.50, 10.12]]]} ds = daft.from_pydict(samples) start_time = 0.0 pre_merge_gap_seconds = 0.5 max_span_seconds = 10.0 enforce_chunking = True ds = ds.with_column( "timestamps_merged", las_udf( TimestampsMerge, construct_args={ "start_time": start_time, "pre_merge_gap_seconds": pre_merge_gap_seconds, "max_span_seconds": max_span_seconds, "enforce_chunking": enforce_chunking, }, num_cpus=1, batch_size=1, concurrency=1, )(col("timestamps")), ) ds.show() # ╭────────────────────────────────┬───────────────────────────╮ # │ timestamps ┆ timestamps_merged │ # │ --- ┆ --- │ # │ List[List[Float64]] ┆ List[List[Float32]] │ # ╞════════════════════════════════╪═══════════════════════════╡ # │ [[0, 4.34], [5.5, 7.12], [8.1… ┆ [[0, 4.34], [5.5, 10.12]] │ # ╰────────────────────────────────┴───────────────────────────╯