You need to enable JavaScript to run this app.
导航
Flink 实时消费 ByteHouse CDW Binlog
最近更新时间:2025.08.22 15:27:41首次发布时间:2025.08.15 11:05:58
复制全文
我的收藏
有用
有用
无用
无用

本文将为您介绍如何通过火山引擎流式计算 Flink 版实时消费 ByteHouse 云数仓版(CDW)Binlog。

前提条件
  • 使用前,请参考订阅 ByteHouse CDW Binlog,了解 ByteHouse CDW Binlog 的功能概况,开启并按需配置 ByteHouse CDW Binlog。
  • Flink 消费 ByteHouse CDW Binlog 需要目标表以及系统表的读权限,请确保您具备相关权限。ByteHouse CDW 权限配置请参考数据权限管理
  • 请确保您使用的 ByteHouse CDW 引擎版本要求为 v2.3.1 及以上版本。您可以登录 ByteHouse 控制台,单击顶部租户管理页签,在基本信息中查看您使用的引擎版本是否符合要求。低于该版本的引擎启动 Binlog 功能会报错,如需升级请提交工单。
    Image

使用限制
  • 该功能为 Beta 功能,使用前,请联系提交工单或联系 ByteHouse 团队获取白名单权限。
  • 目前仅支持火山引擎流式计算 Flink 版消费 ByteHouse CDW Binlog。
  • 目前仅支持表级别的增量 Binlog 消费。

火山引擎流式计算 Flink 版支持通过 Flink Connector Driver for ByteHouse 云数仓版连接器实时消费 Binlog,具体使用方法如下。

准备工作

  1. 请点击下载 Flink Connector for ByteHouse 云数仓版 最新版本 Jar 文件,并参考 Flink Connector 使用说明中的准备工作章节安装该连接器。
    如果您已安装 Flink Connector,请确认版本为 v1.27.133_rc1 及以上。您可通过您使用的安装工具的配置日志,查找 Connect version: 字段对应的数值。
  2. Flink 消费 ByteHouse CDW Binlog 需连接至 ByteHouse CDW,当前支持通过 IAM 用户连接或数据库用户连接,您可提前获取网络域名、端口、API Key、数据库用户及密码等连接信息,详情请参考获取 ByteHouse 连接信息。IAM 用户与数据库用户二者差异说明如下,您可按需选择。
    • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
    • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
      更多 IAM 用户和数据库用户的介绍请参见以下文档:
    • ByteHouse权限管控概述
    • 管理 IAM 用户
    • 管理数据库用户

源端会根据操作类型自动为每行数据设置准确的 Flink RowKind 类型,包括 INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER 等,以实现镜像同步表的数据,类似 MySQL 和 Postgres 的 CDC(变更数据捕获,ChangeDataCapture) 功能。
ByteHouse CDW 表开启 Binlog 后,在 Flink 中可使用如下 DDL 配置源表,实时消费 Binlog。使用时,您可以使用 ByteHouse IAM 用户或数据库用户连接。

CREATE TABLE bh_binlog_source (
  `id` INT,
  `a` BIGINT,
  `b` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'bytehouse-cdw',
  'binlog' = 'true',
  'jdbc.enable-gateway-connection' = 'true',
  'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
  'bytehouse.gateway.host' = '<your_bytehouse_cdw_host>',
  'bytehouse.gateway.api-token' = '<your_bytehouse_cdw_api_key>',
  'bytehouse.gateway.virtual-warehouse' = '<your_bytehouse_cdw_vw_id>',
  'database' = '<your_database_name>',
  'table-name' = '<your_table_name>',
  'scan.startup.mode' = 'earliest',
  'scan.max-split-size' = '5000',
  'scan.cdc.shard.split' = 'true'
); 
CREATE TEMPORARY TABLE print_sink WITH (
  'connector' = 'print'
) LIKE bh_binlog_source (EXCLUDING OPTIONS);
INSERT INTO print_sink SELECT * FROM bh_binlog_source;

参数说明

参数

是否必填

默认值

说明

CREATE TABLE ...

创建 binlog 写入源表,您可自定义表名称,并定义表列名及类型。

connector

指定要使用的连接器,需配置为bytehouse-cdw

jdbc.enable-gateway-connection

false

指定 JDBC 连接是否需要经过 ByteHouse 网关。

bytehouse.gateway.region

ByteHouse 网关区域,设置为 VOLCANO_PRIVATE

bytehouse.gateway.host

ByteHouse CDW 的网络域名,设置前请将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE
格式为 tenant-{TENANT-ID}-{REGION}.bytehouse.[i]volces.com,您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击基本信息,在网络信息模块,查看并复制公网或私网域名。

