本文将为您介绍如何通过火山引擎流式计算 Flink 版实时消费 ByteHouse 云数仓版(CDW)Binlog。
前提条件
- 使用前,请参考订阅 ByteHouse CDW Binlog,了解 ByteHouse CDW Binlog 的功能概况,开启并按需配置 ByteHouse CDW Binlog。
- Flink 消费 ByteHouse CDW Binlog 需要目标表以及系统表的读权限,请确保您具备相关权限。ByteHouse CDW 权限配置请参考数据权限管理。
- 请确保您使用的 ByteHouse CDW 引擎版本要求为 v2.3.1 及以上版本。您可以登录 ByteHouse 控制台,单击顶部租户管理页签,在基本信息中查看您使用的引擎版本是否符合要求。低于该版本的引擎启动 Binlog 功能会报错,如需升级请提交工单。

使用限制
- 该功能为 Beta 功能,使用前,请联系提交工单或联系 ByteHouse 团队获取白名单权限。
- 目前仅支持火山引擎流式计算 Flink 版消费 ByteHouse CDW Binlog。
- 目前仅支持表级别的增量 Binlog 消费。
Flink 实时消费 Binlog
火山引擎流式计算 Flink 版支持通过 Flink Connector Driver for ByteHouse 云数仓版连接器实时消费 Binlog,具体使用方法如下。
准备工作
- 请点击下载 Flink Connector for ByteHouse 云数仓版 最新版本 Jar 文件,并参考 Flink Connector 使用说明中的准备工作章节安装该连接器。
如果您已安装 Flink Connector,请确认版本为 v1.27.133_rc1 及以上。您可通过您使用的安装工具的配置日志,查找 Connect version: 字段对应的数值。 - Flink 消费 ByteHouse CDW Binlog 需连接至 ByteHouse CDW,当前支持通过 IAM 用户连接或数据库用户连接,您可提前获取网络域名、端口、API Key、数据库用户及密码等连接信息,详情请参考获取 ByteHouse 连接信息。IAM 用户与数据库用户二者差异说明如下,您可按需选择。
- IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
- 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
更多 IAM 用户和数据库用户的介绍请参见以下文档: - ByteHouse权限管控概述
- 管理 IAM 用户
- 管理数据库用户
通过 Flink SQL 消费
源端会根据操作类型自动为每行数据设置准确的 Flink RowKind 类型,包括 INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER 等,以实现镜像同步表的数据,类似 MySQL 和 Postgres 的 CDC(变更数据捕获,ChangeDataCapture) 功能。
ByteHouse CDW 表开启 Binlog 后,在 Flink 中可使用如下 DDL 配置源表,实时消费 Binlog。使用时,您可以使用 ByteHouse IAM 用户或数据库用户连接。
CREATE TABLE bh_binlog_source (
`id` INT,
`a` BIGINT,
`b` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'bytehouse-cdw',
'binlog' = 'true',
'jdbc.enable-gateway-connection' = 'true',
'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
'bytehouse.gateway.host' = '<your_bytehouse_cdw_host>',
'bytehouse.gateway.api-token' = '<your_bytehouse_cdw_api_key>',
'bytehouse.gateway.virtual-warehouse' = '<your_bytehouse_cdw_vw_id>',
'database' = '<your_database_name>',
'table-name' = '<your_table_name>',
'scan.startup.mode' = 'earliest',
'scan.max-split-size' = '5000',
'scan.cdc.shard.split' = 'true'
);
CREATE TEMPORARY TABLE print_sink WITH (
'connector' = 'print'
) LIKE bh_binlog_source (EXCLUDING OPTIONS);
INSERT INTO print_sink SELECT * FROM bh_binlog_source;
CREATE TABLE bh_binlog_source (
`id` INT,
`a` BIGINT,
`b` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'bytehouse-cdw',
'binlog' = 'true',
'jdbc.enable-gateway-connection' = 'true',
'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
'bytehouse.gateway.host' = '<your_bytehouse_cdw_host>',
'bytehouse.gateway.account' = '<your_volcano_engine_ID>',
'bytehouse.gateway.username' = '<your_bytehouse_cdw_database_user>',
'bytehouse.gateway.paasword' = '<your_bytehouse_cdw_database_user_password>',
'bytehouse.gateway.virtual-warehouse' = '<your_bytehouse_cdw_vw_id>',
'database' = '<your_database_name>',
'table-name' = '<your_table_name>',
'scan.startup.mode' = 'earliest',
'scan.max-split-size' = '5000',
'scan.cdc.shard.split' = 'true'
);
CREATE TEMPORARY TABLE print_sink WITH (
'connector' = 'print'
) LIKE bh_binlog_source (EXCLUDING OPTIONS);
INSERT INTO print_sink SELECT * FROM bh_binlog_source;
参数说明
参数 | 是否必填 | 默认值 | 说明 |
|---|
CREATE TABLE ...
| 是 | 无 | 创建 binlog 写入源表,您可自定义表名称,并定义表列名及类型。 |
connector
| 是 | 无 | 指定要使用的连接器,需配置为bytehouse-cdw。 |
jdbc.enable-gateway-connection
| 是 | false | 指定 JDBC 连接是否需要经过 ByteHouse 网关。 |
bytehouse.gateway.region
| 是 | 无 | ByteHouse 网关区域,设置为 VOLCANO_PRIVATE。 |
bytehouse.gateway.host
| 是 | 无 | ByteHouse CDW 的网络域名,设置前请将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE。
格式为 tenant-{TENANT-ID}-{REGION}.bytehouse.[i]volces.com,您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击基本信息,在网络信息模块,查看并复制公网或私网域名。 |
bytehouse.gateway.virtual-warehouse
| 否 | 控制台配置的默认计算组 | 通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。
您也可以指定其他计算组。您可登录 ByteHouse CDW 控制台,在计算组页签下,查看并复制计算组 ID。 |
bytehouse.gateway.api-token
| 使用 IAM 用户连接时必填 | 无 | ByteHouse 网关 API key。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击连接信息,新建并复制 API Key。 |
bytehouse.gateway.account
| 使用数据库用户连接时必填 | 无 | 指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。 |
bytehouse.gateway.username
| 使用数据库用户连接时必填 | 无 | ByteHouse 数据库账号用户名。可通过 ByteHouse 控制台 > 权限管理 > 用户路径,查看并复制数据库用户名。 |
bytehouse.gateway.paasword
| 使用数据库用户连接时必填 | 无 | 指 ByteHouse 数据库用户的密码。该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码。 |
database
| 是 | 无 | 需连接的 ByteHouse 云数仓版数据库的名称。 |
table-name
| 是 | 无 | 需连接 ByteHouse 云数仓版表的名称。 |
binlog
| 否 | false | 是否开启读取 Binlog 信息。设置为 true 时,表示开启实时消费 Binlog 功能,当前仅支持设置为 true。 |
scan.startup.mode
| 否 | LATEST | 设置起始消费位置。目前仅支持增量模式,可选值:
• earliest:从最早可用的 BSN 开始。
• latest:从最新 BSN 开始。
• specific:从 scan.startup.specific-offset.pos 指定的 BSN 开始。 |
scan.max-split-size
| 否 | 10000 | 设置批量读取 Binlog 的数据行数。 |
scan.cdc.shard.split
| 否 | true | 设置是否分 shard 读取,true 为是,false 为否,默认为 true。 |
scan.cdc.max-retries
| 否 | 3 | 设置消费 Binlog 数据出错后的重试次数。 |
scan.startup.specific-offset.pos
| 否 | 无 | 当scan.startup.mode设置为 SPECIFIC 时,可以指定 BSN(Binlog Sequence Number)消费。 |
CREATE TEMPORARY TABLE ...
| 是 | 无 | 创建临时表,该表结构与写入源 bh_binlog_source 一致,但其 connector 为 print,表示数据会直接打印到 Stdout(标准输出)日志。 connector:指定要使用的连接器,需配置为 print。Print 连接器是一个系统内置的调试专用结果表,可以将作业结果输出并打印到 Stdout 日志中。注意 请不要在正式环境的任务中使用 Print 调试连接器,会影响任务性能,存在写满磁盘的风险。 LIKE:指定为写入源表的名称,将集成源表的结构。EXCLUDING OPTIONS:表示不继承源表的连接参数。
|
INSERT INTO ... SELECT * FROM ...
| 是 | 无 | 创建数据同步任务,将写入源 bh_binlog_source 读取的数据全量同步至 print_sink 表,最终通过 print 连接器输出到 Stdout 日志中。 |
通过 Flink DataStream API 消费
下面是通过 Flink DataStream API 消费 ByteHouse CDW Binlog 的构建示例。使用时,您可以使用 ByteHouse IAM 用户或数据库用户连接。
import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.CnchCdcSource;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.deserialization.RowDataDeserializationSchema;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.table.StartupOptions;
import com.bytedance.bytehouse.flink.connector.cnch.ByteHouseCdwClient;
import com.bytedance.bytehouse.flink.connector.cnch.api.java.CnchCdcSourceBuilder;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;
/**
* 以下展示了 ByteHouse CDW CDC 连接器通过读取 binlog 将数据写入 ByteHouse CDW 表的使用方法。
*/
public class CnchCdcSourceExample {
public static void main(String[] args) throws Exception {
final String region = "VOLCANO_PRIVATE";
final int port = 19000;
final String host = args[0];
final String apiToken = args[1];
final String virtualWarehouse = args[2];
final String database = args[3];
final String tableName = args[4];
final String tableNameSink = args[5];
final String hostSink = args[6];
final String apiTokenSink = args[7];
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<Column> columns =
Arrays.asList(
Column.physical("id", DataTypes.BIGINT()), Column.physical("c", DataTypes.STRING()));
// 构建 ByteHouse CDW CDC 源
CnchCdcSource<RowData> cdcSource =
// 设置 CnchCdcSourceBuilder
new CnchCdcSourceBuilder.CdcSource<RowData>(database, tableName)
.withGatewayConnection(region, host, port)
.withBinlog(true)
.withGatewayApiToken(apiToken)
.withGatewayVirtualWarehouse(virtualWarehouse)
.withSchema(columns)
.withStartupModeOption(StartupOptions.earliest()) // 设置起始位置为earliest
.withDeserializationSchema(new RowDataDeserializationSchema())
.withScanSplitSize(10000) // 设置单次获取的 binlog 最大批次大小,单位为行
.withShardSplit(true) // 开启分 shard 读
.build();
// 构建 ByteHouse CDW CDC datastream(数据流)
DataStream<RowData> dataStream =
env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "testSource");
dataStream.print();
// 触发执行
env.execute(CnchSinkDataStreamExample.class.getName());
}
}
}
import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.CnchCdcSource;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.source.deserialization.RowDataDeserializationSchema;
import com.bytedance.bytehouse.flink.cdc.connector.cnch.table.StartupOptions;
import com.bytedance.bytehouse.flink.connector.cnch.ByteHouseCdwClient;
import com.bytedance.bytehouse.flink.connector.cnch.api.java.CnchCdcSourceBuilder;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;
/**
* 以下展示了 ByteHouse CDW CDC 连接器通过读取 binlog 将数据写入 ByteHouse CDW 表的使用方法。
*/
public class CnchCdcSourceExample {
public static void main(String[] args) throws Exception {
final String region = "VOLCANO_PRIVATE";
final int port = 19000;
final String host = args[0];
final String apiToken = args[1];
final String virtualWarehouse = args[2];
final String database = args[3];
final String tableName = args[4];
final String tableNameSink = args[5];
final String hostSink = args[6];
final String apiTokenSink = args[7];
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<Column> columns =
Arrays.asList(
Column.physical("id", DataTypes.BIGINT()), Column.physical("c", DataTypes.STRING()));
// 构建 ByteHouse CDW CDC 源
CnchCdcSource<RowData> cdcSource =
// 设置 CnchCdcSourceBuilder
new CnchCdcSourceBuilder.cdcSource<RowData>(database, tableName)
.withGatewayConnection(region, host, port)
.withBinlog(true)
.withGatewayAccount(accountId, username, password)
.withGatewayVirtualWarehouse(virtualWarehouse)
.withSchema(columns)
.withStartupModeOption(StartupOptions.earliest())
.withDeserializationSchema(new RowDataDeserializationSchema())
.withScanSplitSize(10000)
.withShardSplit(true)
.build();
// 构建 ByteHouse CDW CDC datastream
DataStream<RowData> dataStream =
env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "testSource");
dataStream.print();
// 触发执行
env.execute(CnchSinkDataStreamExample.class.getName());
}
}
}
接口说明
接口 | 是否必填 | 说明 |
|---|
构建 ByteHouse CDW CDC 源 |
CnchCdcSource<RowData> cdcSource
| 是 | 声明一个CnchCdcSource 数据源,泛型为RowData,命名为cdcSource,该名称支持自定义。 <RowData>:Java 泛型,用于指定 CnchCdcSource 生成的数据类型。设置支持配置为以下值:
- RowData:推荐使用该值,Flink Table API/SQL内部使用的二进制优化的数据结构,适用于生产环境。
- ByteHouseRecord:ByteHouse 内部定义的数据结构,可在需要访问 ByteHouse 特有元数据时使用。
- GenericRowData:RowData 接口的基础实现,但内存效率低,仅适用于开发测试环境。
cdcSource:对象名称,支持自定义。
|
new CnchCdcSourceBuilder.CcdSource<RowData>(database, tableName)
| 是 | 设置需连接的 ByteHouse 的数据库和表名。database / tableName 为必填;其余参数皆有默认值,可按需覆盖。 database:需要连接的 ByteHouse 云数仓版数据库的名称。tableName:需要连接的 ByteHouse 云数仓版表的名称。
|
.withGatewayConnection(region, host, port)
| 是 | ByteHouse CDW 网关连接信息。 region:ByteHouse CDW 网关区域,该值固定设置为 VOLCANO_PRIVATE。host:ByteHouse CDW 网关的网络域名,格式为 tenant-{TENANT-ID}-{REGION}.bytehouse.[i]volces.com。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击基本信息,在网络信息模块,查看并复制公网或私网域名。port:ByteHouse CDW 网关的端口号,固定配置为 19000。
|
.withBinlog(true)
| 否 | 是否读取 Binlog 信息。设置为 true 时,表示开启实时消费 Binlog 功能,当前仅支持设置为 true。 |
.withGatewayApiToken(apiToken)
| 使用 IAM 用户连接时必填 | 设置为 ByteHouse CDW 的 API key。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击连接信息,新建并复制 API Key。 |
.withGatewayAccount(accountId, username, password)
| 使用数据库用户连接时必填 | 设置为 ByteHouse CDW 数据库用户的账号和密码: accountId:指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。username:指数据库用户名称。您可通过 ByteHouse 控制台>权限管理>用户路径,查看并复制数据库用户名。password:指数据库用户的密码。该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码。
|
.withGatewayVirtualWarehouse(virtualWarehouse)
| 否 | 通过 ByteHouse 网关进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。您也可以指定其他计算组。您可登录 ByteHouse CDW 控制台,在计算组页签下,查看并复制计算组 ID。 |
.withSchema(columns) | 否 | 设置读取的列。 |
.withStartupModeOption(StartupOptions.earliest())
| 是 | 设置从某个 BSN 开始消费 Binlog,目前只支持增量消费。可以设置最早、最晚、或者指定位置,对应的参数值分别为 earliest、latest、specific。 |
.withDeserializationSchema(new RowDataDeserializationSchema())
| 是 | 定义如何将二进制 Binlog 数据解析为 Flink 内部对象,设置时需确保与参数 CnchCdcSourceBuilder.CdcSource<T> 中设置的 <T> 类型对应,支持设置为以下三种类型: - RowDataDeserializationSchema:推荐值,与 对应。
- SimpleDeserializationSchema :与 对应
- GenericRowDataDeserializationSchema:与 对应。
|
withScanSplitSize(10000)
| 是 | 设置每次批量读取 Binlog 的数据行数。 |
.withShardSplit(true)
| 是 | 设置是否分 shard 读取,true 为是,false 为否,默认为 true。 |
withScanRetries(retrys: 1)
| 否 | 设置消费 Binlog 数据出错后的重试次数。 |
.build()
| 是 | 返回一个 CnchCdcSource 实例,该示例由对应的 CnchCdcSourceBuilder 构建。 |
构建 ByteHouse CDW CDC 数据流 |
DataStream dataStream | 是 | 构建 ByteHouse CDW CDC 数据流。DataStream 后定义的泛型需与上述数据源定义的类型相同。 |
env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "testSource")
| 是 | 将 CDC 源转换为 Flink 数据流。 cdcSource:上述构建的数据源名称。WatermarkStrategy:推荐设置为 noWatermarks()。testSource:Flink 中显示的数据源名称,您可根据实际情况填写。
|