最近更新时间:2023.08.18 17:27:51
首次发布时间:2021.12.02 16:47:58
LAS Java Flink 任务适用于实时任务开发场景,支持引用资源 Jar 包的方式。
DataLeap产品若仅开通“大数据集成”服务时,不支持创建 LAS 流式任务。
项目已绑定 LAS 引擎服务,详见创建项目。
登录 DataLeap租户控制台 。
在概览界面,显示加入的项目中,单击数据开发进入对应项目。
在任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。
选择任务类型:
分类:数据开发。
绑定引擎:LAS。
关联实例:default。
选择任务:流式数据 LAS Java Flink。
填写任务基本信息:
任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。
保存至: 选择任务存放的目标文件夹目录。
单击确定按钮,成功创建任务。
任务创建完成后,您可通过下列参数来设置 Jar 包:
选择Jar包 可通过以下方式选择资源:
资源库中选取已有资源。
在左侧导航栏资源库中新建资源,详见:资源库操作。
执行引擎:下拉选择 LAS Flink 1.11 执行引擎。
Main Class:填写主类,例如:com.bytedance.flinkdemo.KafkaWordCount
Flink 相关的动态参数和执行参数,具体设置详见 Flink 官方文档。
任务模板:选择是否使用任务模板
您可以选择是否通过任务模板方式,便捷快速的复用一些参数逻辑,在下拉框中选择 LAS Java Flink 任务模板,并选择相应的版本号即可完成复用。操作说明详见:任务模板
其他参数
单行编辑模式:填写 key-value,key值只允许字母、数字、小数点、下划线和连字符。
a. 添加一行参数
b. 删除当前这行参数
c. 清空输入框中已输入的参数值
脚本编辑模式:通过JSON、Yaml的格式填写运行参数。
用户自定义参数,填写实时数据来源端相关实例信息,例如 kafka 消息队列接入时,需要填写 Topic、接入点地址等参数信息。
单行编辑模式:填写 key-value,key值只允许字母、数字、小数点、下划线和连字符。
脚本编辑模式:通过 JSON、Yaml 的格式填写运行参数。
单击右侧导航栏中参数设置,进行任务的基本信息、资源设置、数据源登记配置。
进行任务基本信息配置,选择当前任务责任人、LAS 队列和标签相关信息。
参数名称 | 描述 |
---|---|
任务名称 | 显示创建任务时输入的任务名称,参数设置中不支持修改,可以在左侧任务目录结构中的任务名称右侧更多单击重命名进行修改。 |
任务类型 | LAS Java Flink |
任务描述 | 非必填,可对任务进行详细描述,方便后续查看和管理。 |
责任人 | 仅限一个成员,默认为任务创建人(任务执行失败、复查通过或者失败时的默认接收者),可根据实际需要,修改为其他项目成员。
|
标签 | 您可以自定义标签,用于标识某一类任务,以便快速搜索过滤,操作即时生效,无需重新上线任务。
|
完成流式任务执行相关资源设置:
参数名称 | 描述 |
---|---|
TaskManager 个数 | 设置 flink 作业中 TaskManager 的数量。 |
单 TaskManagerCPU 数 | 设置单个 TaskManager 所占用的 CPU 数量。 |
单 TaskManager 内存大小(MB) | 设置单个 TaskManager 所占用的内存大小,根据 TaskManagerCPU 数联动。 |
单 TaskManager slot 数 | 设置单个 TaskManager 中 slot 的数量。 |
JobManager CPU 数 | 设置单个 JobManager 所占用的 CPU 数量。 |
JobManager 内存 | 设置单个 JobManager 所占用的内存大小,根据 JobManager CPU 数联动。 |
登记该任务使用的 Source 和 Sink 信息,以用于后续监控配置和血缘构建,支持通过手动添加。
手动添加:单击手动添加按钮,可根据 SQL 逻辑进行数据源上下游登记。
源/目标:支持源/目标 Source 和 Sink 的选择。
数据源类型:支持选择 DataGen、Rmq、Bmq、JDBC、Abase、火山/自建 Kafka、LAS 表这些数据源类型。
各数据源类型需补充填写相应的信息,您可根据实际场景进行配置,如 Kafka 的数据源类型,需填写具体的Bootstrap Servers、Topic 名称、消费组等信息。
直接上游任务:您可通过手动输入任务名称方式,进行搜索上游任务,单击添加按钮,进行手动添加。
直接下游任务:暂时无法通过手动添加的方式进行操作。
说明
Abase 和 Rmq 数据源类型,仅支持解析操作,暂不支持上线发布后运行。且 Abase 数据源通常是在火山引擎内部业务上云中使用。
LAS 引擎暂不支持自动解析功能来登记数据源。
批量删除:手动添加的数据源,您可通过勾选已登记的数据源后,单击右侧删除或批量删除按钮 ,删除勾选中的数据源。
上下游任务查看:数据源登记解析出的上下游任务,您可对上游依赖任务进行编辑、添加、删除、禁用等操作;下游任务仅支持查看。
任务所需参数配置完成后,将任务提交发布到运维中心实时任务运维中执行。 单击操作栏中的保存和提交上线按钮,在弹窗中,需先通过提交事前检查和提交上线等上线流程,最后单击确认按钮,完成作业提交。详见概述---流式任务提交发布。
注意
上线流程中的“提交事前检查”,需租户主账号或项目管理员先在项目控制台 > 流水线管理中,创建相应的流水线检查事宜后方可显示。详见流水线管理。
后续任务运维操作详见:实时任务运维。
Flink Jar 任务需要通过 Tunnel 读写数据,Tunnel 的操作对用户是透明的,用户使用普通的 API 即可通过 Tunnel 实现对表的读写操作。
POM 文件中添加以下依赖,Scope 为 Provided,Tunnel 的相关依赖在 Runtime 时由容器 Image 提供。
<dependency> <groupId>com.bytedance.las</groupId> <artifactId>flink-tunnel</artifactId> <version>1.2.0.6-public-connector</version> <scope>provided</scope> </dependency> <repositories> <repository> <id>bytedance-public</id> <name>bytedance maven</name> <url>https://artifact.bytedance.com/repository/releases</url> </repository> </repositories>
参数 | 默认值 | 说明 |
---|---|---|
connector (Required) | Flink connector 类型, 填写 'las' | |
access.id (Required) | Access Key | |
access.key (Required) | Secret Key | |
db.name (Required) | 数据库名 | |
table.name (Required) | 表名 | |
end.point (Required) | 连接 Tunnel 的 Endpoint | |
partition (Required for source table) | SELF_ADAPTER | 非分区表:需要指定为空字符串 |
las.service.region (Optional) | cn-beijing | Tunnel 集群所在 Region, 填写 'cn-beijing' |
write.chunk_size (Optional) | 100 * 1024 | 上传时,每个 Chunk 的大小, 单位 byte |
write.cache_size (Optional) | 100 | 上传时, Client 缓存的未发送 Chunk 数量 |
read.cache_size (Optional) | 100 | 下载时, Client 缓存的未读取 Chunk 数量 |
read.chunk_size (Optional) | 100 * 1024 | 下载时, 每个 Chunk 的大小,单位 byte |
transfer.rpc.timeout (Optional) | 180 | RPC 请求超时时间 (s) |
write.tasks (Optional) | 1 | Write 任务的并行度 |
write.mode (Optional) | insert | 目前支持 : insert / overwrite_insert |
Flink Tunnel 参数需要指定 Tunnel 的 AK/SK,可通过以下方式进行获取:
登录火山引擎首页。
单击右上角个人头像 > 密钥管理,进入密钥管理页面:
在密钥管理页可以进行查看或添加 AK/SK,填入对应的参数配置中即可。
将 100.96.4.12:80 填入对应的参数配置中。
public static void main(String[] args) throws InterruptedException { TableEnvironment streamTableEnv; EnvironmentSettings settings = EnvironmentSettings.*newInstance*().build(); streamTableEnv = TableEnvironmentImpl.*create*(settings); streamTableEnv.getConfig().getConfiguration() .setInteger(ExecutionConfigOptions.*TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM*, 1); streamTableEnv.getConfig().getConfiguration() .setString("execution.checkpointing.interval", "10s"); streamTableEnv.getConfig().getConfiguration() .setString("akka.ask.timeout", "300s"); String hoodieTableSource = "" + "CREATE TABLE las_source(\n" + " a MAP<STRING,INT>,\n" + " c ARRAY<INT>,\n" + " e TINYINT," + " e1 SMALLINT," + " e2 INT," + " e3 BIGINT," + " f FLOAT," + " f1 DOUBLE," + " g STRING," + " h BINARY," + " i BOOLEAN," + " j DECIMAL," + " j1 DECIMAL," + " k DATE," + " m TIMESTAMP" + ") with (\n" + " 'connector' = 'las'," + " 'access.id' = 'XXX' ," + " 'access.key' = 'XXX'," + " 'db.name'= 'XXX'," + " 'table.name' = 'XXX'," + " 'partition' = 'pt=1,ds=2'," + " 'end.point' = '100.96.4.12:80' ," + " 'las.service.region' = 'cn-beijing'," + " 'transfer.rpc.timeout' = '1000'" + ")"; String hoodieTableSink = "" + "CREATE TABLE las_sink(\n" + " a MAP<STRING,INT>,\n" + " c ARRAY<INT>,\n" + " e TINYINT," + " e1 SMALLINT," + " e2 INT," + " e3 BIGINT," + " f FLOAT," + " f1 DOUBLE," + " g STRING," + " h BINARY," + " i BOOLEAN," + " j DECIMAL," + " j1 DECIMAL," + " k DATE," + " m TIMESTAMP" + ") with (\n" + " 'connector' = 'las'," + " 'access.id' = 'XXX' ," + " 'access.key' = 'XXX'," + " 'db.name'= 'XXX'," + " 'table.name' = 'XXX'," + " 'end.point' = '100.96.4.12:80' ," + " 'las.service.region' = 'cn-beijing'," + " 'transfer.rpc.timeout' = '1000000'" + ")"; String sql = "insert into las_sink select * from las_source"; streamTableEnv.executeSql(hoodieTableSource); streamTableEnv.executeSql(hoodieTableSink); TableResult tableResult = streamTableEnv.executeSql(sql); // wait for finish try { tableResult.getJobClient().get() .getJobExecutionResult(Thread.*currentThread*().getContextClassLoader()).get(); } catch (ExecutionException e) { e.printStackTrace(); } }
由于 LAS 统一使用 CU 作为计量单位,目前 Flink 内存相关参数已被禁用。如需调整内存,需要使用 las.job.driver.cu
和 las.job.executor.cu
(默认值均为 4)这两个参数来控制,计算规则如下:
kubernetes.jobmanager.cpu = lasJobDriverCU jobmanager.memory.process.size = lasJobDriverCU * 4g kubernetes.taskmanager.cpu = lasJobExecutorCU taskmanager.memory.process.size = lasJobExecutorCU * 4g