You need to enable JavaScript to run this app.
导航

配置 Kafka 数据源

最近更新时间2024.03.01 11:30:54

首次发布时间2022.09.15 17:46:56

Kafka 数据源为您提供实时读取和离线写入 Kafka 的双向通道能力,实现不同数据源与 Kafka 数据源之间进行数据传输。
本文为您介绍 DataSail 的 Kafka 数据同步的能力支持情况。

1 支持的 Kafka 版本

  • 实时读:
    • 支持火山引擎 Kafka 实例和自建 Kafka 集群,2.x 版本以上的集群连接,如 Kafka 2.2.0 版本及其以后的版本均支持读取。
    • 鉴权模式支持普通鉴权和 SSL 鉴权模式。

2 使用限制

  • 子账号新建数据源时,需要有项目的管理员角色,方可以进行新建数据源操作。各角色对应权限说明,详见:管理成员
  • Kafka 数据源目前支持可视化配置实时读取和离线写入 Kafka。
  • 为确保同步任务使用的独享集成资源组具有 Kafka 库节点的网络访问能力,您需将独享集成资源组和 Kafka 数据库节点网络打通,详见网络连通解决方案
  • 若通过 VPC 网络访问,则独享集成资源组所在 VPC 中的 IPv4 CIDR 地址,需加入到 Kafka 访问白名单中:
    1. 确认集成资源组所在的 VPC:
      图片
    2. 查看 VPC 的 IPv4 CIDR 地址:

      注意

      若考虑安全因素,减少 IP CIDR 的访问范围,您至少需要将集成资源组绑定的子网下的 IPv4 CIDR 地址加入到实例白名单中。

      图片
    3. 将获取到的 IPv4 CIDR 地址添加进 Kafka 实例白名单中。
  • 若是通过公网形式访问 Kafka 实例,则您需进行以下操作:
    • 独享集成资源组开通公网访问能力,操作详见开通公网
    • 并将公网 IP 地址,添加进 Kafka 实例白名单中。

3 支持的字段类型

目前支持的数据类型是根据数据格式来决定的,支持以下两种格式:

  1. JSON 格式:

    {
        "id":1,
        "name":"demo",
        "age":19,
        "create_time":"2021-01-01",
        "update_time":"2022-01-01"
    }
    
  2. Protobuf(PB) 格式:

    syntax = "proto2";
    message pb1 {
        optional string a = 1;
        optional pb2 b = 2;
        optional int32 c = 3;
    
        message pb2 {
            optional string x = 1;
            repeated int32 y = 2;
            optional pb3 z = 3;
        }
    
        message pb3 {
            optional string j = 1;
            repeated int32 k = 2;
        }
    }
    

4 数据同步任务开发

4.1 数据源注册

新建数据源操作详见配置数据源,以下为您介绍不同接入方式的 Kafka 数据源配置相关信息:

  1. 火山引擎 Kafka 接入方式
    其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

    参数

    说明

    基本配置

    *数据源类型

    Kafka

    *接入方式

    火山引擎 Kafka

    *数据源名称

    数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。

    参数配置

    *Kafka 实例 ID

    下拉选择已在火山引擎消息队列 Kafka 中创建的 Kafka 实例名称信息。
    若您还未创建 Kafka 实例,您可前往 Kafka 实例控制台中创建,详见创建实例

  2. 连接串形式接入
    用连接串形式配置 Kafka 数据源,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

    参数

    说明

    基本配置

    *数据源类型

    Kafka

    *接入方式

    火山引擎 Kafka

    *数据源名称

    数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。

    参数配置

    *Kafka 版本

    Kafka 版本,下拉可选。当前支持 Kafka 2.2.x 和 0.10 版本。

    *Kafka 集群地址

    启动客户端连接Kafka服务时使用。
    填写格式为 ip:port 或 host:port 格式,存在多个时,可用逗号分隔。如localhost:2181,localhost:2182

    *认证方式

    支持 SASL_PLAINTEXT、SASL_SSL 认证方式,您也可选择None不认证。
    选择 SASL_PLAINTEXT、SASL_SSL 认证方式时,需确认 Sasl 机制,目前支持选择 PLAIN、SCRAM-SHA-256 认证机制。

    *用户名

    输入有权限访问 Kafka 集群环境的用户名信息。

    *密码

    输入用户名对应的密码信息。

    扩展参数

    配置 Kafka 额外需要的扩展参数信息。

4.2 新建离线任务

Kafka 数据源测试连通性成功后,进入到数据开发界面,开始新建 Kafka 相关通道任务。新建任务方式详见离线数据同步流式数据同步
任务创建成功后,您可根据实际场景,配置 Kafka 离线写Kafka 流式读等通道任务。

4.3 可视化配置 Kafka 离线写

图片
数据目标端选择 Kafka,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*目标类型

数据去向目标类型选择 Kafka。

*数据源名称

已在数据源管理界面注册的 Kafka 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 Kafka 数据源。

*Topic名称

选择 Kafka 处理消息源的不同分类主题名称,下拉可选数据源下对应需写入数据的 Topic 名称。

*数据格式

默认仅支持 json 格式,不可编辑。

示例数据

需以 json 字符串形式描述 schema。必须填写完整的数据,否则schema不准确。

分区设置

可以自定义 Kafka 分区规则,从 Kafka message 字段中选择 0~N 个字段,用于保证指定字段相同的值写入到 Kafka 的同一 partition 中。

4.4 可视化配置 Kafka 流式读

图片
数据来源选择 Kafka,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*目标类型

数据去向目标类型选择 Kafka。

*数据源名称

已在数据源管理界面注册的 Kafka 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 Kafka 数据源。

*Topic 名称

选择 Kafka 处理消息源的不同分类主题名称,下拉可选数据源下对应需读取数据的 Topic 名称,支持同时选择多个结构相同的 Topic。

*数据类型

支持JSON、Pb,下拉可选,默认为JSON格式。
当选择Pb时,需要填写参数信息 Pb类定义Pb Class

示例数据

数据格式为 json 时,需以 json 字符串形式描述 schema。必须填写完整的数据,否则schema不准确。

*Pb 类定义

数据格式为 Pb 时,需要先定义 Pb 类,在框中中填写 Pb 的IDL定义,一次只支持一个 Pb 类的定义,示例如下:

syntax = "proto2";
package abase_test;
message AbaseTest {
    required int64 first_id = 1;
    required int64 latest_id = 2;
}

*Pb 类名

数据格式为 Pb 时,需要填写 PB Class 入口类名信息,
上方示例中的 PB Class 为 AbaseTest。

4.5 字段映射

数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
您可通过以下三种方式操作字段映射关系:

  • 自动添加:单击自动添加按钮,根据两端数据表信息,可以自动填充来源和目标的字段信息。
  • 手动添加:单击手动添加按钮,可以手动编辑来源和目标的字段信息,可以逐个添加。
  • 移动\删除字段:您也可以根据需要移动字段映射顺序或删除字段。

5 流式任务运行参数说明

5.1 运行参数说明

图片
流式任务运行参数与离线任务运行参数配置属性不同,下面将为您介绍流式任务运行参数配置说明:

配置项

说明

是否开启归档

默认否,这个选项只有在目标数据源是HDFSHive的场景下才生效,开启归档后,您需配置以下归档字段信息:

  • 归档字段类型:支持整型字符串两种类型。当选择字符串归档字段类型时,需另外设置归档字段格式
  • 归档字段:输入归档字段信息。
  • 归档字段格式:符合 Java Date Format 标准,例如
    • 若日期格式为: 20181017,则填写:yyyyMMdd
    • 若日期格式为: 2018-10-17,则填写:yyyy-MM-dd
    • 若日期格式为: 20181017 12:08:56,则填写:yyyyMMdd HH:mm:ss
    • 若日期格式为: 2018-10-17 12:08:56,则填写:yyyy-MM-dd HH:mm:ss
    • 若日期格式为: 2018-10-17T12:08:56,则填写:yyyy-MM-dd'T'HH:mm:ss
    • 若日期格式为: 2018-10-17T12:08:56.235,则填写:yyyy-MM-dd'T'HH:mm:ss.SSS

默认消费起始

选定消费 Kafka 的起始方式:

  • 最新、最老:设定从 Kafka 的latestearliest的 offset 进行启动。
  • 时间戳:可以选定一个时间戳,从这个时间戳进行启动。
  • 分区offset:需要填写一个map类型映射,key 中需要包含选定 topic 的所有分区。
    [
        {
            "partition":"partition_name1",
            "offset":100
        },
        {
            "partition":"partition_name2",
            "offset":100
        }
    ]
    

高级参数

读参数需要加上 job.comment. 前缀:
对于可视化通道任务,读参数需要加上 job.reader. 前缀,写参数需要加上 job.writer. 前缀,对于系统参数,使用时需要加上job.common的前缀。
图片

5.2 高级参数列表

参数名

描述

默认值

job.common.checkpoint_interval

checkpoint 的间隔,目前默认 15min 会进行一次 checkpoint。

900000,单位 ms

job.common.checkpoint_timeout

Checkpoint 超时时间。

300000,单位:milliseconds