You need to enable JavaScript to run this app.
ByteHouse云数仓版

ByteHouse云数仓版

复制全文
数据订阅(Binlog)
Flink 实时消费 ByteHouse CDW Binlog
复制全文
Flink 实时消费 ByteHouse CDW Binlog

本文将为您介绍如何通过火山引擎流式计算 Flink 版实时消费 ByteHouse 云数仓版(CDW)Binlog。

前提条件
  • 使用前,请参考订阅 ByteHouse CDW Binlog,了解 ByteHouse CDW Binlog 的功能概况,开启并按需配置 ByteHouse CDW Binlog。
  • Flink 消费 ByteHouse CDW Binlog 需要目标表以及系统表的读权限,请确保您具备相关权限。ByteHouse CDW 权限配置请参考数据权限管理
  • 请确保您使用的 ByteHouse CDW 引擎版本要求为 v2.3.1 及以上版本。您可以登录 ByteHouse 控制台,单击顶部租户管理页签,在基本信息中查看您使用的引擎版本是否符合要求。低于该版本的引擎启动 Binlog 功能会报错,如需升级请提交工单。
    Image

使用限制
  • 该功能为 Beta 功能,使用前,请联系提交工单或联系 ByteHouse 团队获取白名单权限。
  • 目前仅支持火山引擎流式计算 Flink 版消费 ByteHouse CDW Binlog。
  • 目前仅支持表级别的增量 Binlog 消费。

火山引擎流式计算 Flink 版支持通过 Flink Connector Driver for ByteHouse 云数仓版连接器实时消费 Binlog,具体使用方法如下。

准备工作

  1. 当前火山 Flink 内置的 ByteHouse connector 版本尚不支持 Binlog,请通过添加依赖文件的方式使用以下提供的 connector,操作详情请参见Maven 依赖

    Flink 版本

    驱动程序

    1.16

    flink-sql-connector-bytehouse-cdw_2.12-1.28.17-1.16.jar

    1.17

    flink-sql-connector-bytehouse-cdw_2.12-1.28.17-1.17.jar

  2. Flink 消费 ByteHouse CDW Binlog 需连接至 ByteHouse CDW,当前支持通过 IAM 用户连接或数据库用户连接,您可提前获取网络域名、端口、API Key、数据库用户及密码等连接信息,详情请参考获取 ByteHouse 连接信息。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

    • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
    • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
      更多 IAM 用户和数据库用户的介绍请参见以下文档:
    • ByteHouse权限管控概述
    • 管理 IAM 用户
    • 管理数据库用户

源端会根据操作类型自动为每行数据设置准确的 Flink RowKind 类型,包括 INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER 等,以实现镜像同步表的数据,类似 MySQL 和 Postgres 的 CDC(变更数据捕获,ChangeDataCapture) 功能。
ByteHouse CDW 表开启 Binlog 后,在 Flink 中可使用如下 DDL 配置源表,实时消费 Binlog。使用时,您可以使用 ByteHouse IAM 用户或数据库用户连接。

CREATE TABLE bh_binlog_source (
  `id` INT,
  `a` BIGINT,
  `b` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'bytehouse-cdw',
  'binlog' = 'true',
  'jdbc.enable-gateway-connection' = 'true',
  'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
  'bytehouse.gateway.host' = '<your_bytehouse_cdw_host>',
  'bytehouse.gateway.api-token' = '<your_bytehouse_cdw_api_key>',
  'bytehouse.gateway.virtual-warehouse' = '<your_bytehouse_cdw_vw_id>',
  'database' = '<your_database_name>',
  'table-name' = '<your_table_name>',
  'scan.startup.mode' = 'earliest',
  'scan.max-split-size' = '5000',
  'scan.cdc.shard.split' = 'true'
); 
CREATE TEMPORARY TABLE print_sink WITH (
  'connector' = 'print'
) LIKE bh_binlog_source (EXCLUDING OPTIONS);
INSERT INTO print_sink SELECT * FROM bh_binlog_source;

参数说明

参数

是否必填

默认值

说明

CREATE TABLE ...

创建 binlog 写入源表,您可自定义表名称,并定义表列名及类型。

connector

指定要使用的连接器,需配置为bytehouse-cdw

jdbc.enable-gateway-connection

false

指定 JDBC 连接是否需要经过 ByteHouse 网关。

bytehouse.gateway.region

ByteHouse 网关区域,设置为 VOLCANO_PRIVATE

bytehouse.gateway.host

ByteHouse CDW 的网络域名,设置前请将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE
格式为 tenant-{TENANT-ID}-{REGION}.bytehouse.[i]volces.com,您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击基本信息,在网络信息模块,查看并复制公网或私网域名。

bytehouse.gateway.virtual-warehouse

控制台配置的默认计算组

通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。
您也可以指定其他计算组。您可登录 ByteHouse CDW 控制台,在计算组页签下,查看并复制计算组 ID。

bytehouse.gateway.api-token

使用 IAM 用户连接时必填

ByteHouse 网关 API key。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击连接信息,新建并复制 API Key。

bytehouse.gateway.account

使用数据库用户连接时必填

指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。

bytehouse.gateway.username

使用数据库用户连接时必填

ByteHouse 数据库账号用户名。可通过 ByteHouse 控制台 > 权限管理 > 用户路径,查看并复制数据库用户名

bytehouse.gateway.paasword

使用数据库用户连接时必填

指 ByteHouse 数据库用户的密码。该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

database

需连接的 ByteHouse 云数仓版数据库的名称。

table-name

需连接 ByteHouse 云数仓版表的名称。

binlog

false

是否开启读取 Binlog 信息。设置为 true 时,表示开启实时消费 Binlog 功能,当前仅支持设置为 true。

scan.startup.mode

LATEST

设置起始消费位置。目前仅支持增量模式,可选值:
• earliest:从最早可用的 BSN 开始。
• latest:从最新 BSN 开始。
• specific:从 scan.startup.specific-offset.pos 指定的 BSN 开始。

scan.max-split-size

10000

设置批量读取 Binlog 的数据行数。

scan.cdc.shard.split

true

设置是否分 shard 读取,true 为是,false 为否,默认为 true。

scan.cdc.max-retries

3

设置消费 Binlog 数据出错后的重试次数。

scan.startup.specific-offset.pos

scan.startup.mode设置为 SPECIFIC 时,可以指定 BSN(Binlog Sequence Number)消费。

CREATE TEMPORARY TABLE ...

创建临时表,该表结构与写入源 bh_binlog_source 一致,但其 connector 为 print,表示数据会直接打印到 Stdout(标准输出)日志。

  • connector:指定要使用的连接器,需配置为 print。Print 连接器是一个系统内置的调试专用结果表,可以将作业结果输出并打印到 Stdout 日志中。

    注意

    请不要在正式环境的任务中使用 Print 调试连接器,会影响任务性能,存在写满磁盘的风险。

  • LIKE:指定为写入源表的名称,将集成源表的结构。
  • EXCLUDING OPTIONS:表示不继承源表的连接参数。

INSERT INTO ... SELECT * FROM ...

创建数据同步任务,将写入源 bh_binlog_source 读取的数据全量同步至 print_sink 表,最终通过 print 连接器输出到 Stdout 日志中。

下面是通过 Flink DataStream API 消费 ByteHouse CDW Binlog 的构建示例。使用时,您可以使用 ByteHouse IAM 用户或数据库用户连接。

import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.CnchCdcSource;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.deserialization.RowDataDeserializationSchema;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.table.StartupOptions;
import com.bytedance.bytehouse.flink.connector.cnch.ByteHouseCdwClient;
import com.bytedance.bytehouse.flink.connector.cnch.api.java.CnchCdcSourceBuilder;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;

/**
 * 以下展示了 ByteHouse CDW CDC 连接器通过读取 binlog 将数据写入 ByteHouse CDW 表的使用方法。
 */
public class CnchCdcSourceExample {

  public static void main(String[] args) throws Exception {
    final String region = "VOLCANO_PRIVATE";
    final int port = 19000;
    final String host = args[0];
    final String apiToken = args[1];
    final String virtualWarehouse = args[2];
    final String database = args[3];
    final String tableName = args[4];
    final String tableNameSink = args[5];
    final String hostSink = args[6];
    final String apiTokenSink = args[7];

    // 创建执行环境
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    List<Column> columns =
        Arrays.asList(
            Column.physical("id", DataTypes.BIGINT()), Column.physical("c", DataTypes.STRING()));
    // 构建 ByteHouse CDW CDC 源
CnchCdcSource<RowData> cdcSource =
    // 设置 CnchCdcSourceBuilder
    new CnchCdcSourceBuilder.CdcSource<RowData>(database, tableName)
        .withGatewayConnection(region, host, port)
        .withBinlog(true)
        .withGatewayApiToken(apiToken)
        .withGatewayVirtualWarehouse(virtualWarehouse)
        .withSchema(columns)
        .withStartupModeOption(StartupOptions.earliest()) // 设置起始位置为earliest
        .withDeserializationSchema(new RowDataDeserializationSchema())
        .withScanSplitSize(10000)    // 设置单次获取的 binlog 最大批次大小,单位为行
        .withShardSplit(true)        // 开启分 shard 读
        .build();

    // 构建 ByteHouse CDW CDC datastream(数据流)      
    DataStream<RowData> dataStream =
    env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "testSource");
    dataStream.print();
      // 触发执行
      env.execute(CnchSinkDataStreamExample.class.getName());
    }
  }
}

