You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件(私有化)

大数据研发治理套件(私有化)

复制全文
最佳实践
API数据入湖实践
复制全文
API数据入湖实践

1. 实践背景

随着企业数字化转型的提速,数据量急剧增长且来源日益多样化,传统的数据处理方式已难以满足当前业务需求。DataLeap 具备数据服务 API 和数据集成功能,作为一种高效的数据访问手段,它支持企业创建数据服务 API 以读取不同数据库中的数据,并借助 DataLeap 的数据集成能力,将这些数据同步至数据湖中,从而为后续的数据分析和机器学习提供统一的基础。
在本次实践中,我们通过数据集成 Rest_API 到 LAS 数据源通道任务,将 Oracle 数据源 API 中查询到的数据,传输到 LAS 数据源库表下,进行深入分析。同时,Oracle 数据源 API 还将启用动态 Token 鉴权能力,以保障数据安全性和 API 灵活性。

2. 使用前提

  1. DataLeap 数据服务 > 系统管理 > 应用管理中,对应应用的密钥管理已使用 OAuth2.0 密钥类型。操作详见“管理密钥”。
  2. 已创建 Oracle 数据源下的 API,并将 API 授权给已创建的应用。操作详见“API开发”、“API调用”。
  3. 已在数据地图中创建 LAS 的数据库信息。操作详见“管理LAS库”。

3. 关键操作步骤

  1. 4. 新建数据集成数据源
    新建数据集成任务运行所需的 Rest_API 和 LAS 数据源。
  2. 5. 新建工作流
    本实践中为您介绍两种工作流配置方式
    • 在一个工作流节点中,同时配置 Python 任务和数据集成任务,其中 Python 任务来获取动态 Token 信息,下游数据集成任务接收 Token 信息,来读取 API 中的相关数据;
    • 分别配置独立的 Python 任务和数据集成任务,与工作流节点不同,两个任务可单独配置其调度的开始时间,灵活度较高。
  3. 6. 验证 API 数据入湖结果
    新建 LAS SQL 类型的临时查询任务,来验证数据进入 LAS 的最终结果。

4. 新建数据集成数据源

新建数据集成任务运行所需的 Rest_API 和 LAS 数据源,具体操作如下:

  1. 登录DataLeap控制台。

  2. 选择具体项目后进入数据开发 > 数据源管理 > 自定义数据源,进入自定义数据源管理页面。

  3. 单击右上角新建数据源按钮,进入新建数据源页面。
    Image

  4. 依次完成以下数据源注册信息配置:

    • 配置 Rest_API 数据源

      参数

      说明

      数据源类型

      下拉可选 Rest_API 数据源类型。

      接入方式

      目前仅支持公网的接入方式。

      数据源名称

      数据源的名称,可自行设置。
      仅支持中文、英文、数字、“_”,100个字符以内,如 doc_rest_api

      数据源描述

      数据源的描述信息。
      建议填写数据源特性、用途等关键信息,以便后续数据源管理和使用。

      REST_API参数配置

      API Url

      输入源端 API Url 链接地址,可从数据服务 API 详情中获取。
      Image

      默认请求头

      默认的请求头格式,例如:{"Autzzzzzion":"Bexxxxet-toxxxn-1"},没有时可不填写。

      验证方式

      REST_API数据源注册时,目前支持以下三种验证方式,您可根据实际场景选择鉴权模式:

      • No auth:源端API没有开启鉴权方式。
      • Basic auth:API开启了用户名和密码的鉴权方式,需另外填写以下信息:
        • 用户名:输入有数据库权限的账号名称。
        • 密码:输入对应账号的密码信息。
      • Token auth:API开启了Token的鉴权方式,您需要输入具体的 Token信息:
        • Token:输入可以验证通过的Token值信息。

      此处无验证方式时,可选择 No auth。

    • 配置 LAS 数据源

      参数

      说明

      数据源类型

      下拉可选 LAS 数据源类型。

      接入方式

      目前仅支持公网的接入方式。

      数据源名称

      数据源的名称,可自行设置。
      仅支持中文、英文、数字、“_”,100个字符以内,如 Las_Preparation_Source

      数据源描述

      数据源的描述信息。
      建议填写数据源特性、用途等关键信息,以便后续数据源管理和使用。

      LAS 参数配置

      LAS Schema

      输入已创建好的 LAS Schema 库名,如 dataleap_deploy_test

  5. 数据源信息配置完成后,单击连通性测试按钮,验证配置信息是否正确。

  6. 连通成功后,单击确认按钮,完成数据源注册。

5. 新建工作流

本实践中为您介绍两种工作流配置方式。

5.1 方式一:一个工作流

5.1.1 新建工作流

  1. 登录 DataLeap 租户控制台 。
  2. 在概览界面,显示加入的项目中,单击进入数据开发按钮进入对应项目。
  3. 在任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。
  4. 选择任务类型,任务模式选择为工作流任务
  5. 填写任务基本信息:
    1. 任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。如:doc_api_test_workflow
    2. 保存至: 选择任务存放的目标文件夹目录。
      Image
  6. 单击确定按钮,完成任务创建。

5.1.2 配置工作流

工作流任务创建完成后,您可以单击工作流名称,进入工作流配置界面。在工作流界面中,便可通过可视化拖拉拽的方式,将 Python 和 数据集成任务往右侧拖入画布中,进行多引擎任务混合编排设计。
Image

5.1.2.1 配置 Python 子节点
  1. 拖入 Python 任务类型,并输入任务名称为:Python获取Token;

  2. 双击已拖入的 Python 子节点,在新页签中,输入以下 Python 逻辑,来实现数据服务 API 动态 Token 的获取:

    import requests
    import json
    
    url = "http://exxxxxxxx-onpremise.volces.com/data_service/api/v2/api_service/token"
    
    payload = json.dumps({
      "AppKey": "57xxxxxxxxxx6n",
      "AppSecret": "onxxxxxxxxxxGND"
    })
    headers = {
      'user': 'dladmin',
      'Content-Type': 'application/json'
    }
    
    response = requests.request("POST", url, headers=headers, data=payload)
    
    data = response.json()
    
    token_value = data["data"]["Token"]
    print(token_value)
    

    脚本中参数说明:

    参数

    说明

    url

    API 应用使用动态密钥方式时,在 API 详情的接口文档中,可获取其调用的 URL 信息。
    Image

    AppKey

    填写数据服务 > 系统管理 > 应用管理,对应应用的密钥管理中的 AppKey 信息。

    AppSecret

    填写数据服务 > 系统管理 > 应用管理,对应应用的密钥管理中的 AppSecret 信息。
    Image

  3. 脚本设置完成后,在编辑器上方,下拉选择执行引擎为 2.7 版本。
    Image

  4. 执行引擎选择完成后,进入右侧调度设置界面,进行任务输出参数设置,勾选”将运行结果当做输出“选项,便可自动生成输出参数”outputs“。
    Image

  5. 至此,Python 子节点已配置完成,单击编辑器上方的保存按钮,保存任务配置。

5.1.2.2 配置数据集成子节点
  1. 返回工作流节点,并在画板中拖入数据集成子节点,并输入任务名称为 api2las

  2. 添加子任务依赖
    数据集成子任务创建完成后,您可通过子任务中心的点,用拉线的方式来形成子任务间的上下游依赖关系。
    Image

  3. 依赖添加完成后,双击数据集成子节点,进入数据集成任务配置界面,依次进行数据来源和目标端的配置:
    其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

    • REST_API 离线读参数说明

      参数

      说明

      *数据源类型

      您可下拉选择或输入关键词进行搜索REST_API数据源类型。

      *数据源名称

      已在数据源管理中注册成功的REST_API数据源,名称为:doc_rest_api

      *是否拼接header

      按需选择是否在REST_API中拼接header信息,此处无需拼接,可选否。

      *请求Method

      选择请求Method方法,支持指定请求参数的传参位置。
      支持选择GET和POST,两种方式,数据服务请求时,选择 POST 方式。

      请求参数

      按需添加传递给API的请求参数信息,若创建 API 时无设定请求参数,可跳过。

      *请求Body

      请求Method为POST时,传参位置为Body,需添加请求Body信息,此处以JSON方式添加,内容可参考数据服务 API 调用示例,如:

      {
          "ApiID": "18xxxxxx6",
          "Option": [
              {
                  "Id": 100,
                  "Val": 0,
                  "Val_": "{{outputs}}"
              }
          ],
          "Params": {}
      }
      

      说明

      其中:

      • ApiID:需替换为 API 详情中的 API ID 信息。
      • Val_:以上游Python输出的参数,如 {{outputs}} 参数形式进行配置,获取 API 动态 Token 信息。

      *返回数据结构

      根据实际接口返回数据的类型进行选择,此示例中选择数组数据结构。

      *数据格式

      仅支持json的格式。

      *编码格式

      支持设置响应体编码格式:UTF-8、GBK两种格式。

      数据存储json路径

      输入数组数据存储的 json 路径,可为$(表示保存在根路径),或者类似aa.bb.cc的格式。此示例中路径为:$.Data

      *预览数据

      您可通过数据服务 API 测试或使用第三方工具 Postman 事先调用一次 API 后,获得该API返回的预览数据,填写想要获取的数据格式,如:

      {
       "BIGINT1_INFO": 32951388924924,
       "DATE1_INFO": 1705968000,
       "DATETIME1_INFO": 1705939200,
       "DECIMAL1_INFO": -5057921777.23337,
       "DOUBLE1_INFO": 5780152592.99815,
       "FLOAT1_INFO": 5780152800,
       "INT_INFO": 1,
       "PARTITION_KEY": null,
       "STRING1": "软件本站来自xxxxxx",
       "STRING_DATETIME1_INFO": "2024-01-23 11:24:44",
       "STRING_TIMESTAMP1_INFO": "1705980584",
       "TIMESTAMP1_INFO": 1705980584,
       "TINYINT1_INFO": 1
      }
      

      以此来获取源端需要读取数据的字段信息,方便字段映射时进行自动添加。

      *请求次数

      选择 reader 端是单次请求、多次请求或分页请求。
      本实践中以单次请求为例。

    • LAS 离线写说明

      参数

      说明

      *数据源类型

      您可下拉选择或输入关键词进行搜索LAS数据源类型。

      *数据源名称

      已在数据源管理中注册成功的LAS数据源,名称为:Las_Preparation_Source

      *数据表

      选择对应数据源 Schema 下所需要采集的数据表信息,下拉可选。
      在源端字段映射自动添加完成后,您也可在此单击快速建表按钮,快速前往数据地图进行 LAS 表创建。此实践中 LAS 表命名为:doc_restapi_las

      *分区设置

      分区字段从 LAS 表自动获取。
      分区内容可设置具体时间分区粒度,可以用表达式:${date}等参数变量表示。

  4. 数据来源和目标端基础信息配置完成后,在字段映射中,单击自动添加按钮,可自动配置源端和目标端映射的字段信息,一一对应即可。

  5. 字段映射配置完成后,单击右侧调度设置按钮,进行任务输入参数设置。

  6. 单击手动添加按钮,输入参数名称outputs、类型为任务、来源下拉选择为上游 Python 任务名称及对应的输出参数名称。
    Image

  7. 输入参数添加完成后,单击任务保存按钮,保存数据集成子节点任务配置。

5.1.3 调试工作流

  1. 调试运行
    工作流内部的 Python 和数据集成子节点都配置完成后,返回工作流整体画布中,单击工具栏中的保存调试按钮,对工作流整体进行调试操作。
    Image
    在调试运行设置窗口中,选择需调试的业务日期,可根据实际业务情况进行多选或者选定一个业务时间范围,单击提交调试按钮,便可开始工作流任务的调试。
  2. 调试记录
    调试任务开始运行后,可在下方查看调试记录,您也可以查看任务的历史运行记录,包括状态、业务日期、开始时间、结束时间等。
    单击下载日志按钮,可查看每个子节点执行的详细日志情况。
    Image
    下载数据集成子节点执行日志,可在日志中查看读取和写入的数据信息:
    Image

5.1.4 调度设置

子节点任务配置完成后,在整体工作流任务的右侧导航栏中,单击调度配置按钮,进入调度配置窗口,您可以在此设置基本信息、调度属性、依赖等信息,详细参数设置详见“”调度设置

5.1.5 提交工作流

调试结果确认无误后,单击上方操作栏中的保存提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见“提交上线”。

5.1.6 后续步骤

工作流任务提交发布完成后,您便可前往任务运维界面,进行任务的关闭、回溯等运维操作。详见“离线任务运维”。

5.2 方式二:独立的 Python 和 数据集成任务

此方式中,您可不依赖整体的一个工作流,可创建独立的 Python 和 数据集成任务。

5.2.1 新建 Python 任务

  1. 登录 DataLeap 租户控制台 。
  2. 概览界面,显示加入的项目中,单击进入数据开发按钮进入对应项目。
  3. 在任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。
  4. 单击新建任务 > 数据开发 > 通用 > Python 任务类型。
  5. 输入任务基本信息,此处命名为:Python输出动态Token

