最近更新时间:2022.12.15 11:56:38
首次发布时间:2022.08.09 18:35:23
项目创建完成后,可以在该项目下创建开发任务,步骤如下:
点击项目名称,进入该项目的数据开发页面。
点击左侧的新建任务按钮,进入新建任务页面。
设置任务信息。
分类选择数据开发。
绑定引擎选择 EMR。
在关联实例下拉列表中,选择自己开通的EMR实例。
选择任务设置为流式数据 > EMR Flink SQL。
设置任务名称和保存路径。
点击 确定 按钮,完成创建。
任务创建完成后,直接进入EMR Flink SQL编辑页面,需执行以下编码操作:
在 EMR Flink SQL 编辑器中,输入以下 Flink SQL 代码后,将 kafka_topic 和 bootstrap_servers 替换为自己准备好的 Kafka 数据源。
创建一个随机数据源,从随机数据源获取数据并进行加工处理,然后写入到Kafka消息队列,同时打印到运行日志中,代码示例如下所示。
-- 创建随机数据源 CREATE TABLE datagen_source ( f_sequence INT, f_random INT, f_random_str STRING, ts AS localtimestamp, WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'datagen', -- optional options -- 'rows-per-second' = '2', 'fields.f_sequence.kind' = 'sequence', 'fields.f_sequence.start' = '1', 'fields.f_sequence.end' = '10000', 'fields.f_random.min' = '1', 'fields.f_random.max' = '10000', 'fields.f_random_str.length' = '10' ); -- 创建Kafka Sink CREATE TABLE kafka_sink (f_sequence INT, f_random INT, f_random_str STRING, f_ts TIMESTAMP) WITH ( 'connector' = 'kafka', 'topic' = 'leap_flink_test_for_lz_20220709', 'properties.bootstrap.servers' = 'kafka-cnngue0c2fawaqrk.kafka.ivolces.com:9092', 'format' = 'json', 'scan.startup.mode' = 'earliest-offset' ); -- 创建Print Sink CREATE TABLE print_sink (f_sequence INT, f_random INT, f_random_str STRING, f_ts TIMESTAMP) WITH ('connector' = 'print'); -- 创建中间View,对随机产生的数据进行过滤 CREATE VIEW tmp_view AS SELECT f_sequence, f_random, f_random_str, ts FROM datagen_source WHERE f_random > 900; -- 将结果数据写入Kafka INSERT INTO kafka_sink SELECT * FROM tmp_view; -- 将结果数据打印到print INSERT INTO print_sink SELECT * FROM tmp_view;
点击编辑器右上角的格式化按钮,可以对 SQL 进行格式化排版。
点击页面左上角的保存图标,可以保存任务草稿。
点击编辑器右上角的解析按钮,验证 SQL 的正确性。如果 SQL 语法错误会有相应的弹窗提示,可根据错误提示对 SQL 进行修改。
点击页面右边栏的 参数设置,进入 参数设置 页面。
配置相关任务参数。
基本信息:可以设置任务的责任人、标签管理等基本信息。
资源设置:设置 JobManger 和 TaskManager 的 CPU 、内存等资源。
数据源登记:
智能解析:可以智能解析出 SQL 中用到的 Source 和 Sink 数据源并登记到任务上。
手动添加:通过手动添加源/目标、数据源类型及数据源信息等参数,实现手动进行数据源登记,以用于后续监控配置和血缘构建。
Flink 运行参数:可以通过单行编辑模式和脚本编辑模式,进行配置 Flink 运行时参数。
更多任务参数介绍详见 EMR Flink SQL 作业。
点击页面左上角的保存图标,保存任务草稿。
点击 SQL 编辑器上方的提交上线图标按钮,弹出提交上线窗口。
系统自动进行上线检查,检查通过后,进入提交上线设置页面。
设置上线信息。
提交设置选择提交并发布。
上线后是否启动选择是。