循环任务即 do-while 任务,是 DataLeap 数据开发平台提供的一种逻辑控制节点类型,循环节点内部可根据业务需要自定义添加多种任务类型来满足业务场景,如:离线数据集成通道任务、Shell 任务、Python 任务等。创建循环任务后,您可以重新编排循环节点内部的业务流程,控制进入循环的开发节点类型,并设置循环终止条件来控制是否退出循环。
本文将为您介绍循环任务节点的配置说明。
注意
任务名称信息仅允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在 127 个字符以内。
循环任务内部支持单一子节点和复杂的组合子节点,并可通过拖拉拽的方式,自定义多个子节点之间的依赖关系。根据业务需要,在循环任务内部形成一套完整可执行的工作流。
循环任务整体的循环次数,可由下方“循环终止条件”的代码逻辑来决定整体循环执行几次。
通常循环任务会与上游任务有输出且支持将输出结果传递给下游使用的任务类型联合使用,如:Python、Shell 任务,在上游任务的调度设置中,勾选“将运行结果当做输出”选项,进行输出结果的传递。任务输入输出操作详见“输入输出参数设置”。
创建子任务节点
说明
以创建 ByteHouse SQL 任务类型为例,您需先前往管理控制台绑定相应的引擎实例,操作详见“创建项目”。
您可根据实际业务场景,选择不同的任务类型进行编排组合。目前工作流任务支持添加以下几种子任务类型:
引擎类型 | 说明 |
|---|---|
EMR | 基于开源 Hadoop 生态的企业级大数据分析系统 EMR 任务,完全兼容开源,支持 EMR MapReduce、EMR Spark、 EMR HSQL、EMR StarRocks、EMR Doris、EMR 报表任务等任务类型。 |
EMR Serverless StarRocks | 开源 StarRocks 在火山引擎 EMR 上的全托管服务,您可以灵活的创建和管理 StarRocks 实例以及数据,支持 EMR Serverless StarRocks SQL。 |
EMR Serverless Spark | 基于 Serverless 的 Spark 服务,完全兼容开源 Spark 引擎能力,并通过 LAS Foramtion 提供统一元数据服务和数据权限服务。在 DataLeap 中支持 Spark SQL 和 Spark Jar 任务类型,满足中大型客户对不同状态资源的差异化诉求。 |
流式计算 Flink 版 | 火山引擎上提供的 100% 兼容 Apache Flink 的企业级全托管流式计算平台,支持开箱即用,免运维,Serverless 极致弹性,极简 SQL 开发,流批一体等特性。 |
LAS | Serverless 湖仓一体分析服务,提供多模引擎,完全兼容开源 Spark、Presto、Hudi 生态,支持 LAS SQL、LAS Spark、LAS 报表任务类型。 |
ByteHouse CE | 基于开源 ClickHouse 进行深度优化和改造的引擎类型,提供海量数据上更强的查询服务和数据写入性能,支持 ByteHouse CE SQL 任务类型。 |
通用 | 通用引擎子任务类型,通过 Shell/Python 语言来实现业务逻辑开发。 |
数据集成 | 支持多种异构数据源之间进行高速稳定的数据同步离线任务类型。 |
添加子任务依赖
子任务创建完成后,您可通过子任务中心的点,用拉线的方式来形成子任务间的上下游依赖关系。依赖关系添加后,您可单击画板右上方的自动排版按钮,形成按顺序依赖的 DAG 关系图。
管理子任务
在画板中,您也可对已创建的子任务进行以下操作:
功能 | 说明 |
|---|---|
编辑子节点 | 鼠标移动至子节点图标上方,右键单击编辑子节点按钮或直接双击子节点图标,进入具体的子节点内部,进行业务逻辑代码的开发。 说明 子节点内部的调度设置中,没有任务依赖相关的配置,即内部子节点不能单独被外部任务依赖或依赖外部任务。仅循环任务整体能与外部任务形成依赖关系。 |
删除子节点 | 鼠标移动至子节点图标上方,右键单击删除子节点按钮,二次确认后,可将创建的子任务从画板中删除。 说明 删除操作会从工作流中直接删除,且不会进入回收站,需谨慎操作删除子节点。 |
重命名子节点 | 鼠标移动至子节点图标上方,右键单击重命名子节点按钮,对当前子节点进行重命名操作。只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,127 个字符以内。 |
删除连线 | 鼠标移动至两个子节点之间的连接线上方,右键单击删除连线按钮,可将子节点之间的依赖关系解除。 |
画板中子节点任务添加完成后,您可“设置循环终止条件”中的循环代码逻辑。
循环任务中,需配置“循环终止条件”用来控制循环节点的循环次数,其本质也是一串 Shell 代码,会一起被调度执行,代码执行需保证最后一行输出的是 True 或 False 两种类型之一,来表示是否继续下一次循环,True 表示继续循环,False 表示终止循环。
终止条件支持 Shell 两种语言进行循环条件判断的代码开发,且为了方便您对整体循环次数的控制,平台为您提供了内置变量,即:loopCount、loopIndex、outputsLoopSize、outputsLoopCurrent,可在编辑循环代码逻辑时使用。
内置变量说明:
内置变量 | 含义 | 示例 |
|---|---|---|
{{loopCount}} | 当前循环次数 | 第一次循环为 1,第二次循环为 2。 |
{{loopIndex}} | 距离第一次循环的偏移 | 第一次循环为 0,第二次循环为 1。 |
{{outputsLoopSize}} | 上游任务实例输出按逗号分隔后的元数个数 | 上游任务输出元素个数为 3 个。 |
{{outputsLoopCurrent}} | 实例输出按逗号分隔后,当前循环执行时所用的元素 | 此次循环执行时所用的具体元素值,如上游输出值为 1,2,3,则首次循环执行时所用的元素值为 1。 |
说明
Shell 返回 True 表示继续循环,False 表示终止循环。
if [ {{loopCount}} gt 4 ] then echo false else echo true fi
任务配置完成后,您可单击右侧侧边栏的调度设置,配置调度设置参数。调度设置说明详见“任务调度依赖”。
数据开发界面调试运行、调度设置等操作完成后,您可单击操作栏中的保存和提交上线按钮,在弹窗中,进行回溯数据、监控设置、提交设置等操作,并单击确认按钮,完成作业提交。操作详见“4 提交上线”。
下文将为您演示 Shell 任务与循环任务联合使用的案例:
配置上游 Shell 任务,并输出其查询结果。
参考“Shell”,新建 Shell 任务。
在代码编辑区按需输入 Shell 语言:
echo "qq,ee,rtt"
单击右侧调度设置按钮,在输入输出参数部分,勾选“将运行结果当做输出”,平台自动生成任务中输出的参数名“outputs”。
按需设置完其余调度设置参数后,单击编辑器上方保存和调试按钮,进行开发环境调试,查看当前语句输出的查询结果。
单击提交上线按钮,完成 Shell 任务提交发布操作。
上游 Shell 任务完成提交发布后,您便可继续配置下游循环任务。
参考“新建任务”,新建循环任务。
循环任务创建完成后,在循环任务界面,单击右侧调度设置按钮,进行上游依赖和任务输入参数配置:
说明
若上游任务执行频率和循环任务执行频率不一致时,您可参考“5 依赖配置方式”说明,调整依赖偏移设置。


