HaUniqueMergeTree(唯一键引擎)是 ByteHouse 自研的企业级表引擎,在完全兼容 ClickHouse 高效查询性能的基础上,原生支持主键更新能力。该引擎精准解决了社区版 ClickHouse 无法高效实现更新操作的核心痛点,大幅降低实时分析类应用的开发门槛。本文将介绍 HaUniqueMergeTree 的用法。
UNIQUE KEY 配置唯一键后,ByteHouse 提供 upsert 更新写语义,查询时自动返回每个唯一键对应的最新数据。相较于社区的 ReplacingMergeTree 需要等待后台 Merge 完成后才能获取去重数据,而 HaUniqueMergeTree 可实现导入后立即去重。数据量规划需匹配唯一模式选型:
使用时,请务必保证同一个 unique key 写到同一个 shard 中,不然将无法保证唯一性。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], ... ) ENGINE = HaUniqueMergeTree('/clickhouse/tables/{shard}/{database}/table_name', '{replica}'[, version_column]) -- 默认为 '/clickhouse/bytehouse/库名.表名/{shard}','{replica}' PARTITION BY toYYYYMM(EventDate) ORDER BY expr [PARTITION BY expr] UNIQUE KEY expr [SAMPLE BY expr] [TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx' [, ...] ] [WHERE conditions] [GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ] [SETTINGS name=value, ...]
UNIQUE KEY (product_id, sipHash64(city))。注意
其他的字段设置,如 Order By,Partition By等,和 MergeTree 家族的其他引擎的设置规则一致。
参数名 | 常用字段 | 默认值 | 说明 |
|---|---|---|---|
partition_level_unique_keys | 是 | 1 | 0:UNIQUE KEY 表粒度唯一 |
enable_unique_partial_update | 是 | 0 | 允许部分列更新写入 |
enable_disk_based_unique_key_index | 是 | 1 | 0:in-memory mode。在此方式下,系统会在每张unique表维护一个in-memory key index,因此能支撑的数据量受限于内存 ; |
假设表结构如下:
-- 引擎默认保证 unique key 在分区内的唯一性 -- 注:UNIQUE KEY 不支持 Nullable CREATE TABLE t1 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id;
插入数据。此时,写入相同 key 的数据可以实现更新(upsert 语义)。
INSERT INTO t1 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100); -- 写入相同 key 的数据可以实现更新(upsert语义) INSERT INTO t1 VALUES ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 4, 400), ('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10001, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10002, 'Beijing', '男装', 2, 200);
查询自动返回每个 key 最新的数据:
select * from t1 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10001 │ Beijing │ 男装 │ 5 │ 500 │ │ 2020-10-29 23:50:00 │ 10002 │ Beijing │ 男装 │ 4 │ 400 │ │ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10001 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
UNIQUE KEY 也可以包含多个字段和表达式,如下面以两个字段:product_id, sipHash64(city) 为例:
-- UNIQUE KEY 可以包含多个字段和表达式 CREATE TABLE t1m ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1m/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY (product_id, sipHash64(city)); INSERT INTO t1m VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100), ('2020-10-29 23:50:00', 10002, 'Shanghai', '男装', 4, 400), ('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100); select * from t1m; ┌──────────event_time─┬─product_id─┬─city─────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10001 │ Beijing │ 男装 │ 5 │ 500 │ │ 2020-10-29 23:40:00 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-29 23:50:00 │ 10002 │ Shanghai │ 男装 │ 4 │ 400 │ └─────────────────────┴────────────┴──────────┴──────────┴────────┴─────────┘
假设表结构如下:
CREATE TABLE t2 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('xxxxxxx') PARTITION BY toDate(event_time) --分区字段 ORDER BY (city, category) --排序字段 UNIQUE KEY product_id --唯一键 SETTINGS partition_level_unique_keys = 0; --设置表级别唯一
顺序插入以下测试数据:
INSERT INTO t2 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100); INSERT INTO t2 VALUES ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 4, 400), ('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10001, 'Beijing', '男装', 1, 100), ('2020-10-30 00:00:05', 10002, 'Beijing', '男装', 2, 200);
可以看到,10001,10002,10003 这三个产品都更新到了最新数据,且10001,10002 都从 2020-10-29 分区更新到了 2020-10-30 分区。
select * from t2 order by toDate(event_time), product_id; ┌──────event_time─┬product_id─┬─city──┬category─┬amount─┬revenue─┐ │ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10001 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-30 00:00:05 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ └─────────────┴───────┴─────┴──────┴─────┴─────┘
注意
目前唯一性约束是在单个节点内部保证的。因此对于分布式表,叫做 shard 级别唯一更准确。
表级别唯一需要由写入端保证相同 key 的数据不会写入多个 shard。
注意
默认情况下,相同 unique key 后写入的数据会覆盖已有的数据。这可能会带来以下问题:
为了解决上面的问题,HaUniqueMergeTree 支持将表中的某个字段指定为版本字段。引擎保证写入相同 key 的数据时,只有数据版本大于等于已有版本时,才会进行覆盖。版本字段支持所有 UInt 类型和 Data/DateTime,且不能为 Nullable。
假设表结构如下:
CREATE TABLE t3 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t3/{shard}', '{replica}', event_time) --event_time为版本字段 PARTITION BY toDate(event_time) --分区字段 ORDER BY (city, category) --排序字段 UNIQUE KEY product_id; --唯一键
顺序插入以下数据:
INSERT INTO t3 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 8, 800), ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 5, 500);
若在此时重新导入回溯前两条数据,由于版本 < 已有版本,写入时自动跳过。
INSERT INTO t3 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200);
10001 和 10002 的版本没有回退。
select * from t3 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:50:00 │ 10001 │ Beijing │ 男装 │ 8 │ 800 │ │ 2020-10-29 23:50:00 │ 10002 │ Beijing │ 男装 │ 5 │ 500 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
继续回溯后两条数据,并写入两条新版本数据。
INSERT INTO t3 VALUES ('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 8, 800), ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 5, 500), ('2020-10-29 23:55:00', 10001, 'Beijing', '男装', 10, 1000), ('2020-10-29 23:55:00', 10002, 'Beijing', '男装', 7, 700);
查询自动返回最新版本的数据。
select * from t3 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:55:00 │ 10001 │ Beijing │ 男装 │ 10 │ 1000 │ │ 2020-10-29 23:55:00 │ 10002 │ Beijing │ 男装 │ 7 │ 700 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
在 Lambda 架构下,如果你的业务存在以下需求,可参考本方案:
传统的实现方式中,为了满足以上需求,可直接将分区字段(日期)作为版本字段来实现,虽然该方式可以满足需求,但需要额外存储一个日期字段。由于分区下所有数据的日期完全一致,额外存储会造成明显的资源浪费。
为了优化该场景,HaUniqueMergeTree 支持直接使用分区表达式作为版本。当引擎检测到版本字段为分区字段时,会自动从元数据中读取版本,避免额外的数据读写。
假设表结构如下:
-- 创建一张按天分区、表粒度唯一的 unique 表,使用分区字段作为版本 CREATE TABLE t4 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t4/{shard}', '{replica}', toDate(event_time)) PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id SETTINGS partition_level_unique_keys = 0;
10-29 实时任务数据写入。
INSERT INTO t4 VALUES ('2020-10-29 10:00:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 10:00:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 10:10:00', 10001, 'Beijing', '男装', 8, 800), ('2020-10-29 10:10:00', 10002, 'Beijing', '男装', 5, 500);
10-30 实时任务数据写入,将 10002 更新到 10-30 分区。
INSERT INTO t4 VALUES ('2020-10-30 08:00:00', 10002, 'Beijing', '男装', 10, 1000), ('2020-10-30 08:00:00', 10003, 'Beijing', '男装', 3, 300);
10-30 离线任务重写 10-29 数据。
INSERT INTO t4 VALUES ('2020-10-29 10:10:00', 10001, 'Beijing', '男装', 7, 700), ('2020-10-29 10:10:00', 10002, 'Beijing', '男装', 5, 500);
离线任务只会覆盖 10001 数据,10002 保留 10-30 中的最新数据。
select * from t4 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 10:10:00 │ 10001 │ Beijing │ 男装 │ 7 │ 700 │ │ 2020-10-30 08:00:00 │ 10002 │ Beijing │ 男装 │ 10 │ 1000 │ │ 2020-10-30 08:00:00 │ 10003 │ Beijing │ 男装 │ 3 │ 300 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
部分业务场景中,用户希望通过在 INSERT 操作时新增标识字段区分数据写入与删除状态,以此扩展 INSERT 语义。
针对这一需求,HaUniqueMergeTree 引擎为每张表默认内置保留字段 _delete_flag_,支持在 INSERT 或 INSERT SELECT 过程中指定该字段,可对分布式表使用。该字段类型为 UInt8,取值规则如下:取值 0 表示数据写入;取值非 0 表示数据删除。
注意 _delete_flag_ 字段仅可在 INSERT、INSERT SELECT 操作过程中指定,或在创建物化视图时配置;不可在 CREATE TABLE 建表语句中定义,且该字段不支持查询。
注意
删除行功能可以对底表为 HaUniqueMergeTree 的分布式表进行操作,前提是分布式表需要配置参数remote_table_is_ha_unique = 1。
用法示例如下:
使用 INSERT 方式
假设表结构如下:
-- 引擎默认保证 unique key 在分区内的唯一性 -- 注:UNIQUE KEY 不支持 Nullable CREATE TABLE t1 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id;
导入以下数据:
INSERT INTO t1 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100);
指定删除字段进行数据删除,删除字段设置非 0 时表示删除,设置为 0 时表示正常的 upsert 操作。
INSERT INTO t1 (event_time, product_id, city, category, amount, revenue, _delete_flag_) VALUES ('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 4, 400, 5), ('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 2, 200, 1), ('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100, 0);
查询结果中包含了新加入的一行数据,并删除了两行旧数据
select * from t1 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10003 │ Beijing │ 男装 │ 1 │ 100 │ │ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
使用 INSERT WITH VERSION
假设表结构如下:
-- 引擎默认保证 unique key 在分区内的唯一性 -- 注:UNIQUE KEY 不支持 Nullable -- 指定版本号 CREATE TABLE t1 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64, `version` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}', version) PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id;
插入数据:
INSERT INTO t1 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500, 10), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200, 10), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100, 10);
指定删除字段并指定版本号,版本号小于查询结果中相应行的版本号,删除操作不会起作用。
INSERT INTO t1 (event_time, product_id, city, category, amount, revenue, version, _delete_flag_) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 4, 400, 5, 1), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200, 5, 1), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100, 5, 1);
查询结果不变,没有任何数据被删除。
select * from t1 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┬─version─┐ │ 2020-10-29 23:40:00 │ 10001 │ Beijing │ 男装 │ 5 │ 500 │ 10 │ │ 2020-10-29 23:40:00 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ 10 │ │ 2020-10-29 23:40:00 │ 10003 │ Beijing │ 男装 │ 1 │ 100 │ 10 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┴─────────┘
指定删除字段进行数据删除,不指定版本号或者版本号设置为0,删除操作会跳过版本检查,直接执行。
INSERT INTO t1 (event_time, product_id, city, category, amount, revenue, _delete_flag_) VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 4, 400, 1); INSERT INTO t1 (event_time, product_id, city, category, amount, revenue, version, _delete_flag_) VALUES ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100, 0, 1);
查询结果删除了两行旧数据。
select * from t1 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┬─version─┐ │ 2020-10-29 23:40:00 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │ 10 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┴─────────┘
使用 INSERT SELECT
假设表结构如下:
-- 引擎默认保证 unique key 在分区内的唯一性 -- 注:UNIQUE KEY 不支持 Nullable CREATE TABLE t1 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id;
插入数据:
INSERT INTO t1 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100); -- 通过INSERT SELECT 来删除revenue >= 200的数据 INSERT INTO t1 (event_time, product_id, city, category, amount, revenue, _delete_flag_) SELECT *, 1 as _delete_flag_ from t1 where revenue >= 200; -- 查询结果中已删除revenue >= 200的数据 select * from t1 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10003 │ Beijing │ 男装 │ 1 │ 100 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
通过 INSERT SELECT 删除 revenue >= 200 的数据。
INSERT INTO t1 (event_time, product_id, city, category, amount, revenue, _delete_flag_) SELECT *, 1 as _delete_flag_ from t1 where revenue >= 200;
查询结果中已删除revenue >= 200的数据。
select * from t1 order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10003 │ Beijing │ 男装 │ 1 │ 100 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
MATERIALIZED VIEW
假设表结构如下:
-- 引擎默认保证 unique key 在分区内的唯一性 -- 注:UNIQUE KEY 不支持 Nullable CREATE TABLE t1 ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64, `delete_flag` UInt8 ) ENGINE = HaMergeTree('/clickhouse/default/t1/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category); CREATE TABLE des ( `event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64 ) ENGINE = HaUniqueMergeTree('/clickhouse/default/des/{shard}', '{replica}') PARTITION BY toDate(event_time) ORDER BY (city, category) UNIQUE KEY product_id;
创建物化视图并指定删除字段。
CREATE MATERIALIZED VIEW des_view TO des (`event_time` DateTime, `product_id` UInt64, `city` String, `category` String, `amount` UInt32, `revenue` UInt64, `_delete_flag_` UInt8) AS SELECT event_time, product_id, city, category, amount, revenue, delete_flag as _delete_flag_ FROM t1;
插入数据。
INSERT INTO t1 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500, 0), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200, 0), ('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100, 0);
删除数据。
INSERT INTO t1 VALUES ('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500, 1), ('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200, 1);
查询结果中删除了两行旧数据。
select * from des order by toDate(event_time), product_id; ┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐ │ 2020-10-29 23:40:00 │ 10003 │ Beijing │ 男装 │ 1 │ 100 │ └─────────────────────┴────────────┴─────────┴──────────┴────────┴─────────┘
enable_unique_partial_update = 1。remote_table_is_ha_unique = 1。enable_unique_partial_update = 1 后允许写入以部分列更新模式进行,默认关闭。enable_unique_partial_update 参数(默认值为 1),1 表示 Kafka 消费使用部分列更新模式,0 表示使用行更新模式。使用功能列 _update_columns_(String 类型)区分更新列方案的列更新规则:
_update_columns_ 中的内容是需要更新的列,以逗号分隔各列名,引擎在解析时不会处理列名前后的特殊字符,如空格、Tab、换行符等,且不支持正则表达式。_update_columns_ 为空时表示更新所有列。_update_columns_ 时直接更新,允许更新为默认值。partial_update_enable_merge_map = true 时对有旧值的 key 进行更新,新 key 直接写入,未更新的旧 key 保留原值;为 false 时直接替换 value。CREATE TABLE t1 ( k Int32, c1 Int32, c2 Nullable(Float64), c3 Nullable(String), c4 Nullable(Int64), m1 Map(String, Int32), a1 Array(String)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 1, partial_update_enable_merge_map = 0; SET enable_unique_partial_update = 1; -- 也可以不显示的设置, 但需要对在表参数中配置enable_unique_partial_update=1,见以上建表示例 INSERT INTO t1 (k, c1, c2, m1, a1) VALUES (1, 10, 3.14, {'k1':1}, ['hello']); -- 自动解析会填充_update_columns_为'k,c1,c2,m1,a1' ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k1':1} │ ['hello'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ -- 主动指定 _update_columns_ -- 标粗字体代表已更新 INSERT INTO t1 (k, c1, m1, a1, _update_columns_) VALUES (1, 20, {'k2':2}, ['world'], 'k,c1,m1,a1'); ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 20 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k2':2} │ ['world'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ -- 主动指定 _update_columns_ -- 进阶用法:不指定在_update_columns_中的列,就算有数据,也不会真正更新 INSERT INTO t1 (k, c1, m1, a1, _update_columns_) VALUES (1, 200, {'k2':20}, ['world20'], 'k'); ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 20 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k2':2} │ ['world'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c3, m1) VALUES (1, 0, 'foo', {'k3':3}); -- 自动解析填充_update_columns_为'k,c1,c3,m1' -- 等价于INSERT INTO t1 (k, c1, c3, m1, _update_columns_) VALUES (1, 0, 'foo', {'k3':3}, 'k, c1, c3, m1'); -- 此时c1会强制更新为0,c3强制更新为'foo',m1强制更新为'k3':3 ┌─k─┬─c1─┬───c2─┬─c3──┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 0 │ 3.14 │ foo │ ᴺᵁᴸᴸ │ {'k3':3} │ ['world'] │ └───┴────┴──────┴─────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c2, c3, c4, m1, a1, _update_columns_) VALUES (1, 10, 31.4, 'goo', 15, {'k4': 4}, ['hello', 'world'], ''); -- 当主动指定_update_columns_为''时表示更新所有列 ┌─k─┬─c1─┬───c2─┬─c3──┬─c4─┬─m1───────┬─a1────────────────┐ │ 1 │ 10 │ 31.4 │ goo │ 15 │ {'k4':4} │ ['hello','world'] │ └───┴────┴──────┴─────┴────┴──────────┴───────────────────┘ -- 更灵活的用法:给每行指定 _update_columns_ INSERT INTO t1 (k, c1, c2, c3, c4, m1, a1, _update_columns_) FORMAT JSONEachRow {"k":"1", "c1": "100", "_update_columns_":"k,c1"} {"k":"1", "c2": "314.0", "_update_columns_":"k,c2"}; ┌─k─┬──c1─┬──c2─┬─c3──┬─c4─┬─m1───────┬─a1────────────────┐ │ 1 │ 100 │ 314 │ goo │ 15 │ {'k4':4} │ ['hello','world'] │ └───┴─────┴─────┴─────┴────┴──────────┴───────────────────┘
该方式需要设置 partial_update_enable_specify_update_columns = 0。
设置 partial_update_enable_specify_update_columns = 0,慎用默认值。
非 Map 类型:写入非默认值表示更新,不允许更新为默认值。
Map 类型:partial_update_enable_merge_map = true 时对有旧值的 key 进行更新,新 key 直接写入,未更新的旧 key 保留原值;为 false 时直接替换 value。
CREATE TABLE t1 ( k Int32, c1 Int32, c2 Nullable(Float64), c3 Nullable(String), c4 Nullable(Int64), m1 Map(String, Int32), a1 Array(String)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 0, partial_update_enable_merge_map = 1; SET enable_unique_partial_update = 1; INSERT INTO t1 (k, c1, c2, m1, a1) VALUES (1, 10, 3.14, {'k1':1}, ['hello']); ┌─k─┬─c1─┬───c2─┬─c3───┬───c4─┬─m1───────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ {'k1':1} │ ['hello'] │ └───┴────┴──────┴──────┴──────┴──────────┴───────────┘ INSERT INTO t1 (k, c1, c3, m1) VALUES (1, 0, 'foo', {'k2':2}); -- 对于未指定字段 -- - c1为非组合类型,但写入的是默认值,因此保留旧值 -- - c2/c4为Nullable类型,保留旧值 -- - a1为Array空值,保留旧值 -- 对于m1,更新m1{'k2'},保留m1{'k1'} ┌─k─┬─c1─┬───c2─┬─c3──┬───c4─┬─m1──────────────┬─a1────────┐ │ 1 │ 10 │ 3.14 │ foo │ ᴺᵁᴸᴸ │ {'k1':1,'k2':2} │ ['hello'] │ └───┴────┴──────┴─────┴──────┴─────────────────┴───────────┘ INSERT INTO t1 (k, c1, c2, c3, c4, m1, a1) VALUES (1, 20, null, 'bar', 30, {'k2':0, 'k3':3}, ['world']); -- c1不是默认值,更新 -- c2为null,保留旧值;c3/c4不为null,更新 -- 更新m1{'k2'}, m1{'k3'},保留m1{'k1'} -- a1不为空值,更新 ┌─k─┬─c1─┬───c2─┬─c3──┬─c4─┬─m1─────────────────────┬─a1────────┐ │ 1 │ 20 │ 3.14 │ bar │ 30 │ {'k1':1,'k2':0,'k3':3} │ ['world'] │ └───┴────┴──────┴─────┴────┴────────────────────────┴───────────┘
【注意】使用这种方式在建表时 慎用默认值!
在建表时可以在字段后指定默认值,默认值可以为表达式。如果您指定了默认值,那么写入时对于缺省的列会按照建表时指定的方式进行填充,而部分列更新判断是否为默认值时是按照引擎内数据类型的默认值进行判断,因此可能会产生不符合预期的行为。下面将举个例子进行说明:
CREATE TABLE t1 ( k Int32, c1 Int32 DEFAULT k + 1, c2 Nullable(Float64)) ENGINE = HaUniqueMergeTree('/clickhouse/default/t1/{shard}', '{replica}') UNIQUE KEY k ORDER BY k SETTINGS enable_unique_partial_update = 1, partial_update_enable_specify_update_columns = 0; insert into t1 values (1, 10, 3.14); ┌─k─┬─c1─┬───c2─┐ │ 1 │ 10 │ 3.14 │ └───┴────┴──────┘ insert into t1 (k, c2) values (1, 31.4); ┌─k─┬─c1─┬───c2─┐ │ 1 │ 2 │ 31.4 │ └───┴────┴──────┘ -- 如果在建表时没有指定从c1的默认值,那么这条写入的语义是将k=1的那条数据的c2被更新为31.4,但是由于建表指定了默认值,等价于insert into t1 values (1, 2,31.4),因此c1也被更新为了2。这种情况下为了达到仅更新c2的目的,可以有以下两种方式:1. 建表时不要使用默认值;2. 显示指定默认值,即insert into t1 values (1, 0,31.4) -- ┌─k─┬─c1─┬───c2─┐ -- │ 1 │ 10 │ 31.4 │ -- └───┴────┴──────┘
在使用 in-memory 模式下,HaUniqueMergeTree 的导入性能约是 HaMergeTree 或 社区的 MergeTree 引擎的一半,为 10k rows/s/shard,或 20 MB/s。性能相比社区的 ReplacingMergeTree 也接近一致。
但查询性能上,HaUniqueMergeTree 的性能和 HaMergeTree 或 MergeTree 引擎一致。
在使用 disk-based 模式下,HaUniqueMergeTree 的导入性能约是 HaMergeTree 或 社区的 MergeTree 引擎的 30-40%,为 6-8k rows/s/shard,或 12-16 MB/s。查询性能依旧不变。
目前 unique 表有两种 key index 使用方式:in-memory 和 disk-based。由表级参数enable_disk_based_unique_key_index控制。
如果包含很多字段,考虑使用哈希值。例:UNIQUE KEY sipHash64(val1,val2,val3....)
优先使用默认的分区级别唯一。
如果业务场景必须使用表级别唯一,考虑设置 TTL、采用更粗粒度的分区(例如按月分区)等方式减少分区数量。
如果需要指定版本字段,优先考虑分区值作为版本,减少内存占用。
Consumer 会直接写本地 shard。因此