You need to enable JavaScript to run this app.
导航

开发任务

最近更新时间2022.12.15 11:56:38

首次发布时间2022.08.09 18:35:23

项目创建完成后,可以在该项目下创建开发任务,步骤如下:

  1. 点击项目名称,进入该项目的数据开发页面。

  2. 点击左侧的新建任务按钮,进入新建任务页面。

  3. 设置任务信息。

    1. 分类选择数据开发

    2. 绑定引擎选择 EMR

    3. 关联实例下拉列表中,选择自己开通的EMR实例。

    4. 选择任务设置为流式数据 > EMR Flink SQL

    5. 设置任务名称和保存路径。

  4. 点击 确定 按钮,完成创建。

任务创建完成后,直接进入EMR Flink SQL编辑页面,需执行以下编码操作:

  1. EMR Flink SQL 编辑器中,输入以下 Flink SQL 代码后,将 kafka_topic 和 bootstrap_servers 替换为自己准备好的 Kafka 数据源。

    创建一个随机数据源,从随机数据源获取数据并进行加工处理,然后写入到Kafka消息队列,同时打印到运行日志中,代码示例如下所示。

    -- 创建随机数据源
    CREATE  TABLE datagen_source (
                f_sequence   INT,
                f_random     INT,
                f_random_str STRING,
                ts           AS localtimestamp,
                WATERMARK    FOR ts AS ts
            )
            WITH (
                'connector' = 'datagen',
                -- optional options --
                'rows-per-second' = '2',
                'fields.f_sequence.kind' = 'sequence',
                'fields.f_sequence.start' = '1',
                'fields.f_sequence.end' = '10000',
                'fields.f_random.min' = '1',
                'fields.f_random.max' = '10000',
                'fields.f_random_str.length' = '10'
            );
    
    -- 创建Kafka Sink
    CREATE  TABLE kafka_sink (f_sequence INT, f_random INT, f_random_str STRING, f_ts TIMESTAMP)
            WITH (
                'connector' = 'kafka',
                'topic' = 'leap_flink_test_for_lz_20220709',
                'properties.bootstrap.servers' = 'kafka-cnngue0c2fawaqrk.kafka.ivolces.com:9092',
                'format' = 'json',
                'scan.startup.mode' = 'earliest-offset'
            );
    
    -- 创建Print Sink
    CREATE  TABLE print_sink (f_sequence INT, f_random INT, f_random_str STRING, f_ts TIMESTAMP)
            WITH ('connector' = 'print');
    
    -- 创建中间View,对随机产生的数据进行过滤
    CREATE  VIEW tmp_view AS
    SELECT  f_sequence,
            f_random,
            f_random_str,
            ts
    FROM    datagen_source
    WHERE   f_random > 900;
    
    -- 将结果数据写入Kafka
    INSERT INTO kafka_sink
    SELECT  *
    FROM    tmp_view;
    
    -- 将结果数据打印到print
    INSERT INTO print_sink
    SELECT  *
    FROM    tmp_view;
    
  2. 点击编辑器右上角的格式化按钮,可以对 SQL 进行格式化排版。

  3. 点击页面左上角的保存图标,可以保存任务草稿。

3 验证 SQL 正确性

点击编辑器右上角的解析按钮,验证 SQL 的正确性。如果 SQL 语法错误会有相应的弹窗提示,可根据错误提示对 SQL 进行修改。

4 设置参数

  1. 点击页面右边栏的 参数设置,进入 参数设置 页面。

  2. 配置相关任务参数。

    1. 基本信息:可以设置任务的责任人、标签管理等基本信息。

    2. 资源设置:设置 JobManger 和 TaskManager 的 CPU 、内存等资源。

    3. 数据源登记

      • 智能解析:可以智能解析出 SQL 中用到的 Source 和 Sink 数据源并登记到任务上。

      • 手动添加:通过手动添加源/目标、数据源类型及数据源信息等参数,实现手动进行数据源登记,以用于后续监控配置和血缘构建。

    4. Flink 运行参数:可以通过单行编辑模式和脚本编辑模式,进行配置 Flink 运行时参数。

    更多任务参数介绍详见 EMR Flink SQL 作业

  3. 点击页面左上角的保存图标,保存任务草稿。

5 上线任务

  1. 点击 SQL 编辑器上方的提交上线图标按钮,弹出提交上线窗口。

  2. 系统自动进行上线检查,检查通过后,进入提交上线设置页面。

  3. 设置上线信息。

    1. 提交设置选择提交并发布

    2. 上线后是否启动选择

  1. 点击 确认 按钮,弹出提示框,提示任务上线成功。