ByteHouse 支持通过 Kafka 或其他 Kafka 协议兼容的消息队列流式传输数据。启动 Kafka 导入任务后,ByteHouse 将持续读取 Topic 消息,提供 exactly once 保障,数据消费后可立即访问。ByteHouse 也支持随时启停数据导入任务以节省资源,通过内部记录 offset 机制确保启停过程中的数据不丢失,保障数据一致性。本文介绍了如何使用 ByteHouse 流式导入 Kafka 数据。
使用限制
- 支持的 Kafka 版本:0.10 及以上。
- ByteHouse 引擎版本要求:2.3 及以上版本。您可以在 ByteHouse 控制台的租户管理>基本信息页面查看您当前的引擎版本。
前提条件
- 将 Kafka 数据迁移到 ByteHouse 前,请确保您使用的 Kafka 账号密码拥有以下数据权限:
- 列出主题(Topics)
- 列出消费者组(Consumer group)
- 消费消息(Consume message)
- 创建消费者以及消费者组(Consumers & Consumer groups)
- 获取 Kafka 数据源代理列表 IP 地址、用户名、密码。如果您需要通过内网访问火山引擎消息队列 Kafka 版,还需获取 Kafka 数据源所在的 VPC ID。
- 如果您通过内网访问火山引擎消息队列 Kafka 版,请在 Kafka 数据源所在的 VPC 安全组规则添加白名单:
100.64.0.0/10。配置详情请参见管理安全组规则。
创建任务
在 ByteHouse 控制台,单击数据加载页签,单击新建导入任务按钮,进入任务创建界面。

填写导入任务基本信息,自定义任务名称和任务描述。

选择数据源类型为 Kafka 数据流,并从下拉列表中选择已创建的数据源。

如果您尚未创建数据源,可单击连接新的数据源,新建并配置 Kafka 数据源,单击连接。

当前 ByteHouse 支持以下数据源导入数据,不同数据源需配置的参数项说明如下:
数据源 | 火山引擎 Kafka-同可用区
(私网访问) | 火山引擎 Kafka-同可用区
(公网访问) | 火山引擎 Kafka-不同可用区
(仅支持公网访问) | 非火山引擎 Kafka 服务
(仅支持公网访问) |
|---|
源名称 | 配置源名称,您可以自定义数据源名称。 |
火山内网模式 | 如果您对数据写入安全要求比较高,可勾选该选项。勾选后,请使用私网访问信息配置数据源连接信息。 | 不支持 | 不支持 | 不支持 |
Kafka 所在的 VPC ID | 启用火山内网模式后必填。配置为火山引擎队列消息 Kafka 实例的私有网络 VPC 地址。您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页的服务访问中查看 VPC ID。 | 不支持 | 不支持 | 不支持 |
Kafka 代理列表 IP 地址 | IP 地址由Kafka 公/私网络地址+端口号组成,格式为Kafka 网络 ID:端口号。多个拼接完的 Kafka 代理列表 IP 地址间通过逗号,分隔。填写时,请务必填写您使用的接入点所有 Broker 节点的 IP 地址。 |
您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页,下滑到服务访问模块中单击Broker 接入信息列的详情按钮,查看并复制 Broker 的私网 IP 地址。 | 您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页的服务访问中查看并复制公网的接入点 IP 地址。 | 您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页的服务访问中查看并复制公网的接入点 IP 地址。 | 您可登录使用的 Kafka 数据服务控制台查看并复制。 |
身份验证模式 | 当前 Kafka 数据源支持四种鉴权模式,包括 NONE 无鉴权、PLAIN、SCRAM-SHA-256、SCRAM-SHA-512,并支持 SSL 加密,您可根据 Kafka 代理列表 IP 地址的端口号勾选,对应关系如下: - None 无鉴权:端口号为 9092。
- PLAIN、SCRAM-SHA-256、SCRAM-SHA-512:
- 如果未开启 SSL:端口号为 9093。
- 如果开启 SSL:端口号为 9095。
|
用户名 | 配置为 Kafka 服务的用户名。 |
密码 | 配置为 Kafka 服务的密码。 |
🎬 火山引擎 Kafka 使用内网模式访问可参考以下视频配置:
- 配置数据源的 Topic、消费组、格式等信息。

- Topic:选择要导入的 Topic。
- 消费者组:为已选择的 Topic 选择或新建一个消费者组。
- 格式:指定消费格式,当前支持 JSON_KAFKA 和 AVRO_KAFKA。
- 消费者数量:可选配置,您可按需指定消费者数量,请确保最大消费者数量需与分区数量相等。
- 选择导入的目标数据库和表。

- 定义 Schema 映射。设置数据源和目标表后,系统会自动填充 Schema 映射表,生成数据源与目标表的映射关系。ByteHouse 当前支持以下 Schema 映射配置操作,您可按需调整生成的 Schema 映射。

- 调整 Schema 映射逻辑
如果生成的 Schema 不符合预期或者您需要调整源列与目标列的映射关系,您可单击解析,系统将重新解析。 - 调整目标表
您可打开新的浏览器标签页,进入数据库页面并调整目标表,调整后,在流式导入页面中定义 Schema 映射模块单击刷新目标表,系统将自动读取目标表的调整并更新 Schema 映射表。 - 导入时实时删除唯一键表中的数据
如果您需要在导入时删除数据源表中的数据,且您使用的目标表是唯一键(Unique Key)表,可设置表达式(Expression)定义是否删除某条数据,详情请参加 实时删除数据。 - 导入时在目标表中生成数据库时间戳
如果您需要在导入时记录数据的导入时间,可使用表达式添加时间戳,便于后续基于时间戳进行数据查询、分析和追踪,详情请参考在目标表中生成数据库的时间戳。
- 配置导入任务的加载类型、max block size,指定计算组。

- 加载类型:当前仅支持增量写入。
- Max block size:指写入 block 的最大内存,输入值需在 65,536 和 1,966,080 之间。该参数直接影响消费内存使用,值越大所需内存越大。您可按需指定。
- 计算组:使用已有计算组加载任务,可享受更高的传输性能。
- 单击创建,即可生成导入任务。导入任务创建后,系统将跳转至任务详情页面,此时任务将处于暂停状态,您可单击开启,启动任务。

- 在启动 Kafka 作业弹窗中,确认需启动的任务名称和偏移量,确认无误后,单击确定,系统将执行数据加载任务。

如需修改偏移量,可单击配置,选择分区和偏移量。配置完成后,单击确定,系统将执行数据加载任务。

- 当前值:按照 Kafka 服务端存储的消费组 offset 位置继续消费。
- 最早:从 Kafka 服务端存储的最早的消息开始消费。
- 最新:从 Kafka 服务端存储的最新消息开始消费。
- 自定义:自定义 Kafka 对应消费组的消费位点,并从自定义的位点开始继续消费。
查看任务
在数据加载页面,您可以通过任务视图查看已创建的所有任务。

您也可以通过执行视图查看任务执行状态,查看任务执行日志和配置。

- 查看任务执行日志
单击目标任务行的日志按钮,查看当前任务的执行日志,您可单击复制或下载日志。

- 查看配置
单击目标任务行的配置按钮,查看当前任务的执行日志,您可单击复制或下载配置 JSON 文件,可用作通过 OpenAPI 导入数据的配置参考。

开启/停止任务
- 在任务列表中,单击源类型下拉列表,选择 Kafka。
- 单击目标任务行的开启按钮,启动当前导入任务。

- 单击任务名称,进入任务详情页面,可查看任务同步历史记录、消费状态和当前配置的 Schema 映射、日志、配置。

- 如果需要停止流式导入任务,可单击停止。
编辑任务
- 在任务列表中,单击源类型下拉列表,选择 Kafka。
- 单击目标任务行的编辑按钮,进入流式导入任务配置页面。

- 按需调整任务配置,编辑完成后,单击更新。
复制任务配置
如果您需要通过 OpenAPI 导入数据,可复制任务配置作为配置参考。
操作步骤:
- 在任务列表中,单击源类型下拉列表,选择 Kafka。
- 单击目标任务行的 ... 按钮,单击复制配置,系统将复制当前导入任务的配置。

删除任务
在任务列表中,单击目标任务行的 ... 按钮,单击删除任务,确认后即可删除当前任务。

高阶用法
在目标表中生成数据库的时间戳
需要在「数据导入」-> 「新建导入任务」 -> 「定义 Schema 映射」中配置如下红框圈中的列映射:
- _c0 是占位符。
- 数据类型需要选择 DateTime。
- Expression 的值是 ByteHouse 的 now() 表达式,该表达式会返回当前时间。
实时删除数据
当您使用的目标表是唯一键(Unique Key)表时,ByteHouse 支持将目标表中的列映射为 _delete_flag_ 列,并支持通过表达式定义删除条件。当导入任务中有满足删除条件的数据时,ByteHouse 将实时删除该条数据。
操作步骤
在「数据导入」-> 「新建导入任务」 -> 「定义 Schema 映射」中,打开行删除控制按钮,启用实时删除功能。启用后,系统将在映射表中自动添加 _delete_flag_ 列。

配置列映射。您可根据业务场景,指定目标表中的列,将其映射为 _delete_flag_ ,并为该列配置列映射表达式,当满足表达式条件时,系统将删除对应的数据。
如下图所示,将目标表中 的action 列映射为 _delete_flag_,并使用 multiIf 表达式定义 action 列。当 action 列的值为 1 时,唯一键对应的记录将会被删除;值为 0 时,表示写入数据。multiIf 表达式示例如下:
multiIf(action='delete',1,0)

相关参考:实时删除唯一键表数据最佳实践
ByteHouse 也支持通过 SQL 命令实时删除指定唯一键的数据,您可参考例3:实时删除指定唯一键的数据了解更多示例。