You need to enable JavaScript to run this app.
导航
流式导入
最近更新时间:2025.06.04 12:46:42首次发布时间:2021.10.22 10:06:12
我的收藏
有用
有用
无用
无用

ByteHouse 支持通过 Kafka 或其他 Kafka 协议兼容的消息队列流式传输数据。启动 Kafka 导入任务后,ByteHouse 将持续读取 Topic 消息,提供 exactly once 保障,数据消费后可立即访问。ByteHouse 也支持随时启停数据导入任务以节省资源,通过内部记录 offset 机制确保启停过程中的数据不丢失,保障数据一致性。本文介绍了如何使用 ByteHouse 流式导入 Kafka 数据。

使用限制
  • 支持的 Kafka 版本:0.10 及以上。
  • ByteHouse 引擎版本要求:2.3 及以上版本。您可以在 ByteHouse 控制台的租户管理>基本信息页面查看您当前的引擎版本。

前提条件
  • 将 Kafka 数据迁移到 ByteHouse 前,请确保您使用的 Kafka 账号密码拥有以下数据权限:
    • 列出主题(Topics)
    • 列出消费者组(Consumer group)
    • 消费消息(Consume message)
    • 创建消费者以及消费者组(Consumers & Consumer groups)
  • 获取 Kafka 数据源代理列表 IP 地址、用户名、密码。如果您需要通过内网访问火山引擎消息队列 Kafka 版,还需获取 Kafka 数据源所在的 VPC ID。
  • 如果您通过内网访问火山引擎消息队列 Kafka 版,请在 Kafka 数据源所在的 VPC 安全组规则添加白名单:100.64.0.0/10。配置详情请参见管理安全组规则

创建任务
  1. 在 ByteHouse 控制台,单击数据加载页签,单击新建导入任务按钮,进入任务创建界面。
    Image

  2. 填写导入任务基本信息,自定义任务名称和任务描述。
    Image

  3. 选择数据源类型为 Kafka 数据流,并从下拉列表中选择已创建的数据源。
    Image
    如果您尚未创建数据源,可单击连接新的数据源,新建并配置 Kafka 数据源,单击连接
    Image
    当前 ByteHouse 支持以下数据源导入数据,不同数据源需配置的参数项说明如下:

    数据源

    火山引擎 Kafka-同可用区
    (私网访问)

    火山引擎 Kafka-同可用区
    (公网访问)

    火山引擎 Kafka-不同可用区
    (仅支持公网访问)

    非火山引擎 Kafka 服务
    (仅支持公网访问)

    源名称

    配置源名称,您可以自定义数据源名称。

    火山内网模式

    如果您对数据写入安全要求比较高,可勾选该选项。勾选后,请使用私网访问信息配置数据源连接信息。

    注意

    如果您勾选了火山内网模式,通过内网访问火山引擎 Kafka,您还需要在 KAFKA 所在的 VPC 安全组规则添加白名单:100.64.0.0/10

    不支持

    不支持

    不支持

    Kafka 所在的 VPC ID

    启用火山内网模式后必填。配置为火山引擎队列消息 Kafka 实例的私有网络 VPC 地址。您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页服务访问中查看 VPC ID。

    不支持

    不支持

    不支持

    Kafka 代理列表 IP 地址

    IP 地址由Kafka 公/私网络地址+端口号组成,格式为Kafka 网络 ID:端口号。多个拼接完的 Kafka 代理列表 IP 地址间通过逗号,分隔。

    您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页的服务访问中查看并复制私网的接入点 IP 地址。

    您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页的服务访问中查看并复制公网的接入点 IP 地址。

    您可登录 火山引擎消息队列 Kafka 版控制台,在实例列表中单击目标实例,在实例详情页的服务访问中查看并复制公网的接入点 IP 地址。

    您可登录使用的 Kafka 数据服务控制台查看并复制。

    身份验证模式

    当前 Kafka 数据源支持四种鉴权模式,包括 NONE 无鉴权、PLAIN、SCRAM-SHA-256、SCRAM-SHA-512,并支持 SSL 加密,您可按需勾选。
    选择不同鉴权模式时,Kafka 代理列表 IP 地址的配置要求不一致:

    • None 无鉴权:默认端口号为:9092。
    • PLAIN、SCRAM-SHA-256、SCRAM-SHA-512
      • 如果未开启 SSL:默认端口号为 9093。
      • 如果开启 SSL:默认端口号为 9095。

    用户名

    配置为 Kafka 服务的用户名。

    密码

    配置为 Kafka 服务的密码。

  4. 配置数据源的 Topic、消费组、格式等信息。
    Image

    • Topic:选择要导入的 Topic。
    • 消费者组:为已选择的 Topic 选择或新建一个消费者组。
    • 格式:指定消费格式,当前支持 JSON_KAFKA 和 AVRO_KAFKA。
    • 消费者数量:可选配置,您可按需指定消费者数量,请确保最大消费者数量需与分区数量相等。
  5. 选择导入的目标数据库和表。
    Image

  6. 定义 Schema 映射。设置数据源和目标表后,系统会自动填充 Schema 映射表,生成数据源与目标表的映射关系。ByteHouse 当前支持以下 Schema 映射配置操作,您可按需调整生成的 Schema 映射。
    Image

    • 调整 Schema 映射逻辑
      如果生成的 Schema 不符合预期或者您需要调整源列与目标列的映射关系,您可单击解析,系统将重新解析。
    • 调整目标表
      您可打开新的浏览器标签页,进入数据库页面并调整目标表,调整后,在流式导入页面中定义 Schema 映射模块单击刷新目标表,系统将自动读取目标表的调整并更新 Schema 映射表。
    • 导入时实时删除唯一键表中的数据
      如果您需要在导入时删除数据源表中的数据,且您使用的目标表是唯一键(Unique Key)表,可设置表达式(Expression)定义是否删除某条数据,详情请参加 实时删除数据
    • 导入时在目标表中生成数据库时间戳
      如果您需要在导入时记录数据的导入时间,可使用表达式添加时间戳,便于后续基于时间戳进行数据查询、分析和追踪,详情请参考在目标表中生成数据库的时间戳
  7. 配置导入任务的加载类型、max block size,指定计算组。
    Image

    • 加载类型:当前仅支持增量写入。
    • Max block size:指写入 block 的最大内存,输入值需在 65,536 和 1,966,080 之间。该参数直接影响消费内存使用,值越大所需内存越大。您可按需指定。
    • 计算组:使用已有计算组加载任务,可享受更高的传输性能。
  8. 单击创建,即可生成导入任务。导入任务创建后,系统将跳转至任务详情页面,此时任务将处于暂停状态,您可单击开启,启动任务。
    Image

  9. 在启动 Kafka 作业弹窗中,确认需启动的任务名称和偏移量,确认无误后,单击确定,系统将执行数据加载任务。
    Image
    如需修改偏移量,可单击配置,选择分区和偏移量。配置完成后,单击确定,系统将执行数据加载任务。
    Image

    • 当前值:按照 Kafka 服务端存储的消费组 offset 位置继续消费。
    • 最早:从 Kafka 服务端存储的最早的消息开始消费。
    • 最新:从 Kafka 服务端存储的最新消息开始消费。
    • 自定义:自定义 Kafka 对应消费组的消费位点,并从自定义的位点开始继续消费。

