You need to enable JavaScript to run this app.
最新活动
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们
导航

LAS Java Flink

最近更新时间2023.08.18 17:27:51

首次发布时间2021.12.02 16:47:58

1 概述

LAS Java Flink 任务适用于实时任务开发场景,支持引用资源 Jar 包的方式。

2 前提条件

  • DataLeap产品若仅开通“大数据集成”服务时,不支持创建 LAS 流式任务。

  • 项目已绑定 LAS 引擎服务,详见创建项目

3 新建任务

  1. 登录 DataLeap租户控制台

  2. 概览界面,显示加入的项目中,单击数据开发进入对应项目。

  3. 任务开发界面,左侧导航栏中,单击新建任务按钮,进入新建任务页面。

  4. 选择任务类型:

    1. 分类:数据开发

    2. 绑定引擎:LAS

    3. 关联实例:default

    4. 选择任务:流式数据 LAS Java Flink

  5. 填写任务基本信息:

    1. 任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。

    2. 保存至: 选择任务存放的目标文件夹目录。

  6. 单击确定按钮,成功创建任务。

4 任务配置说明

4.1 引用资源

任务创建完成后,您可通过下列参数来设置 Jar 包:

  • 选择Jar包 可通过以下方式选择资源:

    • 资源库中选取已有资源。

    • 在左侧导航栏资源库中新建资源,详见:资源库操作

  • 执行引擎:下拉选择 LAS Flink 1.11 执行引擎。

  • Main Class:填写主类,例如:com.bytedance.flinkdemo.KafkaWordCount

Flink 相关的动态参数和执行参数,具体设置详见 Flink 官方文档

  • 任务模板:选择是否使用任务模板

    您可以选择是否通过任务模板方式,便捷快速的复用一些参数逻辑,在下拉框中选择 LAS Java Flink 任务模板,并选择相应的版本号即可完成复用。操作说明详见:任务模板

  • 其他参数

    • 单行编辑模式:填写 key-value,key值只允许字母、数字、小数点、下划线和连字符。
      a. 添加一行参数

      b. 删除当前这行参数

      c. 清空输入框中已输入的参数值

      alt

    • 脚本编辑模式:通过JSON、Yaml的格式填写运行参数。

4.3 用户自定义参数

用户自定义参数,填写实时数据来源端相关实例信息,例如 kafka 消息队列接入时,需要填写 Topic、接入点地址等参数信息。

  • 单行编辑模式:填写 key-value,key值只允许字母、数字、小数点、下划线和连字符。

  • 脚本编辑模式:通过 JSON、Yaml 的格式填写运行参数。

4.4 参数设置

单击右侧导航栏中参数设置,进行任务的基本信息、资源设置、数据源登记配置。

4.4.1 基本信息

进行任务基本信息配置,选择当前任务责任人、LAS 队列和标签相关信息。

参数名称描述
任务名称显示创建任务时输入的任务名称,参数设置中不支持修改,可以在左侧任务目录结构中的任务名称右侧更多单击重命名进行修改。
任务类型LAS Java Flink
任务描述非必填,可对任务进行详细描述,方便后续查看和管理。

责任人

仅限一个成员,默认为任务创建人(任务执行失败、复查通过或者失败时的默认接收者),可根据实际需要,修改为其他项目成员。

  • 责任人需为项目中有编辑权限的成员,项目权限须由项目管理员授权。

  • DataLeap 通过项目空间实现开发协同,项目中具有编辑权限的角色,可对项目下的所有任务进行编辑,因此无须通过为任务设置多个责任人,来达到协同合作的效果。

标签

您可以自定义标签,用于标识某一类任务,以便快速搜索过滤,操作即时生效,无需重新上线任务。

  • 下拉选择项目归属的标签组,及对应的标签信息,支持添加多个标签组。

  • 若没有可选的标签组,您可进行以下操作步骤:

    • 进入 DataLeap 智能市场 > 任务标签管理,进入任务标签管理控制台,新建标签组。详见任务标签管理

    • 新建标签组成功后,前往项目管理 > 具体项目的配置信息 > 标签管理来添加标签组。详见标签管理

4.4.2 资源设置

完成流式任务执行相关资源设置:

参数名称描述
TaskManager 个数设置 flink 作业中 TaskManager 的数量。
单 TaskManagerCPU 数设置单个 TaskManager 所占用的 CPU 数量。
单 TaskManager 内存大小(MB)设置单个 TaskManager 所占用的内存大小,根据 TaskManagerCPU 数联动。
单 TaskManager slot 数设置单个 TaskManager 中 slot 的数量。
JobManager CPU 数设置单个 JobManager 所占用的 CPU 数量。
JobManager 内存设置单个 JobManager 所占用的内存大小,根据 JobManager CPU 数联动。

4.4.3 数据源登记

登记该任务使用的 Source 和 Sink 信息,以用于后续监控配置和血缘构建,支持通过手动添加

  • 手动添加:单击手动添加按钮,可根据 SQL 逻辑进行数据源上下游登记。

    • 源/目标:支持源/目标 Source 和 Sink 的选择。

    • 数据源类型:支持选择 DataGen、Rmq、Bmq、JDBC、Abase、火山/自建 Kafka、LAS 表这些数据源类型。

    • 各数据源类型需补充填写相应的信息,您可根据实际场景进行配置,如 Kafka 的数据源类型,需填写具体的Bootstrap Servers、Topic 名称、消费组等信息。

    • 直接上游任务:您可通过手动输入任务名称方式,进行搜索上游任务,单击添加按钮,进行手动添加。

    • 直接下游任务:暂时无法通过手动添加的方式进行操作。

    说明

    1. Abase 和 Rmq 数据源类型,仅支持解析操作,暂不支持上线发布后运行。且 Abase 数据源通常是在火山引擎内部业务上云中使用。

    2. LAS 引擎暂不支持自动解析功能来登记数据源。

  • 批量删除:手动添加的数据源,您可通过勾选已登记的数据源后,单击右侧删除批量删除按钮 ,删除勾选中的数据源。

  • 上下游任务查看:数据源登记解析出的上下游任务,您可对上游依赖任务进行编辑、添加、删除、禁用等操作;下游任务仅支持查看。

5 提交任务

任务所需参数配置完成后,将任务提交发布到运维中心实时任务运维中执行。 单击操作栏中的保存提交上线按钮,在弹窗中,需先通过提交事前检查提交上线等上线流程,最后单击确认按钮,完成作业提交。详见概述---流式任务提交发布

注意

上线流程中的“提交事前检查”,需租户主账号或项目管理员先在项目控制台 > 流水线管理中,创建相应的流水线检查事宜后方可显示。详见流水线管理

后续任务运维操作详见:实时任务运维

6 Jar 包开发示例

Flink Jar 任务需要通过 Tunnel 读写数据,Tunnel 的操作对用户是透明的,用户使用普通的 API 即可通过 Tunnel 实现对表的读写操作。

6.1依赖说明

POM 文件中添加以下依赖,Scope 为 Provided,Tunnel 的相关依赖在 Runtime 时由容器 Image 提供。

<dependency>
  <groupId>com.bytedance.las</groupId>
  <artifactId>flink-tunnel</artifactId>
  <version>1.2.0.6-public-connector</version>
  <scope>provided</scope>
</dependency>

<repositories>
  <repository>
    <id>bytedance-public</id>
    <name>bytedance maven</name>
    <url>https://artifact.bytedance.com/repository/releases</url>
  </repository>
</repositories>

6.2 参数说明

参数默认值说明
connector (Required)Flink connector 类型, 填写 'las'
access.id (Required)Access Key
access.key (Required)Secret Key
db.name (Required)数据库名
table.name (Required)表名
end.point (Required)连接 Tunnel 的 Endpoint

partition (Required for source table)

SELF_ADAPTER

非分区表:需要指定为空字符串
SinkTable:分区表无需配置该参数,但需要注意分区列的定义必须在字段列表的末尾
SourceTable:需要在这里手动指定读取哪个分区 (当前仅支持读取单个分区), 例 : 'partition' = 'pt = 1,ds = 2'

las.service.region (Optional)cn-beijingTunnel 集群所在 Region, 填写 'cn-beijing'
write.chunk_size (Optional)100 * 1024上传时,每个 Chunk 的大小, 单位 byte
write.cache_size (Optional)100上传时, Client 缓存的未发送 Chunk 数量
read.cache_size (Optional)100下载时, Client 缓存的未读取 Chunk 数量
read.chunk_size (Optional)100 * 1024下载时, 每个 Chunk 的大小,单位 byte
transfer.rpc.timeout (Optional)180RPC 请求超时时间 (s)
write.tasks (Optional)1Write 任务的并行度
write.mode (Optional)insert目前支持 : insert / overwrite_insert

