本文介绍 Execute Pipeline 类型的 Activity 配置,用于在当前 Pipeline 中触发执行另一个已发布的 Pipeline。
Execute Pipeline Activity 实现子 Pipeline 调用,将复杂的工作流拆分为多个可复用的 Pipeline 模块,由父 Pipeline 统一编排。
适用场景:
- name: run_quality_check type: execute_pipeline pipelineName: data_quality_check pipelineId: 456 pipelineParameterValues: biz_date: "${date}" check_level: "strict" target_table: "dw.order_summary" waitingSuccess: true bizDate: "${date}" retryPolicy: maxRetries: 1 minRetryIntervalMillis: 120000 position: x: "200" y: "100"
字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
| String | 是 | 固定为 |
| String | 是 | 目标 Pipeline 的 |
| Long | 否 | 目标 Pipeline ID(提供时优先使用)。 |
| Long | 否 | 跨项目调用时需提供目标项目 ID。 |
| Map | 否 | 向目标 Pipeline 传递的参数。 |
| Boolean | 否 | 是否等待子 Pipeline 执行完成(默认 |
| String | 是 | 子 Pipeline 实例的业务日期。 |
waitingSuccess | 行为 | 适用场景 |
|---|---|---|
| 阻塞等待子 Pipeline 运行完成后再继续。子 Pipeline 成功则当前 Activity 成功,子 Pipeline 失败则当前 Activity 失败。 | 下游节点依赖子 Pipeline 的执行结果。 |
| 仅触发子 Pipeline 后立即返回成功,不等待子 Pipeline 执行完成。 | 仅需触发执行,不关心结果(如异步通知)。 |
waitingSuccess: true 的执行流程: Execute Pipeline 节点开始 → 触发子 Pipeline 生成实例 → 子 Pipeline 开始执行 → 等待子 Pipeline 执行完成... → 子 Pipeline SUCCEEDED → 当前 Activity SUCCEEDED → 子 Pipeline FAILED → 当前 Activity FAILED
waitingSuccess: false 的执行流程: Execute Pipeline 节点开始 → 触发子 Pipeline 生成实例 → 立即返回 → 当前 Activity SUCCEEDED (子 Pipeline 在后台异步执行)
通过 pipelineParameterValues 向子 Pipeline 传入参数,覆盖其默认参数值:
pipelineParameterValues: biz_date: "${date}" check_level: "strict" target_table: "dw.order_summary"
说明 | 详情 |
|---|---|
覆盖规则 | 传入的参数会覆盖子 Pipeline 中同名参数的默认值。 |
未传入的参数 | 使用子 Pipeline 中定义的默认值。 |
值来源 | 支持固定值、内置变量( |
跨项目调用需同时提供 projectId 和 pipelineName:
- name: call_external_pipeline type: execute_pipeline pipelineName: dim_table_etl projectId: 789 # 目标项目 ID pipelineParameterValues: biz_date: "${date}" waitingSuccess: true bizDate: "${date}" position: x: "200" y: "100"
跨项目调用要求:
要求 | 说明 |
|---|---|
目标 Pipeline 已发布 | 子 Pipeline 必须处于已发布状态。 |
权限 | 当前用户需拥有目标项目的相应权限。 |
目标 Pipeline 调度类型 | 子 Pipeline 可以是 scheduled、manual 或 external 类型。 |
当子 Pipeline 通过 Set Variable 设置了 PIPELINE_OUTPUT 返回值时,父 Pipeline 的后续节点可以引用这些值:
# 子 Pipeline(data_quality_check)中的 Set Variable 节点设置了 output 变量: # - total_records # - status # 父 Pipeline 中后续节点引用子 Pipeline 的返回值 - name: check_result type: if_condition condition: left: "{{activities.run_quality_check.output.status}}" op: EQUAL_TO right: "pass" dependsOn: activities: - run_quality_check
注意
仅在 waitingSuccess: true 时可获取子 Pipeline 的返回值。
activities: - name: run_etl type: sql source: WORKSPACE path: /Workspace/Users/zhang3/sql/etl.sql engineType: emr_serverless_spark engineQueue: default position: x: "200" y: "100" - name: run_quality_check type: execute_pipeline pipelineName: data_quality_check pipelineParameterValues: target_table: "dw.order_summary" biz_date: "${date}" waitingSuccess: true bizDate: "${date}" dependsOn: activities: - run_etl position: x: "400" y: "100"
- name: process_all_regions type: for_each items: '["cn-beijing", "cn-shanghai", "us-east"]' concurrency: 3 activities: - name: run_region_etl type: execute_pipeline pipelineName: region_etl_pipeline pipelineParameterValues: region: "{{items.each}}" biz_date: "${date}" waitingSuccess: true bizDate: "${date}" position: x: "100" y: "50"
建议 | 说明 |
|---|---|
需要结果时设 waitingSuccess: true | 下游依赖子 Pipeline 结果时务必设为 true。 |
子 Pipeline 保持独立可运行 | 子 Pipeline 应能独立调试和运行,不应强依赖父 Pipeline 上下文。 |
避免循环调用 | 不要出现 A → B → A 的 Pipeline 调用链,会导致死锁。 |
传递关键参数 | 将 bizDate 和业务参数显式传入,不依赖子 Pipeline 的默认值。 |