接口说明

接口

是否必填

说明

构建 ByteHouse CDW CDC 源

CnchCdcSource<RowData> cdcSource

声明一个CnchCdcSource 数据源,泛型为RowData,命名为cdcSource,该名称支持自定义。

  • <RowData>:Java 泛型,用于指定 CnchCdcSource 生成的数据类型。设置支持配置为以下值:
    • RowData:推荐使用该值,Flink Table API/SQL内部使用的二进制优化的数据结构,适用于生产环境。
    • ByteHouseRecord:ByteHouse 内部定义的数据结构,可在需要访问 ByteHouse 特有元数据时使用。
    • GenericRowData:RowData 接口的基础实现,但内存效率低,仅适用于开发测试环境。
  • cdcSource:对象名称,支持自定义。

new CnchCdcSourceBuilder.CcdSource<RowData>(database, tableName)

设置需连接的 ByteHouse 的数据库和表名。database / tableName 为必填;其余参数皆有默认值,可按需覆盖。

  • database:需要连接的 ByteHouse 云数仓版数据库的名称。
  • tableName:需要连接的 ByteHouse 云数仓版表的名称。

.withGatewayConnection(region, host, port)

ByteHouse CDW 网关连接信息。

  • region:ByteHouse CDW 网关区域,该值固定设置为 VOLCANO_PRIVATE
  • host:ByteHouse CDW 网关的网络域名,格式为 tenant-{TENANT-ID}-{REGION}.bytehouse.[i]volces.com。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击基本信息,在网络信息模块,查看并复制公网或私网域名。
  • port:ByteHouse CDW 网关的端口号,固定配置为 19000。

.withBinlog(true)

是否读取 Binlog 信息。设置为 true 时,表示开启实时消费 Binlog 功能,当前仅支持设置为 true。

.withGatewayApiToken(apiToken)

使用 IAM 用户连接时必填

设置为 ByteHouse CDW 的 API key。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击连接信息,新建并复制 API Key。

.withGatewayAccount(accountId, username, password)

使用数据库用户连接时必填

设置为 ByteHouse CDW 数据库用户的账号和密码:

  • accountId:指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
  • username:指数据库用户名称。您可通过 ByteHouse 控制台>权限管理>用户路径,查看并复制数据库用户名。
  • password:指数据库用户的密码。该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

.withGatewayVirtualWarehouse(virtualWarehouse)

通过 ByteHouse 网关进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。您也可以指定其他计算组。您可登录 ByteHouse CDW 控制台,在计算组页签下,查看并复制计算组 ID。

.withSchema(columns)

设置读取的列。

.withStartupModeOption(StartupOptions.earliest())

设置从某个 BSN 开始消费 Binlog,目前只支持增量消费。可以设置最早、最晚、或者指定位置,对应的参数值分别为 earliest、latest、specific。

.withDeserializationSchema(new RowDataDeserializationSchema())

定义如何将二进制 Binlog 数据解析为 Flink 内部对象,设置时需确保与参数 CnchCdcSourceBuilder.CdcSource<T> 中设置的 <T> 类型对应,支持设置为以下三种类型:

  • RowDataDeserializationSchema:推荐值,与 对应。
  • SimpleDeserializationSchema :与 对应
  • GenericRowDataDeserializationSchema:与 对应。

withScanSplitSize(10000)

设置每次批量读取 Binlog 的数据行数。

.withShardSplit(true)

设置是否分 shard 读取,true 为是,false 为否,默认为 true。

withScanRetries(retrys: 1)

设置消费 Binlog 数据出错后的重试次数。

.build()

返回一个 CnchCdcSource 实例,该示例由对应的 CnchCdcSourceBuilder 构建。

构建 ByteHouse CDW CDC 数据流

DataStream dataStream

构建 ByteHouse CDW CDC 数据流。DataStream 后定义的泛型需与上述数据源定义的类型相同。

env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "testSource")

将 CDC 源转换为 Flink 数据流。

  • cdcSource:上述构建的数据源名称。
  • WatermarkStrategy:推荐设置为 noWatermarks()
  • testSource:Flink 中显示的数据源名称,您可根据实际情况填写。
最近更新时间:2025.12.22 17:38:01
这个页面对您有帮助吗?
有用
有用
无用
无用