任务依赖和输入参数设置完成后,您便可参考下方示例代码,设置循环终止条件,以 Shell 语言为例设置终止条件:
echo {{partition}} length=$(echo {{partition}} | awk -F, '{print NF}') if [ {{loopCount}} -ge "$length" ]; then echo "False" else echo "True" fi
终止条件参数说明:
参数 | 说明 |
|---|---|
partition | 任务输入参数中设置的参数名称信息,以 {{partition}} 形式,来解析上游任务实际产出的结果。 |
loopCount | 平台系统参数,获取当前循环次数。 |
说明
上述终止条件代码中,当循环次数 {{loopCount}} 大于等于 输出结果行数时,循环终止。
循环终止条件设置完成后,您便可根据实际业务需要,在循环任务画板界面,拖入您需要使用的子节点任务。这里依次拖入两个任务,并命名为“shell_xx”和“bhsqlxxx”。
并通过拉线依赖的方式,设置循环任务内部子节点“shell_xx”和“bhsqlxxx”的上下游依赖关系。
双击子节点图标,分别进入“shell_xx”和“bhsqlxxx”任务内部,进行内部逻辑代码与输入输出参数设置:
按照外部”shell112”任务输出结果,依次循环获取参数,代码示例如下:
echo {{partition}} | cut -d ',' -f {{loopCount}}
在子节点调度设置中勾选“将运行结果当做输出”,设置内部子节点“shell_xx”的输出参数。
将上游输出结果,循环写入 ByteHouse 表中:
INSERT INTO test_bh_db.bh_table VALUES (77, '{{shell_output}}', 5);
在调度设置中设置任务输入参数,输入参数命名为“shell_output”,参数来源于上游内部的“shellxx”子节点。
依次保存内部子节点与循环任务整体节点,并单击提交上线按钮,将循环任务整体发布至运维中心。
上游 Shell 任务与下游循环任务整体发布至运维中心后,您可通过数据回溯操作,来验证循环执行结果:
