将Kafka数据实时同步至Elasticsearch、HDFS、Hive、LAS、StarRocks等目标端数据源,实现Kafka数据源与各目标端数据源之间的数据实时传输。同时支持TIDB数据源中的增量数据,通过Kafka2LAS通道,将增量数据写入LAS数据表中。下文以Kafka-LAS通道任务配置为例,为您介绍流式集成任务配置。
LAS、Kafka数据源可在数据开发 > 数据源管理中进行配置,相关操作可参见“注册数据源”。
任务创建完成后,直接进入任务配置页面,依次设置以下任务信息。
选择数据源。
数据来源信息
数据来源端选择Kafka,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
|---|---|
*数据源类型 | 下拉选择Kafka数据源类型。 |
*数据源名称 | 已在数据源管理界面注册的Kafka数据源,下拉可选。 |
*Topic 名称 | Kafka处理的消息源的不同分类主题名称,下拉可选数据源下对应的Topic名称。 |
*数据格式 | 支持JSON、Pb、Debezium Json、OceanBase SharePlex Json、Canal Json,下拉可选,默认为JSON格式,您可根据实际数据情况进行数据格式选择。 |
*示例数据 | 数据格式为json、Debezium Json、OceanBase SharePlex Json、Canal Json时,需要以json字符串形式描述schema。 必须填写完整的数据,否则schema不准确,例如:{"uid":123, "ut":12, "user_name": "xxx"}。 |
*Pb类定义 | 数据格式为Pb时,需要先定义Pb类,一次只支持一个Pb类的定义,示例如下:
|
*PB Class | 数据格式为Pb时,需要填写PB Class信息:示例如下:
示例中的PB Class为AbaseTest。 |
数据写入目标配置
数据来源端选择LAS,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
|---|---|
*目标类型 | 数据去向目标类型选择LAS。 |
*数据源名称 | 已在数据源管理界面注册的LAS数据源,下拉可选。 |
*数据表 | 对应数据源的Schema下所创建的LAS表,下拉可选。 说明
|
*分区频率 | 支持选择天级、小时级两个频率,数据会跟随LAS分区生成时间产出,非实时写入。 |
*分区设置 | 分区字段、类型、内容会从LAS表自动获取并判断。 输出分区示例:date=20220930/hour=16。 |
设置字段映射信息。
字段为一一映射关系,即将源表字段信息,同步到同一行的目标字段信息中,支持自动添加、手动添加,以及编辑、删除、顺序调整等操作。
说明

高级参数设置(可选填)
您可在高级参数中设置归档、消费起始或任务运行高级参数等信息。
参数 | 说明 |
|---|---|
是否开启归档 | 可选择是否进行归档。 |
归档字段类型 | 支持整型和字符串两种类型。 |
归档字段 | 输入归档字段信息。 |
归档字段格式 | 符合Java Date Format标准,例如:
|
高级参数 | 可选择是否要进行高级参数设置,默认关闭。
说明 TIDB数据源若在Kafka存在多个表时,需在高级参数中添加job.writer.target-table参数,指定TIDB数据表名称信息,来进行导入。 |
参数设置
单击右侧侧边栏的参数设置按钮,进入配置流式集成任务运行队列、资源、Flink 运行参数等参数信息。
其中流式集成任务支持通过设置 Flink 运行参数,来实现任务失败自动重试的能力。在 Flink 运行参数中,设置 studio.restart.attempts=N参数,即可实现任务失败自动重试,其中“N”为自动重试次数,您可按需进行设置。
更多参数说明详见“参数设置”。
单击任务配置上方导航栏中的保存并提交上线按钮,完成任务上线操作。上线说明详见“提交上线”。
任务提交发布到流式运维中心后,您便可进行后续的运维操作。详见“流式任务运维”。