bytehouse.gateway.virtual-warehouse

控制台配置的默认计算组

通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。
您也可以指定其他计算组。您可登录 ByteHouse CDW 控制台,在计算组页签下,查看并复制计算组 ID。

bytehouse.gateway.api-token

使用 IAM 用户连接时必填

ByteHouse 网关 API key。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击连接信息,新建并复制 API Key。

bytehouse.gateway.account

使用数据库用户连接时必填

指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。

bytehouse.gateway.username

使用数据库用户连接时必填

ByteHouse 数据库账号用户名。可通过 ByteHouse 控制台 > 权限管理 > 用户路径,查看并复制数据库用户名

bytehouse.gateway.paasword

使用数据库用户连接时必填

指 ByteHouse 数据库用户的密码。该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

database

需连接的 ByteHouse 云数仓版数据库的名称。

table-name

需连接 ByteHouse 云数仓版表的名称。

binlog

false

是否开启读取 Binlog 信息。设置为 true 时,表示开启实时消费 Binlog 功能,当前仅支持设置为 true。

scan.startup.mode

LATEST

设置起始消费位置。目前仅支持增量模式,可选值:
• earliest:从最早可用的 BSN 开始。
• latest:从最新 BSN 开始。
• specific:从 scan.startup.specific-offset.pos 指定的 BSN 开始。

scan.max-split-size

10000

设置批量读取 Binlog 的数据行数。

scan.cdc.shard.split

true

设置是否分 shard 读取,true 为是,false 为否,默认为 true。

scan.cdc.max-retries

3

设置消费 Binlog 数据出错后的重试次数。

scan.startup.specific-offset.pos

scan.startup.mode设置为 SPECIFIC 时,可以指定 BSN(Binlog Sequence Number)消费。

CREATE TEMPORARY TABLE ...

创建临时表,该表结构与写入源 bh_binlog_source 一致,但其 connector 为 print,表示数据会直接打印到 Stdout(标准输出)日志。

  • connector:指定要使用的连接器,需配置为 print。Print 连接器是一个系统内置的调试专用结果表,可以将作业结果输出并打印到 Stdout 日志中。

    注意

    请不要在正式环境的任务中使用 Print 调试连接器,会影响任务性能,存在写满磁盘的风险。

  • LIKE:指定为写入源表的名称,将集成源表的结构。
  • EXCLUDING OPTIONS:表示不继承源表的连接参数。

INSERT INTO ... SELECT * FROM ...

创建数据同步任务,将写入源 bh_binlog_source 读取的数据全量同步至 print_sink 表,最终通过 print 连接器输出到 Stdout 日志中。

下面是通过 Flink DataStream API 消费 ByteHouse CDW Binlog 的构建示例。使用时,您可以使用 ByteHouse IAM 用户或数据库用户连接。

import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.CnchCdcSource;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.deserialization.RowDataDeserializationSchema;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.table.StartupOptions;
import com.bytedance.bytehouse.flink.connector.cnch.ByteHouseCdwClient;
import com.bytedance.bytehouse.flink.connector.cnch.api.java.CnchCdcSourceBuilder;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;

/**
 * 以下展示了 ByteHouse CDW CDC 连接器通过读取 binlog 将数据写入 ByteHouse CDW 表的使用方法。
 */
public class CnchCdcSourceExample {

  public static void main(String[] args) throws Exception {
    final String region = "VOLCANO_PRIVATE";
    final int port = 19000;
    final String host = args[0];
    final String apiToken = args[1];
    final String virtualWarehouse = args[2];
    final String database = args[3];
    final String tableName = args[4];
    final String tableNameSink = args[5];
    final String hostSink = args[6];
    final String apiTokenSink = args[7];

    // 创建执行环境
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    List<Column> columns =
        Arrays.asList(
            Column.physical("id", DataTypes.BIGINT()), Column.physical("c", DataTypes.STRING()));
    // 构建 ByteHouse CDW CDC 源
CnchCdcSource<RowData> cdcSource =
    // 设置 CnchCdcSourceBuilder
    new CnchCdcSourceBuilder.CdcSource<RowData>(database, tableName)
        .withGatewayConnection(region, host, port)
        .withBinlog(true)
        .withGatewayApiToken(apiToken)
        .withGatewayVirtualWarehouse(virtualWarehouse)
        .withSchema(columns)
        .withStartupModeOption(StartupOptions.earliest()) // 设置起始位置为earliest
        .withDeserializationSchema(new RowDataDeserializationSchema())
        .withScanSplitSize(10000)    // 设置单次获取的 binlog 最大批次大小,单位为行
        .withShardSplit(true)        // 开启分 shard 读
        .build();

    // 构建 ByteHouse CDW CDC datastream(数据流)      
    DataStream<RowData> dataStream =
    env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "testSource");
    dataStream.print();
      // 触发执行
      env.execute(CnchSinkDataStreamExample.class.getName());
    }
  }
}

