You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件(私有化)

大数据研发治理套件(私有化)

复制全文
流式数据集成
Kafka流式集成任务配置
复制全文
Kafka流式集成任务配置

将Kafka数据实时同步至Elasticsearch、HDFS、Hive、LAS、StarRocks等目标端数据源,实现Kafka数据源与各目标端数据源之间的数据实时传输。同时支持TIDB数据源中的增量数据,通过Kafka2LAS通道,将增量数据写入LAS数据表中。下文以Kafka-LAS通道任务配置为例,为您介绍流式集成任务配置。

数据源注册

LAS、Kafka数据源可在数据开发 > 数据源管理中进行配置,相关操作可参见“注册数据源”。

流式集成配置

任务创建完成后,直接进入任务配置页面,依次设置以下任务信息。

  1. 选择数据源。

    1. 数据来源信息
      数据来源端选择Kafka,并完成以下相关参数配置:
      其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

      参数

      说明

      *数据源类型

      下拉选择Kafka数据源类型。

      *数据源名称

      已在数据源管理界面注册的Kafka数据源,下拉可选。

      *Topic 名称

      Kafka处理的消息源的不同分类主题名称,下拉可选数据源下对应的Topic名称。

      *数据格式

      支持JSON、Pb、Debezium Json、OceanBase SharePlex Json、Canal Json,下拉可选,默认为JSON格式,您可根据实际数据情况进行数据格式选择。
      当选择Pb时,需要填写参数信息 Pb类定义Pb Class

      *示例数据

      数据格式为jsonDebezium Json、OceanBase SharePlex Json、Canal Json时,需要以json字符串形式描述schema。 必须填写完整的数据,否则schema不准确,例如:{"uid":123, "ut":12, "user_name": "xxx"}。

      *Pb类定义

      数据格式为Pb时,需要先定义Pb类,一次只支持一个Pb类的定义,示例如下:

      syntax = "proto2";
      package abase_test;
      message AbaseTest {
          required int64 first_id = 1;
          required int64 latest_id = 2;
      }
      

      *PB Class

      数据格式为Pb时,需要填写PB Class信息:示例如下:

      message AbaseTest {
      required int64 first_id = 1;
      required int64 latest_id = 2;
      }
      

      示例中的PB Class为AbaseTest。

    2. 数据写入目标配置
      数据来源端选择LAS,并完成以下相关参数配置:
      其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

      参数

      说明

      *目标类型

      数据去向目标类型选择LAS。

      *数据源名称

      已在数据源管理界面注册的LAS数据源,下拉可选。

      *数据表

      对应数据源的Schema下所创建的LAS表,下拉可选。

      说明

      • 流式Kafka目前仅支持写入无主键的LAS表,且LAS表需要是ByteLake表类型。

      *分区频率

      支持选择天级、小时级两个频率,数据会跟随LAS分区生成时间产出,非实时写入。

      *分区设置

      分区字段、类型、内容会从LAS表自动获取并判断。 输出分区示例:date=20220930/hour=16。

  2. 设置字段映射信息。
    字段为一一映射关系,即将源表字段信息,同步到同一行的目标字段信息中,支持自动添加、手动添加,以及编辑、删除、顺序调整等操作。

    说明

    • 当任务配置为TIDB增量数据源同步时,自动添加的字段映射会默认添加“_event_time”字段,故目标端LAS表,需先创建相应的“_event_time”字段来映射接收。
    • 当Kafka是多层 JSON 格式时,您也可通过 a.b.c 的方式,配置提取Kafka多层结构数据。

    Image

  3. 高级参数设置(可选填)
    您可在高级参数中设置归档、消费起始或任务运行高级参数等信息。

    参数

    说明

    是否开启归档

    可选择是否进行归档。
    当选择时,需要设置归档字段类型。

    归档字段类型

    支持整型字符串两种类型。
    当选择字符串归档字段类型时,需另外设置归档字段格式

    归档字段

    输入归档字段信息。

    归档字段格式

    符合Java Date Format标准,例如:

    • 若日期格式为:20181017,则填写yyyyMMdd。
    • 若日期格式为:2018-10-17,则填写yyyy-MM-dd。
    • 若日期格式为:20181017 12:08:56,则填写yyyyMMdd HH:mm:ss。
    • 若日期格式为:2018-10-17 12:08:56,则填写yyyy-MM-dd HH:mm:ss。
    • 若日期格式为:2018-10-17T12:08:56,则填写yyyy-MM-dd'T'HH:mm:ss。
    • 若日期格式为:2018-10-17T12:08:56.235,则填写yyyy-MM-dd'T'HH:mm:ss.SSS。

    高级参数

    可选择是否要进行高级参数设置,默认关闭。
    开启设置后,支持并发度、压缩格式、副本数等配置。
    编辑模式支持单行编辑模式和脚本编辑模式。

    • 单行编辑:设置字段包括key值和value。
    • 脚本编辑:支持JSON、Yaml格式填写参数。

    说明

    TIDB数据源若在Kafka存在多个表时,需在高级参数中添加job.writer.target-table参数,指定TIDB数据表名称信息,来进行导入。
    您也可通过正则匹配方式,来配置多表读取后导入。

  4. 参数设置
    单击右侧侧边栏的参数设置按钮,进入配置流式集成任务运行队列、资源、Flink 运行参数等参数信息。
    其中流式集成任务支持通过设置 Flink 运行参数,来实现任务失败自动重试的能力。在 Flink 运行参数中,设置 studio.restart.attempts=N参数,即可实现任务失败自动重试,其中“N”为自动重试次数,您可按需进行设置。
    Image
    更多参数说明详见“参数设置”。

  5. 单击任务配置上方导航栏中的保存并提交上线按钮,完成任务上线操作。上线说明详见“提交上线”。

后续操作

任务提交发布到流式运维中心后,您便可进行后续的运维操作。详见“流式任务运维”。

最近更新时间:2025.02.07 17:44:21
这个页面对您有帮助吗?
有用
有用
无用
无用