当您需要捕获 ByteHouse 云数仓版(CDW)数据库事件,实现数据之间的复制、同步,或者将这些事件作为消息流转发给不同的消费者进行处理时,ByteHouse 提供 Binlog 功能。消费 ByteHouse CDW Binlog 可提升数据复用能力,感知 ByteHouse CDW 相关表的增、删、改操作。本文将为您介绍如何在 ByteHouse 中订阅 Binlog 及相关操作。
ByteHouse CDW Binlog 与 MySQL Binlog 相同,支持借助 Binlog 记录数据库中所有数据的变化事件日志。通过 ByteHouse CDW Binlog,能够极为便捷、灵活地实现数据之间的复制与同步。然而,ByteHouse CDW Binlog 通常仅用于数据同步,而 MySQL Binlog 还应用于主从实例同步和数据恢复等高可用场景。因此,二者的实现存在一定差异,主要体现在以下方面:
同时,在大数据场景下,支持 Flink 直接消费 ByteHouse CDW Binlog。相较于传统数仓分层,Flink + ByteHouse CDW Binlog 能够实现完整的事件驱动,完成操作数据存储层(ODS)向数据仓库维度层(DWD)、DWD 向数据服务层(DWS)等的全实时加工作业,在满足分层治理的前提下实现统一存储,提升数据复用能力,并缩短数据加工端到端延迟,为用户提供一站式的实时数据仓库解决方案。
ByteHouse CDW Binlog 建议使用场景如下:
该功能为 Beta 功能,使用前,请联系提交工单或联系 ByteHouse 团队获取白名单权限。
请确保您使用的 ByteHouse CDW 引擎版本要求:v2.3.1 及以上版本。您可登录 ByteHouse CDW 控制台,单击顶部租户管理页签,查看基础信息中的引擎版本。低于该版本的引擎启动 Binlog 功能会报错,如需升级请提交工单。
当前仅支持 CnchMergeTree。对于新建表,创建时 ENGINE 需设置为 CnchMergeTree;对于已有表,您可以通过以下命令查询使用的表引擎。
SHOW CREATE TABLE your_table_name;
ByteHouse CDW Binlog 为表级别,且不会记录 DDL 操作。
ByteHouse CDW Binlog 开启后不支持关闭,请您评估业务需求后再开启。开启后如需关闭,请提交工单联系 ByteHouse 团队。
支持(✅)和 不支持(❌)输出 Binlog 的功能列表。
语句 | 非唯一键表 | 唯一键表 |
|---|---|---|
INSERT | ✅ | |
UPSERT、部分列更新 | ❌ | ✅ |
UPDATE、DELETE | ❌ | ✅ |
DDL
| ❌ | |
TTL 表达式 | ❌ 说明 TTL表达式删除的数据不产生Binlog。 | |
当前 ByteHouse CDW Binlog 仅支持增量数据消费,暂不支持全增量一体消费的场景。
开启 Binlog 后,执行数据操作语言(DML)时,ByteHouse 会对原表进行写入/删除/更新等操作,同时对 Binlog 执行写入操作,将生成的数据转换为 Binlog,并分配 BSN(Binlog Sequence Number)。
注意
开启 Binlog 后,DML 既需对原表进行写入操作,又要对 Binlog 进行写入操作,这会对写入性能产生一定程度的影响。此外,Binlog 数据本身也会占用一定的存储空间。请根据实际需求开启 Binlog,并配置合理的生存时间(TTL)。
Binlog 记录由 Binlog 系统字段和用户 Table 字段构成,具体字段定义如下表。
列名 | 数据类型 | 说明 |
|---|---|---|
| UInt64 | Binlog 的系统字段,表示当前 Record 所属的 Binlog 的 BSN 和 BSN 内的行号。
|
| UInt16 | Binlog 的系统字段,表示当前 Record 所属的 Binlog shard 序号,用于支持多个消费者并行消费(每个消费线程消费各自
|
| Enum8 | Binlog 的系统字段,表示当前 Record 所表示的修改类型。有如下 4 种可能的取值:
说明 更新操作会生成两条 Binlog 记录,分别为更新前和更新后的记录( |
| 用户自定义 | 用户 Table 字段。 |
... | ... | ... |
| 用户自定义 | 用户 Table 字段。 |
ByteHouse Binlog 支持配置以下参数:
配置 | 是否必填 | 默认值 | 说明 |
|---|---|---|---|
enable_binlog | 是 | 0 | 是否开启 Binlog。该参数值为 1 时表示启用 Binlog,该参数值为 0 时表示未启用 Binlog,启用后不支持关闭。 |
binlog_lifetime | 否 | 86400 | Binlog 的 TTL,单位为秒。 |
binlog_shard_num | 否 | 64 | Binlog 的 shard 个数,用于提升 Binlog 消费并行度,支持 Flink 多消费者并行消费**。**需在通过 |
binlog_row_deduplicate | 否 | false | 开启后,对于更新前后('U-' 和 'U+')数值完全一致的记录,不再输出 Binlog。 注意 此配置更改后,仅对新生成的 Binlog 生效,不会更改历史 Binlog。 |
示例 1:新建表时开启 Binlog。
CREATE TABLE binlog_test( `id` Int32, `a` Int64, `b` String ) ENGINE = CnchMergeTree ORDER BY a UNIQUE KEY id SETTINGS enable_binlog = 1, -- 必须,打开 Binlog binlog_lifetime = 86400, -- 可选 binlog_shard_num = 4, -- 可选 binlog_row_deduplicate = false; -- 可选
示例 2:为现有的表开启 Binlog。
ALTER TABLE binlog_test MODIFY SETTING binlog_lifetime = 3600; -- 可选,调整 TTL ALTER TABLE binlog_test MODIFY SETTING binlog_shard_num = 4; -- 可选,调整 shard 数 ALTER TABLE binlog_test MODIFY SETTING enable_binlog = 1; -- 必须,打开 binlog
开启 Binlog 后,您可使用 ALTER TABLE 修改 Binlog 配置。
ALTER TABLE binlog_test MODIFY SETTING binlog_lifetime = 7200;
ALTER TABLE binlog_test MODIFY SETTING binlog_row_deduplicate = true;
ByteHouse 支持查看表的全部 Binlog 数据,也支持仅查询指定 BSN 范围的 Binlog。如果您不确定 BSN 范围,您可以先查询系统表 system.cnch_table_info,确定需要查询的 BSN 范围。您也可以通过查询系统表system.cnch_binlogs读取查看对应 binlog part 的详细信息,如可提前查询当前 Binlog part 内 Shard 记录的行数,根据此定制自己的读取行为,提升查询效率。
开启 Binlog 后,可以通过在表名后面添加 $binlog 来查询该表的 Binlog。例如,您可通过以下 SQL 语句查询 db_test.binlog_test 的 Binlog。
SELECT * FROM db_test.binlog_test$binlog;
ByteHouse 支持只查询指定 BSN 范围内的 Binlog,查询时需要添加形如 bsn(表名$binlog, A, B) 的 hint,表示查询该表 BSN 范围在 (A,B] 范围内的 Binlog。例如,您可通过以下命令来查询 db_test.binlog_test 内 BSN 为 2 或 3 的 Binlog。
SELECT /*+ bsn(db_test.binlog_test$binlog, 1, 3) */ * FROM db_test.binlog_test$binlog;
以下示例演示了 Binlog 几种常见场景的查询方式。
假设我们已经在表 db_test.binlog_test 上执行了一些 DML:
INSERT INTO binlog_test VALUES (1,2,'string1-1'),(2,1,'string2-1'),(3,3,'string3-1'); INSERT INTO binlog_test VALUES (1,1,'string1-1'),(2,1,'string2-2'),(4,2,'string4-1'); DELETE FROM binlog_test WHERE id < 3;
查询表 db_test.binlog_test 所有 Binlog。
SELECT _binlog_bsn, _part_row_number, * FROM `db_test.binlog_test$binlog` ORDER BY _binlog_bsn ASC
查询结果示例如下:
┌─_binlog_bsn─┬─_part_row_number─┬─binlog_shard_id─┬─binlog_event_type─┬─id─┬─a─┬─b─────┐ │ 1 │ 0 │ 1 │ I │ 1 │ 2 │ string1-1 │ │ 1 │ 1 │ 2 │ I │ 2 │ 1 │ string2-1 │ │ 1 │ 2 │ 3 │ I │ 3 │ 3 │ string3-1 │ └────────┴────────────┴───────────┴────────────┴────┴──┴───────┘ ┌─_binlog_bsn─┬─_part_row_number─┬─binlog_shard_id─┬─binlog_event_type─┬─id─┬─a─┬─b─────┐ │ 2 │ 0 │ 1 │ U- │ 1 │ 2 │ string1-1 │ │ 2 │ 1 │ 1 │ U+ │ 1 │ 1 │ string1-1 │ │ 2 │ 2 │ 1 │ I │ 4 │ 2 │ string4-1 │ │ 2 │ 3 │ 2 │ U- │ 2 │ 1 │ string2-1 │ │ 2 │ 4 │ 2 │ U+ │ 2 │ 1 │ string2-2 │ └────────┴─────────────┴──────────┴────────────┴────┴──┴───────┘ ┌─_binlog_bsn─┬─_part_row_number─┬─binlog_shard_id─┬─binlog_event_type─┬─id─┬─a─┬─b─────┐ │ 3 │ 0 │ 1 │ D │ 1 │ 1 │ string1-1 │ │ 3 │ 1 │ 2 │ D │ 2 │ 1 │ string2-2 │ └────────┴────────────┴───────────┴────────────┴────┴──┴───────┘
查询表 db_test.binlog_test 内 BSN 为 2 或 3 的 Binlog。
SELECT /*+ bsn(db_test.binlog_test$binlog, 1, 3) */ _binlog_bsn, _part_row_number, * FROM `db_test.binlog_test$binlog` ORDER BY _binlog_bsn ASC
查询结果示例如下:
┌─_binlog_bsn─┬─_part_row_number─┬─binlog_shard_id─┬─binlog_event_type─┬─id─┬─a─┬─b─────┐ │ 2 │ 0 │ 1 │ U- │ 1 │ 2 │ string1-1 │ │ 2 │ 1 │ 1 │ U+ │ 1 │ 1 │ string1-1 │ │ 2 │ 2 │ 1 │ I │ 4 │ 2 │ string4-1 │ │ 2 │ 3 │ 2 │ U- │ 2 │ 1 │ string2-1 │ │ 2 │ 4 │ 2 │ U+ │ 2 │ 1 │ string2-2 │ └────────┴─────────────┴──────────┴─────────────┴───┴──┴───────┘ ┌─_binlog_bsn─┬─_part_row_number─┬─binlog_shard_id─┬─binlog_event_type─┬─id─┬─a─┬─b─────┐ │ 3 │ 0 │ 1 │ D │ 1 │ 1 │ string1-1 │ │ 3 │ 1 │ 2 │ D │ 2 │ 1 │ string2-2 │ └────────┴────────────┴───────────┴────────────┴────┴──┴───────┘
查询表 db_test.binlog_test 内 BSN 为 2 或 3 且 binlog_shard_id 为 2 的 Binlog。
SELECT /*+ bsn(db_test.binlog_test$binlog, 1, 3) */ _binlog_bsn, _part_row_number, * FROM `db_test.binlog_test$binlog` WHERE binlog_shard_id = 2 ORDER BY _binlog_bsn ASC
查询结果示例如下:
┌─_binlog_bsn─┬─_part_row_number─┬─binlog_shard_id─┬─binlog_event_type─┬─id─┬─a─┬─b─────┐ │ 2 │ 3 │ 2 │ U- │ 2 │ 1 │ string2-1 │ │ 2 │ 4 │ 2 │ U+ │ 2 │ 1 │ string2-2 │ └────────┴────────────┴───────────┴────────────┴───┴───┴───────┘ ┌─_binlog_bsn─┬─_part_row_number─┬─binlog_shard_id─┬─binlog_event_type─┬─id─┬─a─┬─b─────┐ │ 3 │ 1 │ 2 │ D │ 2 │ 1 │ string2-2 │ └────────┴────────────┴───────────┴─────────────┴───┴──┴───────┘
查询表 db_test.binlog_test 所有新插入的数据(binlog_event_type 为 'I'),并且不输出 Binlog 系统字段。
SELECT * EXCEPT (binlog_event_type, binlog_shard_id) FROM `db_test.binlog_test$binlog` WHERE binlog_event_type = 'I'
查询结果示例如下:
┌─id─┬─a─┬─b─────┐ │ 1 │ 2 │ string1-1 │ │ 2 │ 1 │ string2-1 │ │ 3 │ 3 │ string3-1 │ └───┴──┴───────┘ ┌─id─┬─a─┬─b─────┐ │ 4 │ 2 │ string4-1 │ └───┴───┴───────┘
ByteHouse CDW 当前支持通过 Flink 消费 ByteHouse CDW Binlog,操作详情请参见Flink 实时消费 ByteHouse CDW Binlog。
ByteHouse 提供了 system.cnch_table_info 和 system.cnch_binlogs 两张系统表来展示某张表的 Binlog 的状态信息。
系统表 | 用途 | 速度 | 推荐使用场景 |
|---|---|---|---|
| 展示某张表当前的最大和最小 BSN。 | 快 | 仅需要查询表最大和最小 BSN。 |
| 展示某张表每个 Binlog part 的详细信息。 | 慢 | 读取 Binlog part 前可查询详细信息,以此定制读取方式。 |
展示某张表当前的最大和最小 BSN。
列名 | 数据类型 | 说明 |
|---|---|---|
database | String | 库名 |
table | String | 表名 |
min_bsn | Nullable(UInt64) | TTL 内的最小 BSN。对于未开启 Binlog 或还没有 Binlog 的表,该字段为 NULL。 |
max_bsn | Nullable(UInt64) | TTL 内的最大 BSN。对于未开启 Binlog 或还没有 Binlog 的表,该字段为 NULL。 |
示例:查询表 db_test.binlog_test 的最大和最小 BSN。
SELECT database, table, min_bsn, max_bsn FROM system.cnch_table_info WHERE (database = 'db_test') AND (table = 'binlog_test')
查询结果示例如下:
┌─database─┬─table───┬─min_bsn─┬─max_bsn─┐ │ db_test │ binlog_test│ 10 │ 20 │ └──────┴────────┴──────┴──────┘
展示某张表每个 Binlog part 的详细信息。查询该系统表时,必须使用 database 和 table 的等值过滤条件。
列名 | 数据类型 | 说明 |
|---|---|---|
database | String | 查询的库名。 |
table | String | 查询的表名。 |
part_name | String | 当前 Binlog part 的名称。 |
bsn | UInt64 | 当前 Binlog part 的 BSN。 |
rows | UInt64 | 当前 Binlog part 的行数。 |
bytes | UInt64 | 当前 Binlog part 的存储字节数。 |
txn_id | UInt64 | 生成当前 Binlog part 的事务 ID。 |
commit_time | DateTime | 生成当前 Binlog part 的事务的提交时间。 |
published_time | DateTime | 当前 Binlog part 被发布的时间。 |
active | UInt8 | 当前 Binlog part 是否有效(在TTL内并且已发布)。 |
is_staged | UInt8 | 当前 Binlog part 是否已发布。 |
shard_stats | String | 当前 Binlog part 内每个 Shard 的记录的行数。 |
示例:查询表 db_test.binlog_test 的所有 Binlog part 的信息。
SELECT database, table, bsn, rows, bytes, active, shard_stats FROM system.cnch_binlogs WHERE (database = 'db_test') AND (table = 'binlog_test')
查询结果示例如下:
┌─database─┬─table───┬─bsn─┬─rows─┬─bytes─┬─active─┬─shard_stats────┐ │ db_test │ binlog_test │ 1 │ 3 │ 1416 │ 1 │ [ 0, 1, 1, 1 ] │ │ db_test │ binlog_test │ 2 │ 5 │ 1437 │ 1 │ [ 0, 3, 2, 0 ] │ │ db_test │ binlog_test │ 3 │ 2 │ 1403 │ 1 │ [ 0, 1, 1, 0 ] │ └──────┴────────┴───┴────┴──────┴─────┴────────────┘