You need to enable JavaScript to run this app.
导航

Serverless Pyjstorm Flink

最近更新时间2023.12.11 10:55:19

首次发布时间2022.08.09 14:26:59

1 概述

DataLeap接入了流式计算 Flink 版,在关联 Flink 的项目和资源池后,可以进行 Flink 作业开发。可以通过 Serverless Pyjstorm Flink 作业实现任务的托管和运维。本文以一个简单的示例,将为您介绍 Serverless Pyjstorm Flink 作业相关的开发流程操作。

2 使用前提

  1. DataLeap产品需开通 DataOps敏捷研发、大数据分析、数据开发特惠版分布式数据自治服务后,才可绑定流式计算 Flink 引擎。绑定引擎操作详见:项目管理

  2. 子账号操作项目绑定 Flink 引擎实例时:

    1. 主账号需要先在流式计算 Flink 版控制台导入 IAM 用户。操作详见:用户管理

    2. 并将子账号加入到对应引擎项目中。操作详见:引擎项目成员管理

  3. 目前 Flink 版本只支持 1.11。

3 任务配置说明

3.1 新建任务

  1. 登录 DataLeap租户控制台

  2. 概览界面,显示加入的项目中,单击数据开发进入对应项目。

  3. 任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。

  4. 选择任务类型:

    1. 分类:数据开发。

    2. 绑定引擎:流式计算 Flink 版

    3. 关联引擎项目:默认选择引擎绑定时选择的引擎项目,不可更改。

    4. 选择任务:流式数据 Serverless Pyjstorm Flink

  5. 填写任务基本信息:

    1. 任务名称:输入任务的名称,只能由数字、字母、下划线、-和.组成, 首尾只能是数字、字母,且允许输入 1~63 个字符。

    2. 保存至: 选择任务存放的目标文件夹目录。

  6. 单击确认按钮,成功创建任务。
    alt

3.2 引用资源

  • 选择 SCM 资源类型,可通过以下方式选择资源:

    • 资源库中选取已有资源。

    • 直接在下拉框中新建资源,或在左侧导航栏资源库中新建流式计算 Flink 资源,详见:资源库操作

    注意

    SCM 资源类型,需先创建好 SCM 仓库,并获取其对应代码版本号和部署路径。

  • 执行引擎:选择执行引擎 Flink-1.11。

3.3 Yaml配置

YamlConf配置案例:
详细参数说明参考域内 Flink 文档:PyJstorm 参数配置及含义

environment:
  SEC_KV_AUTH: '1'
  PYTHONPATH: /opt/tiger/pyFlink:/opt/tiger:/opt/tiger/pyutil:/opt/tiger/ss_thrift_gen:/opt/tiger/ss_lib
topology_standalone_mode: 0
flink_args:
  containerized.heap-cutoff-ratio: 0.3
  blacklist.enabled: true
  smart-resources.enable: false
  smart-resources.durtion.minutes: 1440
  smart-resources.cpu-reserve-ratio: 0.2
  smart-resources.mem-reserve-ratio: 0.2
  smart-resources.disable-mem-adjust: true
  containerized.master.env.SEC_KV_AUTH: 1
  containerized.taskmanager.env.SEC_KV_AUTH: 1
  docker.image: default
  docker.namespace: yarn
  yarn.provided.lib.dirs.enabled: true
  ipv6.enabled: true
  yarn.res-lake.enabled: true
  gts.migration.platform: GTS
spout:
  common_args:
    consumer_group: pyFlink_hello_word_lq
    output_fields:
      - msg
    script_name: /opt/tiger/pyFlink/pytopology/runner/kafka_spout.py
    is_thread_num_determined_by_partition: true
    whence: 1
    read_num: 100
  spout_list:
    - spout_name: simple_spout
      args:
        kafka_cluster: bmq_test_lq
        bootstrap_servers: 'kafka-cnngiu8m5tg2otd0.kafka.ivolces.com:9092'
        auto_partition: true
        topic_name: pyjstorm-test-1
        partition: 3
        prefer_service: dcleader
        tolerate_exception_num: 3
flink_resource_args:
  tm_slot: 12
  tm_cores: 0.5
  jm_cores: 1.2
  tm_memory: 4096
  jm_memory: 4096
  tm_num: 2
bolt:
  bolt_list:
    - thread_num: 1
      bolt_name: split_bolt
      script_name: /opt/tiger/pyFlink/pytopology/runner/bolt.py
      output_fields:
        - word
      group_list:
        - group_type: shuffle
          group_field: null
          stream_from:
            - simple_spout
      args:
        debug_dumper.enable: true
        debug_dumper.tcc_service_name: stream.pyFlink.dump
        debug_dumper.tcc_config_key: wordcount_py.split_bolt
        debug_dumper.tcc_check_interval_sec: 30
        debug_dumper.max_queue_len: 1000
    - thread_num: 1
      bolt_name: wordcount_bolt
      script_name: /opt/tiger/pyFlink/pytopology/runner/bolt.py
      output_fields: null
      group_list:
        - group_type: fields
          group_field: word
          stream_from:
            - split_bolt
      args:
        debug_dumper.enable: true
        bootstrap_servers: 'kafka-cnngiu8m5tg2otd0.kafka.ivolces.com:9092'
        kafka_cluster: bmq_test_lq
        topic_name: pyjstorm-test-2