查看任务

在数据加载页面,您可以通过任务视图查看已创建的所有任务。
Image
您也可以通过执行视图查看任务执行状态,查看任务执行日志和配置。
Image

  • 查看任务执行日志
    单击目标任务行的日志按钮,查看当前任务的执行日志,您可单击复制或下载日志。
    Image
  • 查看配置
    单击目标任务行的配置按钮,查看当前任务的执行日志,您可单击复制或下载配置 JSON 文件,可用作通过 OpenAPI 导入数据的配置参考。
    Image

开启/停止任务
  1. 在任务列表中,单击源类型下拉列表,选择 Kafka。
  2. 单击目标任务行的开启按钮,启动当前导入任务。
    Image
  3. 单击任务名称,进入任务详情页面,可查看任务同步历史记录、消费状态和当前配置的 Schema 映射、日志、配置。
    Image
  4. 如果需要停止流式任务,可单击停止

编辑任务
  1. 在任务列表中,单击源类型下拉列表,选择 Kafka。
  2. 单击目标任务行的编辑按钮,进入流式导入任务配置页面。
    Image
  3. 按需调整任务配置,编辑完成后,单击更新

复制任务配置

如果您需要通过 OpenAPI 导入数据,可复制任务配置作为配置参考。
操作步骤

  1. 在任务列表中,单击源类型下拉列表,选择 Kafka。
  2. 单击目标任务行的 ... 按钮,单击复制配置,系统将复制当前导入任务的配置。
    Image

删除任务

在任务列表中,单击目标任务行的 ... 按钮,单击删除任务,确认后即可删除当前任务。
Image

高阶用法

在目标表中生成数据库的时间戳

需要在「数据导入」-> 「新建导入任务」 -> 「定义 Schema 映射」中配置如下红框圈中的列映射:

  • _c0 是占位符。
  • 数据类型需要选择 DateTime。
  • Expression 的值是 Bytehouse 的 now() 表达式,该表达式会返回当前时间。

实时删除数据

当您使用的目标表是唯一键(Unique Key)表时,ByteHouse 支持将目标表中的列映射为 _delete_flag_ 列,并支持通过表达式定义删除条件。当导入任务中有满足删除条件的数据时,ByteHouse 将实时删除该条数据。

操作步骤

  1. 在「数据导入」-> 「新建导入任务」 -> 「定义 Schema 映射」中,打开行删除控制按钮,启用实时删除功能。启用后,系统将在映射表中自动添加 _delete_flag_ 列。
    Image

  2. 配置列映射。您可根据业务场景,指定目标表中的列,将其映射为 _delete_flag_ ,并为该列配置列映射表达式,当满足表达式条件时,系统将删除对应的数据。
    如下图所示,将目标表中 的action 列映射为 _delete_flag_,并使用 multiIf 表达式定义 action 列。当 action 列的值为 1 时,唯一键对应的记录将会被删除;值为 0 时,表示写入数据。multiIf 表达式示例如下:

    multiIf(action='delete',1,0)
    

    Image

相关参考:实时删除唯一键表数据最佳实践

ByteHouse 也支持通过 SQL 命令实时删除指定唯一键的数据,您可参考例3:实时删除指定唯一键的数据了解更多示例。