You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
作业开发
开发 Flink Python 任务
复制全文
开发 Flink Python 任务

Flink 支持开发 Python 类型任务。您可以自行编写 Python 程序,并将 Python 文件上传到资源库,即可在平台上开发 Python 任务。
本文为您介绍流式类型 Python 任务的开发流程。如需了解 Batch Python 任务,请参见 开发 Flink Batch Python 任务

前提条件

  • 项目管理员(Project_Admin)已经在项目内创建好 Flink 资源池,请参见创建资源池
  • 开发人员需提前编写 Python 程序,并将 Python 文件上传到资源库。如何上传文件,请参见上传资源文件。可以参考如下 Python 代码内容:

注意:以下代码文件名不能为 pyflink 等和 flink python 库冲突的名字,否则会造成任务失败等原因

from pyflink.datastream import StreamExecutionEnvironment

def main():
    # 创建执行环境
    env = StreamExecutionEnvironment.get_execution_environment()
    # 创建一个数据流,元素为 1, 2, 3, 4, 5
    data_stream = env.from_collection([1, 2, 3, 4, 5])
    # 对数据流中的每个元素进行加 1 操作
    result_stream = data_stream.map(lambda x: x + 1)
    # 打印结果
    result_stream.print()
    # 执行程序
    env.execute("Simple PyFlink Example")

if __name__ == "__main__":
    main()

功能限制

目前仅 Flink 1.17-volcano 版本支持 Flink Python 任务。

  1. 登录流式计算 Flink 版控制台
  2. 在顶部菜单栏选择目标地域。
  3. 在左侧导航栏选择项目管理,在搜索框中根据项目名称进行模糊搜索,然后单击项目区块进入项目。
  4. 在项目左侧导航栏选择作业开发 > 作业开发
  5. 作业开发页面单击加号按钮,创建任务。
    您也可以选择目标文件夹,直接在该文件夹中创建任务;也可以直接单击引导页面下的 Flink Python 作业
  6. 创建作业对话框,设置作业名称、存储位置、引擎版本等关键参数,然后单击确定

Image

配置

说明

作业名称

自定义设置任务的名称。
名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

作业类型

选择 作业类型 > Flink Python

存储位置

从下拉列表中选择目标文件夹。
系统默认存在一个数据开发文件夹,但为了更方便的管理任务,您可以自由创建文件夹。如何创建任务文件夹,请参见管理任务文件夹

引擎版本

仅 Flink 1.17-volcano 版本支持 Flink Python 任务。

  1. 在任务配置区域,设置任务关键参数。

Image

配置

说明

任务名称

创建作业时设置的名称。

Python File URI

从下拉列表中选择已上传的 Python 文件。如果您还没有上传文件,请参见资源文件管理

Entry Module

程序的入口类。

  • 如果 Python 作业文件为.py文件,则该项不需要填写。
  • 如果 Python 作业文件为.zip文件,则需要在此处输入您的 Entry Module,例如 kafka_test。

Entry Point Main Arguments

业务程序 main 函数的args参数,非必填项。请根据界面提示填写。

Python Libraries

第三方 Python 包。
第三方 Python 包会被添加到 Python worker 进程的 PYTHONPATH 中,从而在 Python 自定义函数中可以直接访问。

  1. 任务开发和配置完成后,单击保存

步骤二:上线任务

开发与生产隔离,当任务开发者完成任务开发后,可以将任务上线到生产环境。

  1. 作业开发栏目下查找并单击目标任务,单击上线
  2. 任务上线设置对话框,选择运行资源池、设置任务优先级调度策略,然后单击确定
    系统会提示任务上线成功,可以前往任务管理页面查看。

Image

配置

说明

运行资源池

从下拉列表中选择任务运行的 Flink 资源池。

任务优先级

系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。

调度策略

根据需求配置任务调度策略:

  • GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
    该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。
  • DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
    例如:剩余10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。

调度时长

设置为 GANG 调度策略时,需要设置调度时长。调度时长表示再次调度的时间间隔,即任务拉起不成功会再次重试调度。
如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。

步骤三:启动任务

任务开发者将任务上线到生产环境后,由运维人员启动任务。

  1. 在项目左侧导航栏选择任务运维 > 任务管理

  2. 任务列表页面,选择额目标任务,单击操作列中的启动

  3. 启动任务对话框,选择任务启动方式,然后单击确定
    Image

    配置

    说明

    启动方式

    请根据实际情况选择任务启动方式:

    • 从最新状态启动:以最新的 Checkpoint 或 Savepoint 启动。
    • 全新启动:不使用 Checkpoint 或 Savepoint,直接启动。
    • 指定快照启动:指定目标快照(Savepoint)启动。

    说明

    首次上线的任务,只能是全新启动方式。

    参数配置

    任务携带在开发侧的并行度、TaskManager 和 JobManager 的资源配置。在启动任务时支持您更新配置并快速生效。

    说明

    更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。

    • 并行度:任务全局并发数。
    • 单个 TaskManager CPU 数:单个 TaskManager 的 CPU 核数。
    • 单个 TaskManager 内存大小:单个 TaskManager 占用的内存大小。
    • 单个 TaskManager slot 数:单个 TaskManager 的 Slot 数量。
    • JobManager CPU 数:单个 JobManager 的 CPU 核数。
    • JobManager 内存大小:单个 JobManager 占用的内存大小。

    更多设置

    在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。
    Image

    注意

    • 仅当选择指定快照启动从最新状态启动方式时,支持勾选忽略部分算子状态。
    • 当您选择全新启动方式时,不支持忽略算子状态。

任务在生产环境上正常运行后,您可以在 Flink UI 界面了解任务运行、TaskManager、JobManager 的详细信息。

  1. 在项目左侧导航栏选择任务运维 > 任务管理
  2. 任务列表页面筛选目标任务,单击操作列下的 Flink UI
    浏览器将会自动打开 Apache Flink Dashboard 页面。
  3. 在 Apache Flink Dashboard 页面,查看任务运行详情。
    Image
最近更新时间:2025.01.14 17:41:38
这个页面对您有帮助吗?
有用
有用
无用
无用