You need to enable JavaScript to run this app.
大数据研发治理套件

大数据研发治理套件

复制全文
通用
循环任务
复制全文
循环任务

循环任务即 do-while 任务,是 DataLeap 数据开发平台提供的一种逻辑控制节点类型,循环节点内部可根据业务需要自定义添加多种任务类型来满足业务场景,如:离线数据集成通道任务、Shell 任务、Python 任务等。创建循环任务后,您可以重新编排循环节点内部的业务流程,控制进入循环的开发节点类型,并设置循环终止条件来控制是否退出循环。
本文将为您介绍循环任务节点的配置说明。

注意事项

  • 循环任务目前以白名单形式放开,您可通过提工单方式,联系我们进行加白后开通使用。

新建任务

  1. 登录 DataLeap 租户控制台
  2. 在具体项目中进入数据开发界面,并单击新建任务按钮进行任务新建。
  3. 依次选择数据开发 > 通用 > 循环任务
  4. 填写任务基本信息,单击确定按钮,完成任务创建。

    注意

    任务名称信息仅允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在 127 个字符以内。

  5. 单击确定按钮,完成任务创建。

Image

循环逻辑说明

循环任务内部支持单一子节点和复杂的组合子节点,并可通过拖拉拽的方式,自定义多个子节点之间的依赖关系。根据业务需要,在循环任务内部形成一套完整可执行的工作流。
Image
循环任务整体的循环次数,可由下方“循环终止条件”的代码逻辑来决定整体循环执行几次。
通常循环任务会与上游任务有输出且支持将输出结果传递给下游使用的任务类型联合使用,如:Python、Shell 任务,在上游任务的调度设置中,勾选“将运行结果当做输出”选项,进行输出结果的传递。任务输入输出操作详见“输入输出参数设置”。
Image

子节点配置

  1. 创建子任务节点

    说明

    以创建 ByteHouse SQL 任务类型为例,您需先前往管理控制台绑定相应的引擎实例,操作详见“创建项目”。

    您可根据实际业务场景,选择不同的任务类型进行编排组合。目前工作流任务支持添加以下几种子任务类型:

    引擎类型

    说明

    EMR

    基于开源 Hadoop 生态的企业级大数据分析系统 EMR 任务,完全兼容开源,支持 EMR MapReduce、EMR Spark、 EMR HSQL、EMR StarRocks、EMR Doris、EMR 报表任务等任务类型。
    任务说明详见 EMR 任务开发

    EMR Serverless StarRocks

    开源 StarRocks 在火山引擎 EMR 上的全托管服务,您可以灵活的创建和管理 StarRocks 实例以及数据,支持 EMR Serverless StarRocks SQL。
    任务说明详见 EMR Serverless StarRocks SQL

    EMR Serverless Spark

    基于 Serverless 的 Spark 服务,完全兼容开源 Spark 引擎能力,并通过 LAS Foramtion 提供统一元数据服务和数据权限服务。在 DataLeap 中支持 Spark SQL 和 Spark Jar 任务类型,满足中大型客户对不同状态资源的差异化诉求。
    任务说明详见 Spark SQLSpark Jar

    流式计算 Flink 版

    火山引擎上提供的 100% 兼容 Apache Flink 的企业级全托管流式计算平台,支持开箱即用,免运维,Serverless 极致弹性,极简 SQL 开发,流批一体等特性。
    支持Flink Batch SQLJava Flink Batch 任务类型。

    LAS

    Serverless 湖仓一体分析服务,提供多模引擎,完全兼容开源 Spark、Presto、Hudi 生态,支持 LAS SQL、LAS Spark、LAS 报表任务类型。
    各任务操作说明详见 LAS 任务开发

    ByteHouse CE

    基于开源 ClickHouse 进行深度优化和改造的引擎类型,提供海量数据上更强的查询服务和数据写入性能,支持 ByteHouse CE SQL 任务类型。
    任务操作说明详见 ByteHouse CE SQL 任务开发

    通用

    通用引擎子任务类型,通过 Shell/Python 语言来实现业务逻辑开发。
    任务操作说明详见通用任务开发

    数据集成

    支持多种异构数据源之间进行高速稳定的数据同步离线任务类型。
    操作说明详见离线数据同步

  2. 添加子任务依赖
    子任务创建完成后,您可通过子任务中心的点,用拉线的方式来形成子任务间的上下游依赖关系。依赖关系添加后,您可单击画板右上方的自动排版按钮,形成按顺序依赖的 DAG 关系图。

  3. 管理子任务
    在画板中,您也可对已创建的子任务进行以下操作:

    功能

    说明

    编辑子节点

    鼠标移动至子节点图标上方,右键单击编辑子节点按钮或直接双击子节点图标,进入具体的子节点内部,进行业务逻辑代码的开发。

    说明

    子节点内部的调度设置中,没有任务依赖相关的配置,即内部子节点不能单独被外部任务依赖或依赖外部任务。仅循环任务整体能与外部任务形成依赖关系。

    删除子节点

    鼠标移动至子节点图标上方,右键单击删除子节点按钮,二次确认后,可将创建的子任务从画板中删除。

    说明

    删除操作会从工作流中直接删除,且不会进入回收站,需谨慎操作删除子节点。

    重命名子节点

    鼠标移动至子节点图标上方,右键单击重命名子节点按钮,对当前子节点进行重命名操作。只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,127 个字符以内。

    删除连线

    鼠标移动至两个子节点之间的连接线上方,右键单击删除连线按钮,可将子节点之间的依赖关系解除。
    Image

