dbt(data building tool)工具将业务数据生成数据模型,以视图或表的形式存储到数据库,便于数据查询和分析。ByteHouse dbt 连接器(dbt-bytehouse)实现了 dbt 和 ByteHouse 的兼容,帮助您构建自己的数据模型。本文介绍了 dbt 及 dbt-bytehouse 的功能概况和使用方法。
dbt 是一款专注数据转换(ELT 中的 T 环节)的开源工具,将数据分析师和工程师编写的 select 语句抽象为数据模型,通过 SQL 逻辑描述数据的结构、关系和转换规则,并通过物化的方式将数据模型持久化到数据库,以视图、表等形式存储模型。
当前 dbt-bytehouse 提供了 3 种类型的物化,也支持了快照和 seed 功能,说明如下:
功能 | 简介 |
---|---|
视图(view)物化 | 逻辑定义,不存储数据,每次查询时动态计算。节省存储,但查询性能较低,适合轻量、低频使用的模型。 |
表(table)物化 | 将数据持久化为物理表,查询快,但占用存储,适合高频查询或需要预计算的结果(如聚合指标)。 |
增量(incremental)物化 | 仅处理新增或变更数据,大幅减少计算量,适合大数据场景。策略分为:
增量物化暂不支持 |
Seed | 用于加载 CSV 等数据文件时的功能模块,它是一种加载文件入库参与模型构建的一种方式,常用于初始化业务配置、维度表或参考数据。 |
快照 | dbt 中用于跟踪数据随时间变化的机制,记录数据的“历史版本”,捕捉源表中某条记录的字段变更(如状态更新、数值变动),并在快照表中保留所有历史状态及有效时间范围。 |
pip
,安装指南请参见 pip install。安装 dbt。
pip install dbt-core==1.8.9
说明
建议安装 dbt Core v1.8.x,详细安装指南请参见 dbt Core 简介及安装文档。
安装 ByteHouse dbt 连接器 dbt-bytehouse。
pip install dbt-bytehouse==1.8.9
检查是否安装成功。
dbt --version
安装成功的输出结果示例如下:
本节将说明如何创建新的 dbt 项目、虚拟环境,初始化 dbt 项目,并连接 dbt 和 ByteHouse。
创建新的 dbt 项目,命名为 bytehouse_demo_project
,并打开该项目。
mkdir bytehouse_demo_project cd bytehouse_demo_project
使用 Python 3.10 创建虚拟环境。
python3.10 -m venv venv source venv/bin/activate
初始化 dbt 项目。dbt init
命令将初始化已创建的 dbt 项目,将创建一个新文件夹,包含项目名称、示例文件 models/example 和配置文件 dbt_project.yml。
初始化过程中,dbt init
命令将提示输入项目名称和数据库,此时,请选择 clickhosue
。
dbt init bytehouse_demo_project
切换至项目所在的文件夹目录。
cd bytehouse_demo_project
移除 dbt 创建的示例文件。
rm -rf models/example
在 dbt
目录下创建 profiles.yml 文件,使用该文件配置 dbt 连接至 ByteHouse 的详细信息。
dbt_bytehouse_profile: target: <target-name> outputs: <target-name>: type: clickhouse schema: <database-name> driver: http host: <bytehouse-host> port: 8123 user: bytehouse password: <bytehouse-api-key> secure: True verify: False custom_settings: <setting-key>: <settings-value>
示例文件参数说明如下,更多参数说明请参见 dbt 官方文档。
参数 | 配置说明 |
---|---|
dbt_bytehouse_profile | dbt 项目配置中使用的配置文件名。您可按需自定义该名称。 |
| 定义 dbt 项目要输出的目标环境。 |
| 定义 dbt 项目的输出信息,支持在同一份 profiles.yml 文件中通过设置
|
| 要使用的 dbt 适配器类型,需配置为固定值: |
| 用于创建对象的默认架构,配置为 ByteHouse 已创建的数据库名称。 |
| 通信协议,需配置为固定取值: |
| 配置为 ByteHouse 服务器的公网域名,格式为: |
| ByteHouse 服务器上要连接的端口号,HTTP 协议需配置为固定取值:8123。 |
| 登录 ByteHouse 数据库的用户名,需配置为固定值 |
| 登录 ByteHouse 数据库的密码,密码为 |
| 布尔标志(true/false),指示是否使用加密连接(HTTPS)。需设置为 |
| 需设置为 |
| 配置需传递给连接器的其他 ByteHouse 连接设置。您可以设置计算组、最大执行时间等参数。示例如下:
计算组 ID 可通过 ByteHouse 控制台 > 租户管理 > 参数设置 > 默认计算组获取。 |
使用文本编辑器打开 bytehouse_demo_project
项目目录,更新该项目中的 dbt_project.yml 文件,确保该文件中的 profile
参数值与您在步骤 6 中为系统配置的 dbt/profiles.yaml 名称保持一致。
例如,系统中 dbt/profiles.yaml 文件配置名称为 dbt_bytehouse_profile
,则项目中 dbt_project.yaml 文件应将 profile
的值保持一致。
系统中的 dbt/profiles.yaml 文件配置如下,框选部分为名称:
项目中的 dbt_projet.yaml
文件配置示例如下,框选部分为需更新的 profile
参数值:
测试 dbt 是否成功连接至 ByteHouse。如果连接成功,输出结果会显示 “All checks passed!”。
dbt debug
连接成功的输出结果示例如下:
在 ByteHouse 中准备表和数据示例,用于测试 dbt 项目。您可以在 ByteHouse 控制台中执行以下 SQL 命令:
DROP DATABASE IF EXISTS bytehouse_demo; CREATE DATABASE bytehouse_demo; CREATE TABLE bytehouse_demo.customers ( id UInt64, name String, email String, updated_at DateTime ) ENGINE = CnchMergeTree PRIMARY KEY id ORDER BY (id, updated_at) PARTITION BY updated_at; INSERT INTO bytehouse_demo.customers (id, name, email, updated_at) VALUES (1, 'John Smith', 'john.smith@example.com', '2023-01-15 08:30:00'), (2, 'Jane Doe', 'jane.doe@example.com', '2023-01-16 10:45:00'), (3, 'Robert Johnson', 'robert.j@example.com', '2023-01-16 14:20:00'), (4, 'Sarah Williams', 'sarah.w@example.com', '2023-01-17 09:15:00'), (5, 'Michael Brown', 'michael.b@example.com', '2023-01-18 11:30:00');
在 dbt 项目 models 文件夹下创建 schema.yml 文件,将 customers
表定义为 dbt 源:
version: 2 sources: - name: customers_source schema: bytehouse_demo tables: - name: customers
在 dbt 项目 models 文件夹下创建 customers_summary_view.sql 文件,用于创建视图物化:
{{ config( materialized = 'view' ) }} SELECT id, name, email, updated_at FROM {{ source('customers_source', 'customers') }}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
dbt-bytehouse 内部将执行以下 SQL 命令,创建视图。
CREATE OR REPLACE VIEW `ecosystem`.`customers_summary_view` AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` );
在 dbt 项目 models 文件夹下创建 customers_summary_table.sql 文件,用于创建 table 物化:
{{ config( materialized = 'table' ) }} SELECT id, name, email, updated_at FROM {{ source('customers_source', 'customers') }}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
dbt-bytehouse 内部将执行以下 SQL 命令,创建表。
CREATE TABLE `bytehouse_demo`.`customers_summary_table` ENGINE = CnchMergeTree() ORDER BY (tuple()) AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` );
在表已经存在的前提下,该模型将执行后续操作,dbt-bytehouse 将执行以下一整套 SQL 查询。
-- 1. 使用模型声明创建一个临时表,该临时表将用于替换现有的表 CREATE TABLE `bytehouse_demo`.`customers_summary_table__dbt_tmp` ENGINE = CnchMergeTree() ORDER BY (tuple()) AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` ); -- 2. 如果备份表存在,则删除备份表 DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table__dbt_backup`; -- 3. 将现有的表重命名为备份表 RENAME TABLE `bytehouse_demo`.`customers_summary_table` TO `bytehouse_demo`.`customers_summary_table__dbt_backup`; -- 4. 删除现有的表 DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table`; -- 5. 将临时表重命名为所需的表名 RENAME TABLE `bytehouse_demo`.`customers_summary_table__dbt_tmp` TO `bytehouse_demo`.`customers_summary_table`; -- 6. 删除备份表 DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table__dbt_backup`;
在 dbt 项目 models 文件夹下创建 customers_summary_incremental.sql 文件,用于创建增量物化。此处,您需要提供 dbt 的 unique_key
和 is_incremental
子句。
{{ config( materialized='incremental', unique_key='id' ) }} SELECT id, name, email, updated_at FROM {{ source('customers_source', 'customers') }} {% if is_incremental() %} WHERE updated_at > (SELECT max(updated_at) FROM {{ this }}) {% endif %}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
| 唯一键表的 unique_key 值。 |
dbt-bytehouse 将执行以下 SQL 命令,创建增量表:
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental` ENGINE = CnchMergeTree() ORDER BY (tuple()) AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` );
在表已经存在的前提下,dbt-bytehouse 将继续执行以下查询:
-- 1. 创建中间表 __dbt_new_data,包含已更新/新数据 DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`; CREATE TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_new_data` ENGINE = CnchMergeTree() ORDER BY (tuple()) AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` WHERE updated_at > ( SELECT max(updated_at) FROM `bytehouse_demo`.`customers_summary_incremental` ) ); -- 2. 创建临时表 __dbt_tmp,包含尚未更新的/新的数据 CREATE TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_tmp` AS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`; INSERT INTO `bytehouse_demo`.`customers_summary_incremental__dbt_tmp` ("id", "name", "email", "updated_at") SELECT "id", "name", "email", "updated_at" FROM `bytehouse_demo`.`customers_summary_incremental` WHERE (id) NOT IN ( SELECT id FROM `bytehouse_demo`.`customers_summary_incremental__dbt_new_data` ); -- 3. 将新的/已更新的记录从 __dbt_new_data 表插入至临时表,此时,临时表 __dbt_tmp 包含全部记录 INSERT INTO `bytehouse_demo`.`customers_summary_incremental__dbt_tmp` ("id", "name", "email", "updated_at") SELECT "id", "name", "email", "updated_at" FROM `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`; -- 4. 删除中间表 __dbt_new_data DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`; -- 5. 如果 backup 表已存在,则删除。backup 表用于存储现有表数据 DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_backup`; -- 6. 使用现有表数据创建 backup 表 RENAME TABLE `bytehouse_demo`.`customers_summary_incremental` TO `bytehouse_demo`.`customers_summary_incremental__dbt_backup`; -- 7. 删除现有表 DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental`; -- 8. 将临时表重命名为最终表名称 RENAME TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_tmp` TO `bytehouse_demo`.`customers_summary_incremental`; -- 9. 最后,删除 backup 表,后续将不再用到该表,backup 表仅用于故障恢复和原子性目的 DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_backup`;
在 dbt 项目 models 文件夹下创建 customers_summary_incremental_append. sql 文件,使用 append
策略创建增量物化:
{{ config( materialized='incremental', unique_key='id', incremental_strategy='append' ) }} SELECT id, name, email, updated_at FROM {{ source('customers_source', 'customers') }} {% if is_incremental() %} WHERE updated_at > (SELECT max(updated_at) FROM {{ this }}) {% endif %}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
| 唯一键表的 unique_key 值。 |
| 增量策略。 |
dbt-bytehouse 内部将执行以下 SQL 命令,创建增量表:
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_append` ENGINE = CnchMergeTree() ORDER BY (tuple()) AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` );
如果表已存在,dbt-bytehouse 将继续执行以下 INSERT 查询:
INSERT INTO `bytehouse_demo`.`customers_summary_incremental_append` ("id", "name", "email", "updated_at") SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` WHERE updated_at > ( SELECT max(updated_at) FROM `bytehouse_demo`.`customers_summary_incremental_append` );
在 dbt 项目 models 文件夹下创建 customers_summary_incremental_override.sql 文件,使用 insert_overwrite
策略创建增量物化:
{{ config( materialized='incremental', partition_by='updated_at', incremental_strategy='insert_overwrite' ) }} SELECT id, name, email, updated_at FROM {{ source('customers_source', 'customers') }} {% if is_incremental() %} WHERE updated_at > (SELECT max(updated_at) FROM {{ this }}) {% endif %}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
| 该表的分区列。 |
| 增量策略。 |
dbt-bytehouse 内部将执行以下 SQL 来创建增量表:
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_override` ENGINE = CnchMergeTree() ORDER BY (tuple()) PARTITION BY (updated_at) AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` )
如果表已存在,dbt-bytehouse 将继续执行以下 INSERT 查询:
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`; CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc` ENGINE = CnchMergeTree() ORDER BY (tuple()) PARTITION BY (updated_at) AS ( SELECT id, name, email, updated_at FROM `bytehouse_demo`.`customers` WHERE updated_at > (SELECT max(updated_at) FROM `bytehouse_demo`.`customers_summary_incremental_override`) ); SELECT DISTINCT partition_id FROM system.cnch_parts WHERE part_type <= 2 AND database = 'bytehouse_demo' AND table = 'customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc'; ALTER TABLE `bytehouse_demo`.`customers_summary_incremental_override` REPLACE PARTITION ID '1705276800' FROM `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`; DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_distributed_new_data`; DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`;
使用如下内容,在 dbt 项目 seeds 文件夹下创建 country_codes.csv 文件:
country_code,country_name US,United States CA,Canada GB,United Kingdom
执行 dbt seed
命令:
dbt seed
该命令生成以下 ByteHouse 查询:
INSERT INTO `bytehouse_demo`.`country_codes` ("country_code", "country_name") FORMAT CSV US,United States CA,Canada GB,United Kingdom
ByteHouse 唯一键表为指定了唯一键(Unique Key)的 CnchMergeTree 表,详情请参见唯一键表。
dbt-bytehouse 支持创建物化视图时使用 ByteHouse 唯一键表,如需要使用,您可以在模型配置文件 models/xxx.sql 中额外设置 cnch_unique_key
。
{{ config(order_by='(id)', cnch_unique_key='(id)', materialized='table') }}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的排序形式。 |
| ByteHouse Unique 表的 key 列。 |
| 该表的物化形式。 |