topology_name: pyjstorm_11_test
topology_standalone_time: 500
common_args:
  metrics_namespace_prefix: pyflink.wordcount_py
  log_file: /var/log/tiger/wordcount_py.log
  log_format: '%(asctime)s %(filename)s %(levelname)s %(message)s'

3.4 参数设置

单击右侧导航栏中参数设置,进行任务的基本信息、资源设置、数据源登记配置。

3.4.1 基本信息

Serverless Pyjstorm Flink 任务的基本信息配置如下:

参数名称描述
任务名称显示创建任务时输入的任务名称,参数设置中不支持修改,可以在左侧任务目录结构中的任务名称右侧更多单击重命名进行修改。
任务类型Serverless Pyjstorm Flink
引擎类型流式计算 Flink 版
关联引擎项目DataLeap侧关联的引擎项目名称。
任务描述非必填,可对任务进行详细描述,方便后续查看和管理。

责任人

仅限一个成员,默认为任务创建人(任务执行失败、复查通过或者失败时的默认接收者),可根据实际需要,修改为其他项目成员。

  • 责任人需为项目中有编辑权限的成员,项目权限须由项目管理员授权。

  • DataLeap 通过项目空间实现开发协同,项目中具有编辑权限的角色,可对项目下的所有任务进行编辑,因此无须通过为任务设置多个责任人,来达到协同合作的效果。

计算资源从关联的引擎项目中选择目标资源池。

优先级

您可对流式任务设置任务优先级,指定当前任务的优先级情况:

  • 等级数字越小,代表优先级等级越高。

  • 其中 D3~D5 等级,您可直接在调度设置页面中设置。

标签

您可以自定义标签,用于标识某一类任务,以便快速搜索过滤,操作即时生效,无需重新上线任务。

  • 下拉选择项目归属的标签组,及对应的标签信息,支持添加多个标签组。

  • 若没有可选的标签组,您可进行以下操作步骤:
    a. 进入 DataLeap 智能市场 > 任务标签管理,进入任务标签管理控制台,新建标签组。详见任务标签管理

    b. 新建标签组成功后,前往项目管理 > 具体项目的配置信息 > 标签管理来添加标签组。详见标签管理

3.4.2 资源设置

设置任务运行时相关资源分配情况

参数名称描述
TaskManager个数设置 flink 作业中 TaskManager 的数量。
单TaskManagerCPU数设置单个 TaskManager 所占用的CPU数量。
单TaskManager内存大小(MB)设置单个 TaskManager 所占用的内存大小。
单TaskManager slot数设置单个 TaskManager 中slot的数量。
JobManager CPU数设置单个 JobManager 所占用的CPU数量。
JobManager内存设置单个 JobManager 所占用的内存大小。

3.4.3 数据源登记

登记该任务使用的 Source 和 Sink 信息,以用于后续监控配置和血缘构建,支持通过手动添加

  • 手动添加:单击手动添加按钮,可根据 SQL 逻辑进行数据源上下游登记。

    • 源/目标:支持源/目标 Source 和 Sink 的选择。

    • 数据源类型:支持选择 DataGen、Rmq、Bmq、JDBC、Abase、火山/自建 Kafka 这些 Connector 的数据源。

    • 各数据源类型需补充填写相应的信息,您可根据实际场景进行配置,如 Kafka 的数据源类型,需填写具体的Bootstrap Servers、Topic 名称、消费组等信息。

    • 直接上游任务:您可通过手动输入任务名称方式,进行搜索上游任务,单击添加按钮,进行手动添加。

    • 直接下游任务:无法通过手动添加的方式进行操作,仅展现通过数据源匹配后的结果。

    说明

    1. Abase 和 Rmq 数据源类型,仅支持解析操作,暂不支持上线发布后运行。且 Abase 数据源通常是在火山引擎内部业务上云中使用。

    2. Serverless Pyjstorm Flink 暂不支持自动解析功能来登记数据源。

  • 批量删除:手动添加的数据源,您可通过勾选已登记的数据源后,单击右侧删除批量删除按钮 ,删除勾选中的数据源。

  • 上下游任务查看:数据源登记解析出的上下游任务,您可对上游依赖任务进行编辑、添加、删除、禁用等操作;下游任务仅支持查看。

4 提交发布

任务所需参数配置完成后,将任务提交发布到运维中心实时任务运维中执行。
单击操作栏中的保存提交上线按钮,在弹窗中,需先通过任务上线检查提交上线等上线流程,最后单击确认按钮,完成作业提交。详见概述---流式任务提交发布
后续任务运维操作详见:实时任务运维