接口说明

接口

是否必填

说明

构建 ByteHouse CDW CDC 源

CnchCdcSource<RowData> cdcSource

声明一个CnchCdcSource 数据源,泛型为RowData,命名为cdcSource,该名称支持自定义。

  • <RowData>:Java 泛型,用于指定 CnchCdcSource 生成的数据类型。设置支持配置为以下值:
    • RowData:推荐使用该值,Flink Table API/SQL内部使用的二进制优化的数据结构,适用于生产环境。
    • ByteHouseRecord:ByteHouse 内部定义的数据结构,可在需要访问 ByteHouse 特有元数据时使用。
    • GenericRowData:RowData 接口的基础实现,但内存效率低,仅适用于开发测试环境。
  • cdcSource:对象名称,支持自定义。

new CnchCdcSourceBuilder.CcdSource<RowData>(database, tableName)

设置需连接的 ByteHouse 的数据库和表名。database / tableName 为必填;其余参数皆有默认值,可按需覆盖。

  • database:需要连接的 ByteHouse 云数仓版数据库的名称。
  • tableName:需要连接的 ByteHouse 云数仓版表的名称。

.withGatewayConnection(region, host, port)

ByteHouse CDW 网关连接信息。

  • region:ByteHouse CDW 网关区域,该值固定设置为 VOLCANO_PRIVATE
  • host:ByteHouse CDW 网关的网络域名,格式为 tenant-{TENANT-ID}-{REGION}.bytehouse.[i]volces.com。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击基本信息,在网络信息模块,查看并复制公网或私网域名。
  • port:ByteHouse CDW 网关的端口号,固定配置为 19000。

.withBinlog(true)

是否读取 Binlog 信息。设置为 true 时,表示开启实时消费 Binlog 功能,当前仅支持设置为 true。

.withGatewayApiToken(apiToken)

使用 IAM 用户连接时必填

设置为 ByteHouse CDW 的 API key。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击连接信息,新建并复制 API Key。

.withGatewayAccount(accountId, username, password)

使用数据库用户连接时必填

设置为 ByteHouse CDW 数据库用户的账号和密码:

  • accountId:指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
  • username:指数据库用户名称。您可通过 ByteHouse 控制台>权限管理>用户路径,查看并复制数据库用户名。
  • password:指数据库用户的密码。该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

.withGatewayVirtualWarehouse(virtualWarehouse)

通过 ByteHouse 网关进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。您也可以指定其他计算组。您可登录 ByteHouse CDW 控制台,在计算组页签下,查看并复制计算组 ID。

.withSchema(columns)

设置读取的列。

.withStartupModeOption(StartupOptions.earliest())

设置从某个 BSN 开始消费 Binlog,目前只支持增量消费。可以设置最早、最晚、或者指定位置,对应的参数值分别为 earliest、latest、specific。

.withDeserializationSchema(new RowDataDeserializationSchema())

定义如何将二进制 Binlog 数据解析为 Flink 内部对象,设置时需确保与参数 CnchCdcSourceBuilder.CdcSource<T> 中设置的 <T> 类型对应,支持设置为以下三种类型:

  • RowDataDeserializationSchema:推荐值,与 对应。
  • SimpleDeserializationSchema :与 对应
  • GenericRowDataDeserializationSchema:与 对应。

withScanSplitSize(10000)

设置每次批量读取 Binlog 的数据行数。

.withShardSplit(true)

设置是否分 shard 读取,true 为是,false 为否,默认为 true。

withScanRetries(retrys: 1)

设置消费 Binlog 数据出错后的重试次数。

.build()

返回一个 CnchCdcSource 实例,该示例由对应的 CnchCdcSourceBuilder 构建。

构建 ByteHouse CDW CDC 数据流

DataStream dataStream

构建 ByteHouse CDW CDC 数据流。DataStream 后定义的泛型需与上述数据源定义的类型相同。

env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "testSource")

将 CDC 源转换为 Flink 数据流。

  • cdcSource:上述构建的数据源名称。
  • WatermarkStrategy:推荐设置为 noWatermarks()
  • testSource:Flink 中显示的数据源名称,您可根据实际情况填写。