You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
Activity (任务节点)类型
Excute Pipeline
复制全文
下载 pdf
Excute Pipeline

本文介绍 Execute Pipeline 类型的 Activity 配置,用于在当前 Pipeline 中触发执行另一个已发布的 Pipeline。

概述

Execute Pipeline Activity 实现子 Pipeline 调用,将复杂的工作流拆分为多个可复用的 Pipeline 模块,由父 Pipeline 统一编排。
适用场景:

  • 复用已有的 Pipeline 逻辑(如通用的数据质量检查 Pipeline)。
  • 拆分大型 Pipeline 降低复杂度。
  • 跨项目调用其他团队维护的 Pipeline。
  • 动态传参执行同一个 Pipeline 的不同业务场景。

配置示例

- name: run_quality_check
  type: execute_pipeline
  pipelineName: data_quality_check
  pipelineId: 456
  pipelineParameterValues:
    biz_date: "${date}"
    check_level: "strict"
    target_table: "dw.order_summary"
  waitingSuccess: true
  bizDate: "${date}"
  retryPolicy:
    maxRetries: 1
    minRetryIntervalMillis: 120000
  position:
    x: "200"
    y: "100"

字段说明

字段

类型

必填

说明

type

String

固定为 execute_pipeline

pipelineName

String

目标 Pipeline 的 metadata.name

pipelineId

Long

目标 Pipeline ID(提供时优先使用)。

projectId

Long

跨项目调用时需提供目标项目 ID。

pipelineParameterValues

Map

向目标 Pipeline 传递的参数。

waitingSuccess

Boolean

是否等待子 Pipeline 执行完成(默认 false)。

bizDate

String

子 Pipeline 实例的业务日期。

waitingSuccess 行为

waitingSuccess

行为

适用场景

true

阻塞等待子 Pipeline 运行完成后再继续。子 Pipeline 成功则当前 Activity 成功,子 Pipeline 失败则当前 Activity 失败。

下游节点依赖子 Pipeline 的执行结果。

false(默认)

仅触发子 Pipeline 后立即返回成功,不等待子 Pipeline 执行完成。

仅需触发执行,不关心结果(如异步通知)。

waitingSuccess: true 的执行流程:

Execute Pipeline 节点开始
  → 触发子 Pipeline 生成实例
    → 子 Pipeline 开始执行
      → 等待子 Pipeline 执行完成...
        → 子 Pipeline SUCCEEDED → 当前 Activity SUCCEEDED
        → 子 Pipeline FAILED → 当前 Activity FAILED
waitingSuccess: false 的执行流程:

Execute Pipeline 节点开始
  → 触发子 Pipeline 生成实例
    → 立即返回 → 当前 Activity SUCCEEDED
    (子 Pipeline 在后台异步执行)

参数传递

通过 pipelineParameterValues 向子 Pipeline 传入参数,覆盖其默认参数值:

pipelineParameterValues:
  biz_date: "${date}"
  check_level: "strict"
  target_table: "dw.order_summary"

说明

详情

覆盖规则

传入的参数会覆盖子 Pipeline 中同名参数的默认值。

未传入的参数

使用子 Pipeline 中定义的默认值。

值来源

支持固定值、内置变量(${date})和父 Pipeline 参数引用。

跨项目调用

跨项目调用需同时提供 projectIdpipelineName

- name: call_external_pipeline
  type: execute_pipeline
  pipelineName: dim_table_etl
  projectId: 789                       # 目标项目 ID
  pipelineParameterValues:
    biz_date: "${date}"
  waitingSuccess: true
  bizDate: "${date}"
  position:
    x: "200"
    y: "100"

跨项目调用要求:

要求

说明

目标 Pipeline 已发布

子 Pipeline 必须处于已发布状态。

权限

当前用户需拥有目标项目的相应权限。

目标 Pipeline 调度类型

子 Pipeline 可以是 scheduled、manual 或 external 类型。

获取子 Pipeline 返回值

当子 Pipeline 通过 Set Variable 设置了 PIPELINE_OUTPUT 返回值时,父 Pipeline 的后续节点可以引用这些值:

# 子 Pipeline(data_quality_check)中的 Set Variable 节点设置了 output 变量:
# - total_records
# - status

# 父 Pipeline 中后续节点引用子 Pipeline 的返回值
- name: check_result
  type: if_condition
  condition:
    left: "{{activities.run_quality_check.output.status}}"
    op: EQUAL_TO
    right: "pass"
  dependsOn:
    activities:
      - run_quality_check

注意

仅在 waitingSuccess: true 时可获取子 Pipeline 的返回值。

典型场景

场景一:ETL + 数据质量检查

activities:
  - name: run_etl
    type: sql
    source: WORKSPACE
    path: /Workspace/Users/zhang3/sql/etl.sql
    engineType: emr_serverless_spark
    engineQueue: default
    position:
      x: "200"
      y: "100"

  - name: run_quality_check
    type: execute_pipeline
    pipelineName: data_quality_check
    pipelineParameterValues:
      target_table: "dw.order_summary"
      biz_date: "${date}"
    waitingSuccess: true
    bizDate: "${date}"
    dependsOn:
      activities:
        - run_etl
    position:
      x: "400"
      y: "100"

场景二:For Each + Execute Pipeline

- name: process_all_regions
  type: for_each
  items: '["cn-beijing", "cn-shanghai", "us-east"]'
  concurrency: 3
  activities:
    - name: run_region_etl
      type: execute_pipeline
      pipelineName: region_etl_pipeline
      pipelineParameterValues:
        region: "{{items.each}}"
        biz_date: "${date}"
      waitingSuccess: true
      bizDate: "${date}"
      position:
        x: "100"
        y: "50"

使用建议

建议

说明

需要结果时设 waitingSuccess: true

下游依赖子 Pipeline 结果时务必设为 true。

子 Pipeline 保持独立可运行

子 Pipeline 应能独立调试和运行,不应强依赖父 Pipeline 上下文。

避免循环调用

不要出现 A → B → A 的 Pipeline 调用链,会导致死锁。

传递关键参数

将 bizDate 和业务参数显式传入,不依赖子 Pipeline 的默认值。

最近更新时间:2026.06.12 11:44:17
这个页面对您有帮助吗?
有用
有用
无用
无用