You need to enable JavaScript to run this app.
导航
输出到 Kafka
最近更新时间:2025.09.17 11:20:49首次发布时间:2025.09.17 11:20:49
复制全文
我的收藏
有用
有用
无用
无用

数据输出到 Kafka算子支持将经过实时任务处理的数据,输出给下游系统。适用于需要将清洗后的数据实时输出到 Kafka 集群,以便进行下游的实时计算(如 Flink/Spark Streaming)、监控告警或数据归档的场景。

注意事项
  • 该算子当前支持将数据输出至外部Kafka;且仅支持在实时任务中使用,您需要先申请打开外部输出算子的功能开关后才可使用此算子。
  • 数据输出格式仅支持 JSON。
  • 在开始配置前,请确保你的目标 Kafka 集群已创建且已经建立数据连接,且数据连接已完成连通性测试,详情请参见Kafka数据接入

操作步骤

添加Kafka算子

  1. 点击「数据管理」,选择「可视化建模」,点击「新建」,选择「新建实时任务」。
    Image
  2. 按照实时任务中的步骤完成实时任务创建,在处理节点时,选择「输出>输出到Kafka」,并进行相关配置。
    Image

配置输出Kafka

点击「新建输出表」,配置输出的Kafka信息,具体参数配置说明如下,
Image

参数

配置说明

数据连接

可选择数据连接中的Kafka。

Topic

您可以在下拉框中选择后续数据需写入的Topic,支持模糊搜索;如果您在下拉框中没有找到对应的Topic,也支持在下拉框中直接输入Topic名称(64个字符内),后续数据写入时会自动为您创建对应Topic。

说明

通过输入Topic名称来直接新建topic时,您需保障对应 Kafka 开启了允许自动创建topic设置(auto.create.topics.enable=true)。

高级配置

通常场景下您无需进行高级配置,如果出现数据写入延迟、有扩容等诉求场景下,可修改此处的高级配置,支持根据需求设置数据集同步的运行参数,以保障同步成功或同步性能等。支持参数及其说明如下图所示。
Image

参数

说明

parallelism.default

默认并行度

taskmanager.memory.process.size

taskmanager的总进程内存大小

yarn.containers.vcores

每个yarn容器的虚拟核数

taskmanager.numberOfTaskSlots

taskmanager提供的插槽数

jobmanager.memory.process.size

jobmanager的总进程内存大小

yarn.appmaster.vcores

yarn中appmaster的虚拟核数

脏数据设置

通常您也无需进行脏数据的设置,如果出现数据写入报错、上游数据格式与预期不符等场景,需要进行数据即时监控时,您可打开脏数据采集开关并设置脏数据采集频率。
Image

参数

配置说明

是否开启脏数据采集

开启后,后续系统会自动采集脏数据,您可以在任务监控中查看脏数据详情,便于进行脏数据监控与分析,详情可参见下文的 任务监控 章节。

说明

如果您无需继续采集脏数据,可再次关闭此开关。

脏数据每秒采集条数

设置每秒采集脏数据的数量上限。

任务监控

完成可视化建模后,支持在任务管理页面监控脏数据与任务运行状态。
Image

配置监控规则

点击监控配置,在右侧弹出监控配置面板中,点击添加规则,可以添加相应的监控规则。
消费监控:支持设置输入数据的消费延迟监控、写入断流监控和写入上涨监控规则。例如图中,消费延迟监控规则可以设置为当每天8点到20点之间,连续20分组无数据时发送通知。
任务监控:支持设置该可视化建模任务的任务监控,包括脏数据数量监控和任务运行状态监控。例如图中,脏数据数量监控规则可以设置为当脏数据数量超过10000时发送通知。
监控通知:支持设置通知方式为发送邮件或邮件组给指定用户
Image

查看任务运行图表

在任务管理页面的运行&消费记录一栏下,可以查看生产消费统计、脏数据图表和脏数据的详细情况。
Image