You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件(私有化)

大数据研发治理套件(私有化)

复制全文
离线数据集成
通用设置
复制全文
通用设置

本文将为您介绍 DataLeap 离线集成任务创建和管理的通用操作。

新建任务

  1. 登录DataLeap控制台。
  2. 在具体项目中进入数据开发 > 任务开发界面,并单击新建任务按钮,进入新建任务页面。
  3. 在新建任务界面,依次选择集成任务 > 绑定引擎 > 关联实例 > 离线集成任务类型。其中:
    • 绑定引擎:默认展示当前项目下已绑定的CDH、Hadoop、或LAS引擎;
    • 关联实例:选择引擎下对应的引擎实例信息。
  4. 设置任务基本信息及保存路径信息,单击确定按钮,完成创建。

    注意

    输入的任务名称信息,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,127个字符以内。

Image

字段映射配置

数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
字段映射支持选择基础模式转换模式配置映射。

说明

基础模式和转换模式不支持互相切换,模式切换后,将清空现有字段映射中所有配置信息,一旦切换无法撤销,需谨慎操作。

转换模式

字段映射支持数据转换,您可根据实际业务需求进行配置,将源端采集的数据,事先对其进行各种数据转换操作后,以指定格式输入到目标端数据库中,满足不同业务场景需求。
转换模式可应用于各种轻量级数据处理场景,如:

  • 数据清洗:可以使用转换模式过滤冗余数据、处理缺失值、纠正数据错误等场景。
  • 数据预处理:可以使用转换模式对数据进行标准化、归一化、离散化等预处理,以便更好地分析和处理数据。
  • 数据转换:可以使用转换模式将数据从一种格式转换为另一种格式。

Image
按需配置数据来源和目标端数据信息后,单击“转换模式”按钮,进入转换模式配置界面,在转换模式中,您需依次配置:来源节点、数据转换、目标节点信息。

  1. 配置来源节点
    默认情况下,来源节点和目标节点会自动添加到节点目录列表中,如下图所示。
    Image
    1. 单击来源节点下的 Source 节点,右侧进行数据来源的字段信息配置。
    2. 配置来源节点名称信息,您可自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    3. 配置数据字段信息,您可通过自动添加、手动添加两种模式来添加数据来源字段信息。
    4. 来源节点信息配置完成后,单击确认按钮,此时节点目录列表中的来源节点名称左侧灰点,会变成绿点,则表示来源节点配置完成。
  2. 添加转换节点
    数据转换节点支持添加 SQL 转换节点,支持通过 Flink SQL 函数来实现多种转换操作,例如删除列、重命名字段、数据类型转换等。添加 SQL 转换节点操作如下:
    Image
    1. 单击数据转换节点右侧添加按钮,选择 SQL 转换方式,配置转换信息和规则。

    2. 配置转换节点名称,您可自定义输入节点转换名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。

    3. 输入 SQL 脚本转换规则,目前仅支持添加一个转换的 SQL 语句,且不能包括 “;”。

      注意

      输入的转换 SQL 语句仅支持 Flink 内部的 Flink SQL 语句,不支持源端或目标端数据源类型的 SQL 语法。

      以下是 Flink SQL 相关的转换示例(假设源表名为 Source):

      -- 字段重排序: 源表有 a, b, c 三个字段,但下游目的表只有 c, b 两个字段
      SELECT c, b FROM Source
       
      -- 字段合并: 把源表的 first_name 字段和 last_name 字段用空格连接起来,并作为一个新的字段 full_name
      SELECT first_name || ' ' || last_name AS full_name FROM Source
       
      -- 字段拆分: 把源表的 full_name 字段根据空格做拆分,并产生 first_name 和 last_name 两个新字段
      SELECT SPLIT_INDEX(full_name, ' ', 1) AS first_name, SPLIT_INDEX(full_name, ' ', 2) AS last_name FROM Source
       
      -- 判断 name 字段是否仅包含空白字符, 如果是的话就输出 NULL, 否则输出原字符串
      SELECT
          CASE
               WHEN IS_SPACES(name) THEN NULL
               ELSE name
          END
      FROM Source
      

      更多 SQL 节点支持的转换函数列表,详见Flink 1.11 系统函数Flink 1.16 系统函数
      以下为内部特有的一些函数说明:

      函数名

      用途

      返回值类型

      示例

      IS_NUMBER

      判断一个字符串是否是数字格式。

      Boolean

      IS_NUMBER(price),如果 price 是 '1'、'2.3' 等数字格式的字符串,则返回 true

      IS_SPACES

      判断一个字符串是否全由空格字符组成。

      Boolean

      IS_SPACES(name),如果 name 是 ' ' 等空格字符组成的字符串,则返回 true

      GET_JSON_OBJECT

      按照给定的 JSONPath 路径,从给定 JSON 字符串中提取相应的字段值。 如果获取不到给定的 JSONPath 路径,则返回 null 或者第三个参数指定的默认值(可选)。

      Object

      GET_JSON_OBJECT(json_field, '$.b', 'None') :

      • 如果 json_field 是 '{"a":1,"b":"hello"}" 则返回 'hello'
      • 如果 json_field 没有 b 字段,则返回给定的默认值 'None'
    4. 转换信息配置完成后,单击确认按钮,在通过检查后,节点目录列表中的转换节点名称左侧灰点,也会变成绿点,表示转换节点配置完成;若未通过检查,则会有报错提示,需根据信息修改参数。
      Image

  3. 配置目标节点
    与来源节点类似,配置目标节点Sink信息:
    1. 单击目标节点下的 Sink 节点,右侧进行数据目标的字段信息配置。
    2. 配置目标节点名称信息,您可自定义输入目标节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    3. 配置目标数据字段信息,您可通过自动添加、手动添加两种模式来添加数据来源字段信息。

      说明

      目标节点的字段名称应当与上游 SQL 节点的输出字段保持一致,避免运行时异常。
      例如上游是 SQL 转换节点,SQL 语句为 SELECT name || '-SUFFIX' AS new_name, address FROM Source,那么目标节点的字段需要为 new_name、address。

    4. 目标节点信息配置完成后,单击确认按钮,确认节点名称左侧变成绿点后,表示目标节点配置完成。
      Image
  4. 执行转换调试
    数据转换节点配置完成后,当所有节点左侧都变成绿点,则表明可以将任务提交执行了。对于离线集成作业,可以单击上方操作栏中的调试按钮开始执行。
    Image

