最近更新时间:2023.08.18 17:27:50
首次发布时间:2021.08.13 15:07:14
流式 EMR SQL 底层的计算引擎为 Apache Flink,其符合标准 SQL 语义,降低了用户进行实时开发的门槛,支持在线创建、解析和运维流式任务。
本文将为您介绍 EMR Flink SQL 任务的相关使用。
DataLeap产品需开通数据开发特惠版、DataOps敏捷研发、大数据分析或分布式数据自治服务后,才可创建 火山引擎 E-MapReduce(EMR)流式数据开发任务。
EMR 引擎绑定的集群类型、版本及依赖的服务,需满足以下条件之一,方可创建 EMR Flink SQL 任务:
支持集群版本 | 支持集群类型 | 依赖集群服务 |
---|---|---|
EMR-1.3.1 | Hadoop | Flink |
Flink | Flink | |
EMR-3.2.1 及以上 | Hadoop | Flink 和 GTS |
EMR Flink SQL 目前仅支持原生Connector:kafka、datagen、print。
DataLeap 项目控制台首次绑定 EMR 集群时,会提示在 EMR 集群关联的安全组中添加 8898 和 9030 端口,您单击确定按钮即可实现自动添加。添加后,为确保能在 DataLeap 上正常进行数据开发和执行任务,需保证相关端口一直存在于安全组中,不要删除。
详见创建项目。
新建Kafka Topic,具体操作请参考:创建Topic。
获取 bootstrap.servers,具体操作请参考:使用默认接入点连接实例。
选择流式数据 > EMR Flink SQL 任务类型新建任务。
在概览界面,显示加入的项目中,单击数据开发进入对应项目。
在任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。
选择任务类型:
分类:数据开发。
绑定引擎:EMR。
关联实例:展现项目创建时绑定的EMR引擎实例。
选择任务:流式数据 EMR Flink SQL 。
填写任务基本信息:
任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。
保存至: 选择任务存放的目标文件夹目录。
单击确认按钮,成功创建任务。
新建任务成功后,进入代码开发编辑界面,通过 DDL 和 DML 编写 SQL,下图以DataGen生成数据写入Kafka为例进行说明,详细语法可参考对应版本 Flink 官方文档。
CREATE TABLE datagen_source (f_sequence INT) WITH ('connector' = 'datagen', 'rows-per-second' = '60'); CREATE TABLE kafka_sink (f_sequence INT, f_ts BIGINT) WITH ( 'connector' = 'kafka', 'topic' = '{{kafka_topic}}', 'properties.bootstrap.servers' = '{{bootstrap.servers}}', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' ); INSERT INTO kafka_sink SELECT f_sequence, UNIX_TIMESTAMP() AS f_ts FROM datagen_source;
注意
任务被模板使用后,格式化、表管理、执行引擎选择功能将不支持使用。
功能名称 | 描述 |
---|---|
格式化 | 依据在个性化设置中的 SQL 格式化风格的设置,格式化书写的代码,使其语法结构看起来简洁明了。 |
解析 | 解析检查书写的 SQL 代码的语法和语义正确性,运行前检查语法错误信息,防止运行出错。 |
任务模板 | 您可以选择是否通过任务模板方式,便捷快速的复用代码模板逻辑,在弹窗中选择 EMR Flink SQL 任务模板,并选择相应的版本号,输入替换的参数即可完成复用。 注意
|
执行引擎 | 下拉选择 EMR 集群中 Flink 组件对应的执行引擎版本。 |
单击右侧导航栏中参数设置,进行任务的基本信息、任务输入参数、资源设置、数据源登记、Flink 运行参数配置。
EMR Flink SQL 任务的基本信息配置如下:
参数名称 | 描述 |
---|---|
任务名称 | 显示创建任务时输入的任务名称,参数设置中不支持修改,可以在左侧任务目录结构中的任务名称右侧更多单击重命名进行修改。 |
任务类型 | EMR Flink SQL |
任务描述 | 非必填,可对任务进行详细描述,方便后续查看和管理。 |
责任人 | 仅限一个成员,默认为任务创建人(任务执行失败、复查通过或者失败时的默认接收者),可根据实际需要,修改为其他项目成员。
|
标签 | 您可以自定义标签,用于标识某一类任务,以便快速搜索过滤,操作即时生效,无需重新上线任务。
|
您可通过手动添加的方式,选择项目参数或自定义参数,以变量或常量形式传入流式任务中使用,来实现同一套代码在不同执行环境下能够自动替换参数来执行。添加操作详见 3.5.2-任务输入参数。
设置 EMR Flink SQL 运行时相关资源分配:
参数名称 | 描述 |
---|---|
TaskManager个数 | 设置 flink 作业中 TaskManager 的数量 |
单TaskManagerCPU数 | 设置单个 TaskManager 所占用的 CPU 数量 |
单TaskManager内存大小(MB) | 设置单个 TaskManager 所占用的内存大小 |
单TaskManager slot数 | 设置单个 TaskManager 中 slot 的数量 |
JobManager CPU数 | 设置单个 JobManager 所占用的 CPU 数量 |
JobManager内存 | 设置单个 JobManager 所占用的内存大小 |
登记该任务使用的 Source 和 Sink 信息,以用于后续监控配置和血缘构建,支持通过手动添加。
手动添加:单击手动添加按钮,可根据 SQL 逻辑进行数据源上下游登记。
源/目标:支持源/目标 Source 和 Sink 的选择。
数据源类型:支持选择 DataGen、Rmq、Bmq、JDBC、Abase、火山/自建 Kafka 这些数据源类型。
各数据源类型需补充填写相应的信息,您可根据实际场景进行配置,如 Kafka 的数据源类型,需填写具体的Bootstrap Servers、Topic 名称、消费组等信息。
直接上游任务:您可通过手动输入任务名称方式,进行搜索上游任务,单击添加按钮,进行手动添加。
直接下游任务:暂时无法通过手动添加的方式进行操作。
说明
Abase 和 Rmq 数据源类型,仅支持解析操作,暂不支持上线发布后运行。且 Abase 数据源通常是在火山引擎内部业务上云中使用。
EMR 引擎暂不支持自动解析功能来登记数据源。
批量删除:手动添加的数据源,您可通过勾选已登记的数据源后,单击右侧删除或批量删除按钮 ,删除勾选中的数据源。
上下游任务查看:数据源登记解析出的上下游任务,您可对上游依赖任务进行编辑、添加、删除、禁用等操作;下游任务仅支持查看。
Flink 相关的动态参数和执行参数,具体设置详见 Flink 官方文档。
单行编辑模式:填写 key-value,key值只允许字母、数字、小数点、下划线和连字符。
a. 添加一行参数
b. 删除当前这行参数
c. 清空输入框中已输入的参数值
脚本编辑模式:通过JSON、Yaml的格式填写运行参数。
任务所需参数配置完成后,将任务提交发布到运维中心实时任务运维中执行。
单击操作栏中的保存和提交上线按钮,在弹窗中,需先通过提交事前检查和提交上线等上线流程,最后单击确认按钮,完成作业提交。详见概述---流式任务提交发布。
注意
上线流程中的“提交事前检查”,需租户主账号或项目管理员先在项目控制台 > 流水线管理中,创建相应的流水线检查事宜后方可显示。详见流水线管理。
后续任务运维操作详见:实时任务运维。
流式任务启动后,可按照以下步骤,可以查看数据是否已写入目的端,以kafka为例,详细介绍请参考:消息查询。
单击要查看的实例名称/ID,进入该实例详情页面。
单击上方导航栏中 Topic管理 页签,进入Kafka Topic管理页面。
单击要查询的 Topic名称,进入该实例页面,确认 末端Offset 是否有更新(即数值不为0)。
返回上一级后,单击消息查询页签,进入消息查询页面。
输入查询信息,下方列表中显示对应的消息。
单击消息所在行的下载消息,可下载该消息的信息。
将消息值进行Base64解码,确认结果和SQL逻辑是否匹配。