6.3 AK/SK 获取

Flink Tunnel 参数需要指定 Tunnel 的 AK/SK,可通过以下方式进行获取:

  1. 登录火山引擎首页

  2. 单击右上角个人头像 > 密钥管理,进入密钥管理页面:

  3. 在密钥管理页可以进行查看或添加 AK/SK,填入对应的参数配置中即可。

6.4 Endpoint 获取

将 100.96.4.12:80 填入对应的参数配置中。

6.5 作业示例

public static void main(String[] args) throws InterruptedException {
  TableEnvironment streamTableEnv;

  EnvironmentSettings settings = EnvironmentSettings.*newInstance*().build();

  streamTableEnv = TableEnvironmentImpl.*create*(settings);
  streamTableEnv.getConfig().getConfiguration()
      .setInteger(ExecutionConfigOptions.*TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM*, 1);
  streamTableEnv.getConfig().getConfiguration()
      .setString("execution.checkpointing.interval", "10s");
  streamTableEnv.getConfig().getConfiguration()
      .setString("akka.ask.timeout", "300s");

  String hoodieTableSource = ""
      + "CREATE TABLE las_source(\n"
      + "  a MAP<STRING,INT>,\n"
      + "  c ARRAY<INT>,\n"
      + "  e TINYINT,"
      + "  e1 SMALLINT,"
      + "  e2 INT,"
      + "  e3 BIGINT,"
      + "  f FLOAT,"
      + "  f1 DOUBLE,"
      + "  g STRING,"
      + "  h BINARY,"
      + "  i BOOLEAN,"
      + "  j DECIMAL,"
      + "  j1 DECIMAL,"
      + "  k DATE,"
      + "  m TIMESTAMP"
      + ") with (\n"
      + "  'connector' = 'las',"
      + " 'access.id' = 'XXX' ,"
      + " 'access.key' = 'XXX',"
      + " 'db.name'= 'XXX',"
      + " 'table.name' = 'XXX',"
      + " 'partition' = 'pt=1,ds=2',"
      + " 'end.point' = '100.96.4.12:80' ,"
      + " 'las.service.region' = 'cn-beijing',"
      + " 'transfer.rpc.timeout' = '1000'"
      + ")";

  String hoodieTableSink = ""
      + "CREATE TABLE las_sink(\n"
      + "  a MAP<STRING,INT>,\n"
      + "  c ARRAY<INT>,\n"
      + "  e TINYINT,"
      + "  e1 SMALLINT,"
      + "  e2 INT,"
      + "  e3 BIGINT,"
      + "  f FLOAT,"
      + "  f1 DOUBLE,"
      + "  g STRING,"
      + "  h BINARY,"
      + "  i BOOLEAN,"
      + "  j DECIMAL,"
      + "  j1 DECIMAL,"
      + "  k DATE,"
      + "  m TIMESTAMP"
      + ") with (\n"
      + "  'connector' = 'las',"
      + " 'access.id' = 'XXX' ,"
      + " 'access.key' = 'XXX',"
      + " 'db.name'= 'XXX',"
      + " 'table.name' = 'XXX',"
      + " 'end.point' = '100.96.4.12:80' ,"
      + " 'las.service.region' = 'cn-beijing',"
      + " 'transfer.rpc.timeout' = '1000000'"
      + ")";

  String sql = "insert into las_sink select * from las_source";
  streamTableEnv.executeSql(hoodieTableSource);
  streamTableEnv.executeSql(hoodieTableSink);
  TableResult tableResult = streamTableEnv.executeSql(sql);
  // wait for finish
  try {
    tableResult.getJobClient().get()
        .getJobExecutionResult(Thread.*currentThread*().getContextClassLoader()).get();
  } catch (ExecutionException e) {
    e.printStackTrace();
  }
}

6.6 相关限制

由于 LAS 统一使用 CU 作为计量单位,目前 Flink 内存相关参数已被禁用。如需调整内存,需要使用 las.job.driver.culas.job.executor.cu(默认值均为 4)这两个参数来控制,计算规则如下:

kubernetes.jobmanager.cpu = lasJobDriverCU
jobmanager.memory.process.size = lasJobDriverCU * 4g
kubernetes.taskmanager.cpu = lasJobExecutorCU
taskmanager.memory.process.size = lasJobExecutorCU * 4g