循环终止条件

画板中子节点任务添加完成后,您可“设置循环终止条件”中的循环代码逻辑。
循环任务中,需配置“循环终止条件”用来控制循环节点的循环次数,其本质也是一串 Shell 代码,会一起被调度执行,代码执行需保证最后一行输出的是 True 或 False 两种类型之一,来表示是否继续下一次循环,True 表示继续循环,False 表示终止循环。
终止条件支持 Shell 两种语言进行循环条件判断的代码开发,且为了方便您对整体循环次数的控制,平台为您提供了内置变量,即:loopCount、loopIndex、outputsLoopSize、outputsLoopCurrent,可在编辑循环代码逻辑时使用。
内置变量说明:

内置变量

含义

示例

{{loopCount}}

当前循环次数

第一次循环为 1,第二次循环为 2。

{{loopIndex}}

距离第一次循环的偏移

第一次循环为 0,第二次循环为 1。

{{outputsLoopSize}}

上游任务实例输出按逗号分隔后的元数个数

上游任务输出元素个数为 3 个。

{{outputsLoopCurrent}}

实例输出按逗号分隔后,当前循环执行时所用的元素

此次循环执行时所用的具体元素值,如上游输出值为 1,2,3,则首次循环执行时所用的元素值为 1。

  • Shell 语言循环终止条件示例:

    说明

    Shell 返回 True 表示继续循环,False 表示终止循环。

    if [ {{loopCount}} gt 4 ]
    then
        echo false
    else
        echo true
    fi
    

调度设置

任务配置完成后,您可单击右侧侧边栏的调度设置,配置调度设置参数。调度设置说明详见“任务调度依赖”。

提交任务

数据开发界面调试运行、调度设置等操作完成后,您可单击操作栏中的保存提交上线按钮,在弹窗中,进行回溯数据、监控设置、提交设置等操作,并单击确认按钮,完成作业提交。操作详见“4 提交上线”。

使用案例

下文将为您演示 Shell 任务与循环任务联合使用的案例:

  1. 上游 Shell 任务输几行数据,并将输出的结果数据,传递给下游循环任务作为输入参数使用。
  2. 循环任务与 Shell 任务配置成上下游依赖关系,并获取 Shell 任务的输出结果,将其通过 Shell 语言,转换成可判断的循环次数。
  3. 循环节点内部的子节点,获取 Shell 任务输出的每一行数据,并按照循环次数,依次打印对应行数的数据结果,并将每行数据依次插入 ByteHouse 数据表中。

配置 Shell 任务

配置上游 Shell 任务,并输出其查询结果。

  1. 参考“Shell”,新建 Shell 任务。

  2. 在代码编辑区按需输入 Shell 语言:

    echo "qq,ee,rtt"
    
  3. 单击右侧调度设置按钮,在输入输出参数部分,勾选“将运行结果当做输出”,平台自动生成任务中输出的参数名“outputs”。
    Image

  4. 按需设置完其余调度设置参数后,单击编辑器上方保存调试按钮,进行开发环境调试,查看当前语句输出的查询结果。

  5. 单击提交上线按钮,完成 Shell 任务提交发布操作。