Image

5.2.2 配置 Python 任务

  1. 新建 Python 任务完成后,在编辑器界面输入以下 Python 逻辑,来实现数据服务 API 动态 Token 的获取。

    说明

    Python 脚本逻辑与参数说明,详见5.1.2.1 配置 Python 子节点

  2. 脚本设置完成后,在编辑器上方,下拉选择执行引擎为 2.7 版本。
    Image
  3. 执行引擎选择完成后,进入右侧调度设置界面,进行 Python 任务的调度属性、依赖关系、任务输出参数等设置。
    其中在任务输出参数中,勾选”将运行结果当做输出“选项,便可自动生成输出参数”outputs“。
    其余更多调度设置说明,详见“调度设置”。
    Image
  4. Python 任务配置完成后,单击编辑器上方的保存提交按钮,保存任务配置并将其发布到运维中心。

5.2.3 新建数据集成任务

  1. 登录 DataLeap 控制台。
  2. 选择数据开发 > 任务开发,进入任务开发页面。
  3. 单击新建任务按钮,进入新建任务页面。
  4. 选择数据集成 > 离线集成任务类型。
  5. 完成任务基本信息设置,此示例中下游数据集成任务命名为:rest_2_api_下游
    Image
  6. 单击确定按钮,完成创建。

5.2.4 配置数据集成任务

数据集成任务新建完成后,进入到任务配置详情界面完成相关配置。

  1. 填写数据集成来源和目标端的数据源信息,并完成字段映射添加。数据源和字段映射配置操作详见5.1.2.2 配置数据集成子节点
  2. 任务基础信息配置完成后,单击右侧侧边栏的调度设置按钮,配置任务调度属性、依赖关系、任务输入参数等。
    1. 依赖关系
      单击手动添加按钮,在弹窗中,输入上游任务名称“python输出动态Token”,进行任务搜索,勾选任务单击确定按钮,完成任务上下游依赖关系配置。
      Image
    2. 任务输入参数
      在调度设置中,单击手动添加按钮,进行输入参数添加。
      输入参数名称outputs、类型为任务、来源下拉选择为上游 Python 任务名称及对应的输出参数名称。
      Image
      更多调度设置说明详见“调度设置”。
  3. 数据集成任务配置完成后,单击编辑器上方的保存提交按钮,保存任务配置并将其发布到运维中心。

5.2.5 运维中心数据回溯

此方式配置完 Python 和数据集成任务后,若需要立即验证数据结果,则您需将任务发布到运维中心,并发起数据回溯,来验证 API 数据入湖结果。或者,您也可以在两个任务正常到达调度实例执行之后,再来验证数据结果。

  1. 在上方导航栏中,选择离线任务运维,进入到运维中心界面。
    Image
  2. 在左侧导航栏中,进入数据回溯 > 发起的界面,单击右上角新建数据回溯按钮,进入回溯配置窗口。
    Image
  3. 完成以下数据回溯配置:
    1. 回溯范围选择单任务及其下游
    2. 输入 Python 任务 ID 或名称:Python输出动态Token
    3. 选择回溯业务时间、指定可调度时段、下游自依赖任务等配置信息;
    4. 勾选回溯任务展现的上下游任务,确认无误后,单击下一步按钮,进入回溯具体回溯实例查看;
      Image
    5. 在回溯实例确认界面,设置任务最大并行及回溯原因,并单击下方确定按钮,发起数据回溯操作。
      Image
  4. 在实例运维中,搜索已发起回溯的实例信息,等待上下游任务回溯执行完成。

6. 验证 API 数据入湖结果

以上两种方式其一执行完成后,您便可通过数据开发 > 临时查询能力,查询落入 LAS 库表中的实际数据。
Image

  1. 在任务开发界面左侧导航栏中,进入临时查询界面。

  2. 单击新建查询按钮,新建 LAS SQL 查询任务。

  3. 在查询编辑界面,选择执行的队列信息和相应的执行引擎(默认为 Spark,可切换为 Auto 来执行)后,编辑如下查询语句:

    SELECT * from dataleap_deploy_test.doc_restapi_las where date=20240909;
    

    Image

最近更新时间:2025.04.02 19:23:15
这个页面对您有帮助吗?
有用
有用
无用
无用