You need to enable JavaScript to run this app.
导航
kafka sink 配置实践
最近更新时间:2025.07.10 15:39:47首次发布时间:2024.11.08 14:46:58
复制全文
我的收藏
有用
有用
无用
无用

本文介绍了如何借助北向通道将边缘节点采集的设备属性数据上报到第三方 Kafka 消息队列。

准备工作

  1. 准备一个边缘节点,并完成模拟数据源

    注意

    边缘智能软件版本必须在 v2.7.0 及以上。

  2. 准备一个 Kafka 服务器。
    您可以使用在云服务商平台开通的 Kafka 消息队列实例,也可以使用 Docker 自行部署一个 Kafka 实例。
    在本文中,我们使用一个在火山引擎消息队列 Kafka 版中创建的实例作为示例。在示例实例中,创建了如下的操作用户和 Topic:

    • 用户信息:
      用户名:north-demo,密码类型:Plain
      Image
    • Topic 信息:
      Topic 名称:north
      Image
  3. 参照创建北向通道,创建一个北向通道。
    按以下方式配置数据源(source)和数据目的地(sink):

    类型

    配置项

    说明

    数据源配置

    节点

    选择您的边缘节点。

    设备模板

    选择 全部

    驱动模板

    选择 全部

    设备实例

    选择您在模拟数据源操作中,接入到边缘节点的虚拟设备。

    sink 节点配置

    sink 类型

    选择 kafka

    参数配置

    • brokers:填写 Kafka broker 的 URL 列表。多个 URL 间以半角逗号(,)分隔。
    • topic:填写转发消息的 Topic。
    • saslAuthType:设置 Kafka 的 SASL 认证类型。该参数有以下取值:
      • none(默认)
      • plain
      • scram-256
      • scram-512
    • saslUserName & saslPassword
      • 如果 SASL 认证类型为 none,无需设置这两个参数。
      • 如果 SASL 认证类型为 plain,分别设置这两个参数为 Kafka 用户名和密码。
      • 如果 SASL 认证类型为 scram-256/scram-512,且 Kafka 实例开启了 SSL 安全认证,则除了设置这两个参数,还需要将 insecureSkipVerify 设置为 true

    Image

完成以上配置后,单击 连通性测试。确保连通性测试成功,然后单击 保存

验证数据上报结果

准备工作完成后,北向通道将被部署到边缘节点。当北向通道正常运行时,虚拟设备的随机属性数据将被上报到 Kafka 消息队列。您可以按以下方式进行验证:

  1. 访问火山引擎 Kafka 实例,进入实例详情。
  2. 消息查询 标签页,选择 按时间查询、选择目标 Topic 和 全部 分区。
    您可以查看最近上报的消息。

Image

配置示例

自定义数据转发格式

默认情况下,边缘节点采集到的设备属性数据会以原始格式转发到目标 Kafka broker。如果目标 Kafka broker 需要特定的数据格式,您可以相应地修改数据转发格式。

  • 边缘智能提供了 4 种预置的数据格式模板,包括 json-simplest、json-influx、csv、yaml。通过为 kafka sink 配置相应的数据格式模板,采集到的原始数据会被转换成指定格式,然后转发至 Kafka broker。
  • 如果预置的数据格式模板无法满足需求,您还可以自定义数据格式模板。

要修改数据转发格式,按以下方式配置:

  • 方式1:使用预置数据格式模板
    1. 编辑北向通道。
    2. 设置 数据格式,选择一种预置的数据格式模板。
      要查看数据预览效果,您可以选择一个设备模板。对应数据格式在所选设备模板上的预览将被展示。
    3. 完成 连通性测试,然后单击 保存
  • 方式2:自定义数据格式模板
    1. 编辑北向通道。
    2. 参数配置 中增加 dataTemplate 参数,并定义该参数的值为数据格式转换逻辑。

      说明

      如需编写数据格式转换逻辑,可以参考设备数据采集 - 原始数据格式以及使用 Golang template。您也可以提交工单联系我们,以获取帮助。

      Image
    3. 完成 连通性测试,然后单击 保存

修改数据格式后,您可前往 Kafka 进行验证。在最新接收到的数据中,如果数据格式符合预期,表示数据转发格式已经修改成功。
Image

kafka sink 参数说明

下表介绍了 kafka sink 支持的所有参数。

参数

数据类型

是否必选

说明

示例值

brokers

String

Kafka Broker 的 URL 列表。多个 URL 间以半角逗号(,)分隔。

broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092

topic

String

发送消息到的目标 Topic。

north

saslAuthType

String

Kafka 的 SASL 认证类型。取值:none(默认)、plainscram-256scram-512

plain

saslUserName

String

SASL 认证的用户名。

admin

saslPassword

String

SASL 认证的密码。

123456

batchSize

Integer

一个消息批次包含消息的数量。默认值为 1

1

maxAttempts

Integer

消息发送失败后进行重试的次数。默认值为 1

1

headers

Object

向 Kafka 上报的消息中添加的自定义标头信息。

{"new-header" = "vei","Show-Headers" = "true"}

key

String

向 Kafka 上报的消息中添加的自定义 Key 信息。

message-key

insecureSkipVerify

Boolean

是否跳过证书验证。默认值为 false

false

enableCache

Boolean

是否启用本地缓存。默认值为 fasle
启用本地缓存后,不能及时发送的数据将缓存到本地,稍后继续发送;关闭本地缓存后,不能及时发送的数据将被丢弃。

false

cleanCacheAtStop

Boolean

北向通道停止处理时是否清除缓存。默认值为 true

true

memoryCacheThreshold

Integer

本地内存中允许缓存的最大消息数量。默认值为 1024

1024

maxDiskCache

Integer

本地磁盘中允许缓存的最大消息数量。默认值为 1024000

1024000

bufferPageSize

Integer

缓冲区每页中包含的消息数量,防止频繁 IO。默认值为 256

256

ResendInterval

Integer

重新发送缓存消息的时间间隔(毫秒)。默认值为 1000

1000