You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
Activity (任务节点)类型
SQL
复制全文
下载 pdf
SQL

本文介绍 SQL 类型的 Activity 配置,用于在 Pipeline 中执行 SQL 脚本。

概述

SQL Activity 执行指定的 SQL 脚本文件,是最常用的数据处理节点类型,适用于大规模批处理 ETL 场景。
适用场景:

  • 数据抽取(Extract):从源表中筛选和提取数据。
  • 数据转换(Transform):清洗、聚合、关联等 SQL 逻辑。
  • 数据加载(Load):将处理结果写入目标表。

配置示例

- name: extract_orders
  type: sql
  source: WORKSPACE
  path: /Workspace/Users/zhang3/sql/extract_orders.sql
  engineType: emr_serverless_spark
  engineQueue: default
  parameterValues:
    biz_date: "${date}"
    target_region: "cn-beijing"
  retryPolicy:
    maxRetries: 2
    minRetryIntervalMillis: 60000
    timeoutSeconds: 3600
    retryOnTimeout: true
  position:
    x: "200"
    y: "100"

字段说明

字段

类型

必填

说明

type

String

固定为 sql

source

Enum

代码来源:WORKSPACETOS

path

String

SQL 文件路径

engineType

Enum

计算引擎类型

engineQueue

String

计算队列名称

parameterValues

Map

参数传值(键值对)

engineType 可选值

说明

适用场景

emr_serverless_spark

EMR Serverless Spark 引擎

大数据量 ETL、复杂 SQL、Hive 表操作。

presto

Presto 引擎

交互式查询、跨数据源联合查询。

bytehouse

ByteHouse 引擎

实时分析、OLAP 场景。

source 可选值

说明

典型用法

WORKSPACE

工作空间内文件

日常开发,代码纳入 Git 版本管理。

TOS

对象存储文件

外部系统生成的 SQL 文件。

参数传递

通过 parameterValues 向 SQL 脚本传入参数:

parameterValues:
  biz_date: "${date}"
  target_region: "cn-beijing"
  threshold: "1000"

在 SQL 脚本中引用:

-- 通过 ${参数名} 引用
INSERT OVERWRITE TABLE dw.order_summary PARTITION (dt = '${biz_date}')
SELECT
    region,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM ods.orders
WHERE dt = '${biz_date}'
  AND region = '${target_region}'
  AND amount > ${threshold}
GROUP BY region;

参数值来源

来源

语法

示例

固定值

直接写值

"cn-beijing"

Pipeline 参数

{{pipeline.parameters.xxx}}

"{{pipeline.parameters.biz_date}}"

项目参数

{{project.parameters.xxx}}

"{{project.parameters.env}}"

Pipeline 变量

{{variables.xxx}}

"{{variables.record_count}}"

内置日期变量

${date}

"${date}"

多语句执行

一个 SQL 文件中可以包含多条 SQL 语句,用分号 ; 分隔,按顺序依次执行:

-- 步骤 1:创建临时表
CREATE TABLE IF NOT EXISTS tmp.order_staging AS
SELECT * FROM ods.orders WHERE dt = '${biz_date}';

-- 步骤 2:数据清洗
INSERT OVERWRITE TABLE dw.order_clean PARTITION (dt = '${biz_date}')
SELECT order_id, user_id, amount
FROM tmp.order_staging
WHERE amount > 0 AND user_id IS NOT NULL;

-- 步骤 3:清理临时表
DROP TABLE IF EXISTS tmp.order_staging;

注意

任意一条语句执行失败时,后续语句不再执行,Activity 进入 FAILED 状态。

使用建议

建议

说明

一个 Activity 一个职责

每个 SQL Activity 完成一个明确的数据处理步骤,避免将所有逻辑堆在一个文件中。

使用 PARTITION 覆盖写

推荐 INSERT OVERWRITE ... PARTITION 实现幂等写入,确保重跑安全。

参数化而非硬编码

日期、表名、阈值等通过 parameterValues 传入。

选择合适的引擎

大数据量选 Spark,交互式查询选 Presto,OLAP 场景选 ByteHouse。

配置超时

为长时间运行的 SQL 配置 retryPolicy.timeoutSeconds,防止资源长期占用。

最近更新时间:2026.06.12 11:44:17
这个页面对您有帮助吗?
有用
有用
无用
无用