说明

  • 若后续对数据转换节点做任何变更(包括重命名、修改参数、删除节点等操作),均会导致当前节点和下游所有节点的状态,由绿色变成灰色。此时,您需要依次单击每个节点中的确认按钮,确认所有节点重新变成绿色后,方可继续提交执行任务。否则将会有如下报错信息提示:
    Image
  • 对于 SQL 节点,我们建议只使用“Flink 1.11 中的标量函数”来完成轻量级数据转换,请避免使用 JOIN、GROUP BY、DISTINCT、WINDOW 等带状态的语句,以免造成作业运行缓慢甚至崩溃。
  • SQL 节点当前仅能添加一个。

基础模式

基础模式您可通过以下几种方式操作字段映射关系:

  • 单击自动添加,可以根据数据源 Schema 自动添加字段。部分数据源类型支持获取schema自动添加字段。
  • 单击手动添加,可以添加一条空白的字段信息,手工输入字段名、类型等。

    说明

    来源端字段信息支持输入数据库函数和常量配置,单击手动添加按钮,在源表字段中输入需添加的值,并选择函数或常量类型,例如:

    • 函数:支持您输入 now()、current_timestamp()、unix_timestamp() 等数据库支持的函数。
    • 常量:您可自定义输入常量值,'123'、'${DATE}'、'${hour}'、'${runtime}' 等,输入值两侧需要加上英文单引号,支持结合调度参数说明使用。
  • 单击同名映射,可根据目标端/源端的字段添加情况,进行同名映射添加源端/目标端的字段信息。

    说明

    同名映射能力,仅部分数据源支持,如 FTP/SFTP、Elasticsearch数据源。

  • 单击删除全部,可以删除全部字段信息。
  • 通过拖拽字段左侧的按钮,可以调整字段顺序。

高级参数设置

配置任务数据源信息时,各类型任务的高级参数设置项基本相同,如下图所示。
Image
各任务数据处理高级参数设置说明如下表所示。

参数

说明

开启高级参数设置

可选择是否要进行高级参数设置。开启设置后,会显示相应的设置参数。

编辑模式

支持单行编辑模式和脚本编辑模式,脚本编辑支持JSON、Yaml格式填写参数。 当选择单行编辑模式时,需要设置相应的编辑参数。