配置循环任务

上游 Shell 任务完成提交发布后,您便可继续配置下游循环任务。

  1. 参考“新建任务”,新建循环任务。

  2. 循环任务创建完成后,在循环任务界面,单击右侧调度设置按钮,进行上游依赖和任务输入参数配置:

    1. 任务上游依赖设置处,单击手动添加按钮,通过任务名称搜索已提交发布的 Shell 任务,勾选任务后,单击确认按钮,完成上游依赖添加。

      说明

      若上游任务执行频率和循环任务执行频率不一致时,您可参考“5 依赖配置方式”说明,调整依赖偏移设置。

      Image
    2. 上游依赖添加完成后,您可在任务输入参数窗口,单击手动添加按钮,进行输入参数添加。
    3. 在添加输入参数窗口,依次设置输入参数名称“partition(可随意命名)”、选择参数来源类型为“任务”、来源任务为“上游任务名称:shell112”、及上游输出的参数名称“outputs”、并随意设置解析替换值参数。
    4. 单击确定按钮,完成输入参数添加。
      Image
  3. 任务依赖和输入参数设置完成后,您便可参考下方示例代码,设置循环终止条件,以 Shell 语言为例设置终止条件:

    echo {{partition}}
    length=$(echo {{partition}} | awk -F, '{print NF}')
    if [ {{loopCount}} -ge "$length" ]; then
        echo "False"
    else
        echo "True"
    fi
    

    终止条件参数说明:

    参数

    说明

    partition

    任务输入参数中设置的参数名称信息,以 {{partition}} 形式,来解析上游任务实际产出的结果。

    loopCount

    平台系统参数,获取当前循环次数。

    说明

    上述终止条件代码中,当循环次数 {{loopCount}} 大于等于 输出结果行数时,循环终止。

  4. 循环终止条件设置完成后,您便可根据实际业务需要,在循环任务画板界面,拖入您需要使用的子节点任务。这里依次拖入两个任务,并命名为“shell_xx”和“bhsqlxxx”。

  5. 并通过拉线依赖的方式,设置循环任务内部子节点“shell_xx”和“bhsqlxxx”的上下游依赖关系。
    Image

  6. 双击子节点图标,分别进入“shell_xx”和“bhsqlxxx”任务内部,进行内部逻辑代码与输入输出参数设置:

    • shell_xx
      1. 按照外部”shell112”任务输出结果,依次循环获取参数,代码示例如下:

        echo {{partition}} | cut -d ',' -f {{loopCount}}
        
      2. 在子节点调度设置中勾选“将运行结果当做输出”,设置内部子节点“shell_xx”的输出参数。
        Image

    • bhsqlxxx
      1. 将上游输出结果,循环写入 ByteHouse 表中:

        INSERT INTO test_bh_db.bh_table VALUES (77, '{{shell_output}}', 5);
        
      2. 调度设置中设置任务输入参数,输入参数命名为“shell_output”,参数来源于上游内部的“shellxx”子节点。
        Image

  7. 依次保存内部子节点与循环任务整体节点,并单击提交上线按钮,将循环任务整体发布至运维中心。

运维中心验证循环结果

上游 Shell 任务与下游循环任务整体发布至运维中心后,您可通过数据回溯操作,来验证循环执行结果:

  1. 在数据开发界面,上方导航栏处,单击选择运维中心 > 离线任务运维 > 数据回溯 > 发起的,进入发起数据回溯界面。
  2. 单击界面右上角新建数据回溯按钮,并完成数据回溯相关设置,等待回溯任务执行完成。回溯操作详见“数据回溯”。
  3. 前往实例运维界面,根据循环任务名称进行搜索任务回溯实例,在实例列表操作列中,单击实例 DAG,进入查看循环任务实例情况,您可在循环任务整体运行日志窗口,查看实例循环次数。
    Image
  4. 您也可在实例 DAG 图中,单击循环任务的 DAG,进入内部子节点查看其执行明细与每次循环时日志打印的结果。
    Image
最近更新时间:2026.01.16 11:57:20
这个页面对您有帮助吗?
有用
有用
无用
无用