dbt(data building tool)工具将业务数据生成数据模型,以视图或表的形式存储到数据库,便于数据查询和分析。ByteHouse dbt 连接器(dbt-bytehouse)实现了 dbt 和 ByteHouse 的兼容,帮助您构建自己的数据模型。本文介绍了 dbt 及 dbt-bytehouse 的功能概况和使用方法。
dbt 是一款专注数据转换(ELT 中的 T 环节)的开源工具,将数据分析师和工程师编写的 select 语句抽象为数据模型,通过 SQL 逻辑描述数据的结构、关系和转换规则,并通过物化的方式将数据模型持久化到数据库,以视图、表等形式存储模型。
当前 dbt-bytehouse 提供了 3 种类型的物化,也支持了快照和 seed 功能,说明如下:
功能 | 简介 |
---|---|
视图(view)物化 | 逻辑定义,不存储数据,每次查询时动态计算。节省存储,但查询性能较低,适合轻量、低频使用的模型。 |
表(table)物化 | 将数据持久化为物理表,查询快,但占用存储,适合高频查询或需要预计算的结果(如聚合指标)。 |
增量(incremental)物化 | 仅处理新增或变更数据,大幅减少计算量,适合大数据场景。策略分为:
增量物化暂不支持 |
Seed | 用于加载 CSV 等数据文件时的功能模块,它是一种加载文件入库参与模型构建的一种方式,常用于初始化业务配置、维度表或参考数据。 |
快照 | dbt 中用于跟踪数据随时间变化的机制,记录数据的“历史版本”,捕捉源表中某条记录的字段变更(如状态更新、数值变动),并在快照表中保留所有历史状态及有效时间范围。 |
pip
,安装指南请参见 pip install。安装 dbt。
pip install dbt-core==1.8.9
说明
建议安装 dbt Core v1.8.x,详细安装指南请参见 dbt Core 简介及安装文档。
安装 ByteHouse dbt 连接器 dbt-bytehouse。
pip install dbt-bytehouse==1.8.9
检查是否安装成功。
dbt --version
安装成功的输出结果示例如下:
本节将说明如何创建新的 dbt 项目、虚拟环境,初始化 dbt 项目,并连接 dbt 和 ByteHouse。
创建新的 dbt 项目,命名为 bytehouse_demo_project
,并打开该项目。
mkdir bytehouse_demo_project
cd bytehouse_demo_project
使用 Python 3.10 创建虚拟环境。
python3.10 -m venv venv
source venv/bin/activate
初始化 dbt 项目。dbt init
命令将初始化已创建的 dbt 项目,将创建一个新文件夹,包含项目名称、示例文件 models/example 和配置文件 dbt_project.yml。
初始化过程中,dbt init
命令将提示输入项目名称和数据库,此时,请选择 clickhosue
。
dbt init bytehouse_demo_project
切换至项目所在的文件夹目录。
cd bytehouse_demo_project
移除 dbt 创建的示例文件。
rm -rf models/example
在 dbt
目录下创建 profiles.yml 文件,使用该文件配置 dbt 连接至 ByteHouse 的详细信息。
dbt_bytehouse_profile:
target: <target-name>
outputs:
<target-name>:
type: clickhouse
schema: <database-name>
driver: http
host: <bytehouse-host>
port: 8123
user: bytehouse
password: <bytehouse-api-key>
secure: True
verify: False
custom_settings:
<setting-key>: <settings-value>
示例文件参数说明如下,更多参数说明请参见 dbt 官方文档。
参数 | 配置说明 |
---|---|
dbt_bytehouse_profile | dbt 项目配置中使用的配置文件名。您可按需自定义该名称。 |
| 定义 dbt 项目要输出的目标环境。 |
| 定义 dbt 项目的输出信息,支持在同一份 profiles.yml 文件中通过设置
|
| 要使用的 dbt 适配器类型,需配置为固定值: |
| 用于创建对象的默认架构,配置为 ByteHouse 已创建的数据库名称。 |
| 通信协议,需配置为固定取值: |
| 配置为 ByteHouse 服务器的公网域名,格式为: |
| ByteHouse 服务器上要连接的端口号,HTTP 协议需配置为固定取值:8123。 |
| 登录 ByteHouse 数据库的用户名,需配置为固定值 |
| 登录 ByteHouse 数据库的密码,密码为 |
| 布尔标志(true/false),指示是否使用加密连接(HTTPS)。需设置为 |
| 需设置为 |
| 配置需传递给连接器的其他 ByteHouse 连接设置。您可以设置计算组、最大执行时间等参数。示例如下:
计算组 ID 可通过 ByteHouse 控制台 > 租户管理 > 参数设置 > 默认计算组获取。 |
使用文本编辑器打开 bytehouse_demo_project
项目目录,更新该项目中的 dbt_project.yml 文件,确保该文件中的 profile
参数值与您在步骤 6 中为系统配置的 dbt/profiles.yaml 名称保持一致。
例如,系统中 dbt/profiles.yaml 文件配置名称为 dbt_bytehouse_profile
,则项目中 dbt_project.yaml 文件应将 profile
的值保持一致。
系统中的 dbt/profiles.yaml 文件配置如下,框选部分为名称:
项目中的 dbt_projet.yaml
文件配置示例如下,框选部分为需更新的 profile
参数值:
测试 dbt 是否成功连接至 ByteHouse。如果连接成功,输出结果会显示 “All checks passed!”。
dbt debug
连接成功的输出结果示例如下:
在 ByteHouse 中准备表和数据示例,用于测试 dbt 项目。您可以在 ByteHouse 控制台中执行以下 SQL 命令:
DROP DATABASE IF EXISTS bytehouse_demo;
CREATE DATABASE bytehouse_demo;
CREATE TABLE bytehouse_demo.customers (
id UInt64,
name String,
email String,
updated_at DateTime
)
ENGINE = CnchMergeTree
PRIMARY KEY id
ORDER BY (id, updated_at)
PARTITION BY updated_at;
INSERT INTO bytehouse_demo.customers (id, name, email, updated_at) VALUES
(1, 'John Smith', 'john.smith@example.com', '2023-01-15 08:30:00'),
(2, 'Jane Doe', 'jane.doe@example.com', '2023-01-16 10:45:00'),
(3, 'Robert Johnson', 'robert.j@example.com', '2023-01-16 14:20:00'),
(4, 'Sarah Williams', 'sarah.w@example.com', '2023-01-17 09:15:00'),
(5, 'Michael Brown', 'michael.b@example.com', '2023-01-18 11:30:00');
在 dbt 项目 models 文件夹下创建 schema.yml 文件,将 customers
表定义为 dbt 源:
version: 2
sources:
- name: customers_source
schema: bytehouse_demo
tables:
- name: customers
在 dbt 项目 models 文件夹下创建 customers_summary_view.sql 文件,用于创建视图物化:
{{
config(
materialized = 'view'
)
}}
SELECT
id,
name,
email,
updated_at
FROM {{ source('customers_source', 'customers') }}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
dbt-bytehouse 内部将执行以下 SQL 命令,创建视图。
CREATE OR REPLACE VIEW `ecosystem`.`customers_summary_view` AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
);
在 dbt 项目 models 文件夹下创建 customers_summary_table.sql 文件,用于创建 table 物化:
{{
config(
materialized = 'table'
)
}}
SELECT
id,
name,
email,
updated_at
FROM {{ source('customers_source', 'customers') }}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
dbt-bytehouse 内部将执行以下 SQL 命令,创建表。
CREATE TABLE `bytehouse_demo`.`customers_summary_table`
ENGINE = CnchMergeTree()
ORDER BY (tuple())
AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
);
在表已经存在的前提下,该模型将执行后续操作,dbt-bytehouse 将执行以下一整套 SQL 查询。
-- 1. 使用模型声明创建一个临时表,该临时表将用于替换现有的表
CREATE TABLE `bytehouse_demo`.`customers_summary_table__dbt_tmp`
ENGINE = CnchMergeTree()
ORDER BY (tuple())
AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
);
-- 2. 如果备份表存在,则删除备份表
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table__dbt_backup`;
-- 3. 将现有的表重命名为备份表
RENAME TABLE `bytehouse_demo`.`customers_summary_table` TO `bytehouse_demo`.`customers_summary_table__dbt_backup`;
-- 4. 删除现有的表
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table`;
-- 5. 将临时表重命名为所需的表名
RENAME TABLE `bytehouse_demo`.`customers_summary_table__dbt_tmp` TO `bytehouse_demo`.`customers_summary_table`;
-- 6. 删除备份表
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table__dbt_backup`;
在 dbt 项目 models 文件夹下创建 customers_summary_incremental.sql 文件,用于创建增量物化。此处,您需要提供 dbt 的 unique_key
和 is_incremental
子句。
{{
config(
materialized='incremental',
unique_key='id'
)
}}
SELECT
id,
name,
email,
updated_at
FROM {{ source('customers_source', 'customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
| 唯一键表的 unique_key 值。 |
dbt-bytehouse 将执行以下 SQL 命令,创建增量表:
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental`
ENGINE = CnchMergeTree()
ORDER BY (tuple())
AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
);
在表已经存在的前提下,dbt-bytehouse 将继续执行以下查询:
-- 1. 创建中间表 __dbt_new_data,包含已更新/新数据
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`
ENGINE = CnchMergeTree()
ORDER BY (tuple())
AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
WHERE updated_at > (
SELECT max(updated_at)
FROM `bytehouse_demo`.`customers_summary_incremental`
)
);
-- 2. 创建临时表 __dbt_tmp,包含尚未更新的/新的数据
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_tmp`
AS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
INSERT INTO `bytehouse_demo`.`customers_summary_incremental__dbt_tmp`
("id", "name", "email", "updated_at")
SELECT
"id",
"name",
"email",
"updated_at"
FROM `bytehouse_demo`.`customers_summary_incremental`
WHERE (id) NOT IN (
SELECT id
FROM `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`
);
-- 3. 将新的/已更新的记录从 __dbt_new_data 表插入至临时表,此时,临时表 __dbt_tmp 包含全部记录
INSERT INTO `bytehouse_demo`.`customers_summary_incremental__dbt_tmp`
("id", "name", "email", "updated_at")
SELECT
"id",
"name",
"email",
"updated_at"
FROM `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
-- 4. 删除中间表 __dbt_new_data
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
-- 5. 如果 backup 表已存在,则删除。backup 表用于存储现有表数据
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_backup`;
-- 6. 使用现有表数据创建 backup 表
RENAME TABLE `bytehouse_demo`.`customers_summary_incremental`
TO `bytehouse_demo`.`customers_summary_incremental__dbt_backup`;
-- 7. 删除现有表
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental`;
-- 8. 将临时表重命名为最终表名称
RENAME TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_tmp`
TO `bytehouse_demo`.`customers_summary_incremental`;
-- 9. 最后,删除 backup 表,后续将不再用到该表,backup 表仅用于故障恢复和原子性目的
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_backup`;
在 dbt 项目 models 文件夹下创建 customers_summary_incremental_append. sql 文件,使用 append
策略创建增量物化:
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='append'
)
}}
SELECT
id,
name,
email,
updated_at
FROM {{ source('customers_source', 'customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
| 唯一键表的 unique_key 值。 |
| 增量策略。 |
dbt-bytehouse 内部将执行以下 SQL 命令,创建增量表:
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_append`
ENGINE = CnchMergeTree()
ORDER BY (tuple())
AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
);
如果表已存在,dbt-bytehouse 将继续执行以下 INSERT 查询:
INSERT INTO `bytehouse_demo`.`customers_summary_incremental_append`
("id", "name", "email", "updated_at")
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
WHERE updated_at > (
SELECT max(updated_at)
FROM `bytehouse_demo`.`customers_summary_incremental_append`
);
在 dbt 项目 models 文件夹下创建 customers_summary_incremental_override.sql 文件,使用 insert_overwrite
策略创建增量物化:
{{
config(
materialized='incremental',
partition_by='updated_at',
incremental_strategy='insert_overwrite'
)
}}
SELECT
id,
name,
email,
updated_at
FROM {{ source('customers_source', 'customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的物化形式。 |
| 该表的分区列。 |
| 增量策略。 |
dbt-bytehouse 内部将执行以下 SQL 来创建增量表:
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_override`
ENGINE = CnchMergeTree()
ORDER BY (tuple())
PARTITION BY (updated_at)
AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
)
如果表已存在,dbt-bytehouse 将继续执行以下 INSERT 查询:
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`;
CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`
ENGINE = CnchMergeTree()
ORDER BY (tuple())
PARTITION BY (updated_at)
AS (
SELECT
id,
name,
email,
updated_at
FROM `bytehouse_demo`.`customers`
WHERE updated_at > (SELECT max(updated_at) FROM `bytehouse_demo`.`customers_summary_incremental_override`)
);
SELECT DISTINCT partition_id
FROM system.cnch_parts
WHERE part_type <= 2
AND database = 'bytehouse_demo'
AND table = 'customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc';
ALTER TABLE `bytehouse_demo`.`customers_summary_incremental_override`
REPLACE PARTITION ID '1705276800'
FROM `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`;
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_distributed_new_data`;
DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`;
使用如下内容,在 dbt 项目 seeds 文件夹下创建 country_codes.csv 文件:
country_code,country_name US,United States CA,Canada GB,United Kingdom
执行 dbt seed
命令:
dbt seed
该命令生成以下 ByteHouse 查询:
INSERT INTO `bytehouse_demo`.`country_codes` ("country_code", "country_name")
FORMAT CSV
US,United States
CA,Canada
GB,United Kingdom
ByteHouse 唯一键表为指定了唯一键(Unique Key)的 CnchMergeTree 表,详情请参见唯一键表。
dbt-bytehouse 支持创建物化视图时使用 ByteHouse 唯一键表,如需要使用,您可以在模型配置文件 models/xxx.sql 中额外设置 cnch_unique_key
。
{{ config(order_by='(id)', cnch_unique_key='(id)', materialized='table') }}
参数说明如下:
参数 | 配置说明 |
---|---|
| 该表的排序形式。 |
| ByteHouse Unique 表的 key 列。 |
| 该表的物化形式。 |