并发设置

并发数限制

任务内数据同步的并发数限制数量,有系统默认值、最大并发数、自定义并发数三个选项可设置。

  • 系统默认值:指执行引擎会根据任务的类型、文件的个数、文件的大小、连接节点的个数智能推断任务执行的并发。如果不确定如何设置并发,可使用默认值。
  • 最大并发数:指任务在各个执行阶段控制的最大并发数,并发数是由系统自动计算,当计算的并发大于最大并发数时,会将并发限制到最大并发数的值。
  • 当选择最大并发数时,需设置最大并发数,可自定义1~20000的任意一个整数。 当选择自定义并发数时,需设置读入并发数写入并发数

数据清洗设置

数据清洗

可选择是否开启数据清洗。

  • 当选择关闭时,需要设置可容忍最大脏数据条数可容忍最大脏数据比例,超过最大可容忍范围,任务会失败。
  • 当选择开启时,任务执行时出现的每一行脏数据均会被过滤,不会影响任务执行失败。执行日志中最多会显示 50 条脏数据具体信息。

流量测试设置

小流量测试

可选择是否开启小流量测试。当初次提交作业想验证数据正确性时,可开启小流量测试。 当选择开启时,需要设置测试条数。测试条数在1~10000之间,当每个并发运行到该条数时,任务会自行关闭。

单并发流控设置

流控清洗

流量控制支持用户自定义每一个并发的读写字节流控阈值,读写记录数流控阈值和读写流控等待阈值,默认关闭。读写字节和读写记录有一项满足要求即启动流控。 当选择开启时,需要设置以下参数:

  • 读写流控等待阈值:该阈值决定每次流控检查的时间间隔,当最新记录到来的时刻和上次记录的时刻超过等待阈值时,进行流控检查。读写流控等待阈值默认为1000ms。 如果发现流控效果不明显,可以调小此阈值(如10ms)来进行更精细控制。
  • 读写字节流控阈值:该阈值决定每次检查时是否激活流量控制。当最新的字节数和上次记录的字节数超过阈值时,启动流控。根据超过的比例进行动态睡眠处理。默认为-1,即不开启。
  • 读写记录数流控阈值:该阈值决定每次检查时是否激活流量控制。当最新写的记录数和上次记录的记录数超过阈值时,启动流控。根据超过的比例进行动态睡眠处理。默认为-1,即不开启。

根据测试结果,您可根据任务所需的qps快速配置读写流控参数,公式如下:
读写记录数流控阈值(每秒)~= 单并发qps * 2

注意

如果需要进行多并发的精确流控,需要在任务自定义参数中加入job.common.slots_per_tm : 1,将每个Worker中的线程数设置为1。

自定义参数设置

参数设置

自定义参数支持用户更好地定制自己任务的运行方式。支持单行编辑模式和脚本编辑模式。

  • 单行编辑:设置字段包括key值和value,其中key值仅可由字母、数字、小数点、下划线或连字符组成。
  • 脚本编辑:支持JSON、Yaml格式填写参数。

示例如下:

  • job.common.flink_tm_slot_memory:Flink TM单个slot的内存,单位为MB,默认大小为4096。配置示例:job.common.flink_tm_slot_memory=8192。
  • job.reader.filter:数据过滤参数,过滤MySQL的数据时使用,实现增量同步,参数值参考相应MySQL语法,填写where之后的过滤语句。配置示例:job.reader.filter:date_time='${DATE}'
    Image

更多高级参数参考,详见高级参数

调度设置

离线数据集成任务的调度设置信息与离线数据开发任务的基本相同,相关描述可参见“调度设置”。

历史版本

任务每次上线都会生成一个版本,可以对各版本进行查看、对比、回滚等操作。

  1. 单击右侧侧边栏的历史版本,进入历史版本页面。
  2. 可以执行以下操作:
    • 单击列表中某个版本操作列的查看按钮,可以查看该版本的任务详情信息。
    • 勾选任意两个版本,单击版本对比按钮,可以对比查看两个版本的差异。
    • 单击列表中某个版本操作列的恢复至草稿按钮,可以将该版本恢复至草稿状态,需要重新单击上线。
最近更新时间:2025.09.29 16:10:30
这个页面对您有帮助吗?
有用
有用
无用
无用