You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
其他
时间戳片段合并
复制全文
时间戳片段合并

算子介绍

描述

时间戳合并算子 - 秒级区间的规范化、合并与切分

核心能力

  • 规范化与排序:统一输入格式为 (start, end) 浮点秒并校验合法性。
  • 预合并小间隙:合并重叠或间隙小于等于阈值的相邻片段(pre_merge_gap_seconds)。
  • 最大静默优先切分:在最长静默处优先切分,保证每段跨度不超过 max_span_seconds。
  • 强制切块(可选):对超长片段按固定窗口切分,确保每段长度不超过上限(enforce_chunking)。

推荐实践

  • 当 VAD 输出存在短静默且希望保留语义连续性时,优先使用“最大静默优先切分”而非简单顺序切块。
  • 若业务要求硬性限制片段长度,开启 enforce_chunking 以获得连续窗口切分效果。
  • 将 pre_merge_gap_seconds 设置为略大于噪声静默的典型时长(例如 0.2–1.0 秒)。

输出格式

  • 每行返回 List[List[float]],元素为 [start, end](单位:秒),按起点升序;输出列类型为 List[List[float32]]。

处理流程

  1. 规范化:加上统一偏移 start_time、排序并校验合法性。
  2. 预合并:合并重叠与小间隙(≤ pre_merge_gap_seconds)的片段。
  3. 最大静默优先切分:在最长静默处分割,确保每段跨度 ≤ max_span_seconds;对单片段时间范围不做≤ max_span_seconds的要求。
  4. (可选)强制切块:当 enforce_chunking=True 时,按固定窗口连续切分确保时间长度 ≤ max_span_seconds。

Daft 调用

算子参数

输入

输入列名

说明

timestamps

其中每个元素为二维浮点数列表(单位:秒),形如 [[start, end], ...]。

输出

每个元素为合并后的二维浮点数列表(单位:秒),类型为 List[List[float32]];非法行返回 None。

参数

如参数没有默认值,则为必填参数

参数名称

类型

默认值

描述

start_time

float

0.0

应用于每个时间戳的起始偏移(秒)。 默认值:0

pre_merge_gap_seconds

float

0.0

微小间隙预合并的阈值(秒)。 当相邻片段间隔小于等于此值时,会合并为一个更大的片段。 默认值:0

max_span_seconds

float

20.0

最大允许的片段合并跨度(秒)。 默认值:20

enforce_chunking

bool

False

是否强制对超长片段进行连续切块。 当为 True 时,会按 max_span_seconds 长度进行连续切块。 默认值:False

调用示例

下面的代码展示了如何使用 daft 运行算子对时间戳列表进行合并拆分等规范化操作。

from __future__ import annotations

import os

import daft
from daft import col
from daft.las.functions.other.timestamps_merge import TimestampsMerge
from daft.las.functions.udf import las_udf

if __name__ == "__main__":

    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import logging

        import ray

        def configure_logging():
            logging.basicConfig(
                level=logging.INFO,
                format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S.%s".format(),
            )
            logging.getLogger("tracing.span").setLevel(logging.WARNING)
            logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
            logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
            logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)

        ray.init(dashboard_host="0.0.0.0", runtime_env={"worker_process_setup_hook": configure_logging})
        daft.context.set_runner_ray()
    daft.set_execution_config(actor_udf_ready_timeout=600)
    daft.set_execution_config(min_cpu_per_task=0)

    samples = {"timestamps": [[[0.0, 4.34], [5.50, 7.12], [8.10, 8.34], [8.50, 10.12]]]}
    ds = daft.from_pydict(samples)

    start_time = 0.0
    pre_merge_gap_seconds = 0.5
    max_span_seconds = 10.0
    enforce_chunking = True
    ds = ds.with_column(
        "timestamps_merged",
        las_udf(
            TimestampsMerge,
            construct_args={
                "start_time": start_time,
                "pre_merge_gap_seconds": pre_merge_gap_seconds,
                "max_span_seconds": max_span_seconds,
                "enforce_chunking": enforce_chunking,
            },
            num_cpus=1,
            batch_size=1,
            concurrency=1,
        )(col("timestamps")),
    )
    ds.show()

    # ╭────────────────────────────────┬───────────────────────────╮
    # │ timestamps                     ┆ timestamps_merged         │
    # │ ---                            ┆ ---                       │
    # │ List[List[Float64]]            ┆ List[List[Float32]]       │
    # ╞════════════════════════════════╪═══════════════════════════╡
    # │ [[0, 4.34], [5.5, 7.12], [8.1… ┆ [[0, 4.34], [5.5, 10.12]] │
    # ╰────────────────────────────────┴───────────────────────────╯
最近更新时间:2026.01.08 19:14:22
这个页面对您有帮助吗?
有用
有用
无用
无用