You need to enable JavaScript to run this app.
导航
dbt
最近更新时间:2025.05.22 13:57:36首次发布时间:2023.03.20 14:46:25
我的收藏
有用
有用
无用
无用

dbt(data building tool)工具将业务数据生成数据模型,以视图或表的形式存储到数据库,便于数据查询和分析。ByteHouse dbt 连接器(dbt-bytehouse)实现了 dbt 和 ByteHouse 的兼容,帮助您构建自己的数据模型。本文介绍了 dbt 及 dbt-bytehouse 的功能概况和使用方法。

功能概述

dbt 是一款专注数据转换(ELT 中的 T 环节)的开源工具,将数据分析师和工程师编写的 select 语句抽象为数据模型,通过 SQL 逻辑描述数据的结构、关系和转换规则,并通过物化的方式将数据模型持久化到数据库,以视图、表等形式存储模型。
当前 dbt-bytehouse 提供了 3 种类型的物化,也支持了快照和 seed 功能,说明如下:

功能

简介

视图(view)物化

逻辑定义,不存储数据,每次查询时动态计算。节省存储,但查询性能较低,适合轻量、低频使用的模型。

表(table)物化

将数据持久化为物理表,查询快,但占用存储,适合高频查询或需要预计算的结果(如聚合指标)。

增量(incremental)物化

仅处理新增或变更数据,大幅减少计算量,适合大数据场景。策略分为:

  • append:使用 append 策略的增量物化不会创建任何临时表,直接将新记录插入现有表。
  • insert_overwrite:按分区覆盖数据(如按天分区),替换特定分区的全部内容,确保数据最新(适用于时间分区表)。

增量物化暂不支持 mergedelete+insert 策略。

Seed

用于加载 CSV 等数据文件时的功能模块,它是一种加载文件入库参与模型构建的一种方式,常用于初始化业务配置、维度表或参考数据。

快照

dbt 中用于跟踪数据随时间变化的机制,记录数据的“历史版本”,捕捉源表中某条记录的字段变更(如状态更新、数值变动),并在快照表中保留所有历史状态及有效时间范围。

前提条件
  • 请确保已安装 Python 3.10,安装指南请参见 Python 3.10.0
  • 请确保已安装 pip,安装指南请参见 pip install

安装 dbt 和 ByteHouse dbt 连接器
  1. 安装 dbt。

    pip install dbt-core==1.8.9
    

    说明

    建议安装 dbt Core v1.8.x,详细安装指南请参见 dbt Core 简介及安装文档

  2. 安装 ByteHouse dbt 连接器 dbt-bytehouse。

    pip install dbt-bytehouse==1.8.9
    
  3. 检查是否安装成功。

    dbt --version
    

    安装成功的输出结果示例如下:
    Image

项目初始化

本节将说明如何创建新的 dbt 项目、虚拟环境,初始化 dbt 项目,并连接 dbt 和 ByteHouse。

  1. 创建新的 dbt 项目,命名为 bytehouse_demo_project,并打开该项目。

    mkdir bytehouse_demo_project
    cd bytehouse_demo_project
    
  2. 使用 Python 3.10 创建虚拟环境。

    python3.10 -m venv venv
    source venv/bin/activate
    
  3. 初始化 dbt 项目。
    dbt init 命令将初始化已创建的 dbt 项目,将创建一个新文件夹,包含项目名称、示例文件 models/example 和配置文件 dbt_project.yml。
    初始化过程中,dbt init 命令将提示输入项目名称和数据库,此时,请选择 clickhosue

    dbt init bytehouse_demo_project
    
  4. 切换至项目所在的文件夹目录。

    cd bytehouse_demo_project
    
  5. 移除 dbt 创建的示例文件。

    rm -rf models/example
    
  6. dbt 目录下创建 profiles.yml 文件,使用该文件配置 dbt 连接至 ByteHouse 的详细信息。

    dbt_bytehouse_profile:
      target: <target-name>
      outputs:
        <target-name>:
          type: clickhouse
          schema: <database-name>
    
          driver: http
          host: <bytehouse-host>
          port: 8123
          user: bytehouse
          password: <bytehouse-api-key>
          secure: True
          verify: False
          
          custom_settings:
              <setting-key>: <settings-value>
    

    示例文件参数说明如下,更多参数说明请参见 dbt 官方文档

    参数

    配置说明

    dbt_bytehouse_profile

    dbt 项目配置中使用的配置文件名。您可按需自定义该名称。

    target

    定义 dbt 项目要输出的目标环境。

    outputs

    定义 dbt 项目的输出信息,支持在同一份 profiles.yml 文件中通过设置 outputs 下的信息,配置多种输出环境,如下示例展示了定义 dev 和 prod 输出信息,输出的目标环境通过 target 指定为 prod:

    dbt_bytehouse_profile:
      target: <prod>
      outputs:
        <dev>:
          ...
          host: <host-for-dev>
          ...
        <prod>:
          ...
          host: <host-for-prod>
          ...
    

    type

    要使用的 dbt 适配器类型,需配置为固定值:clickhouse

    schema

    用于创建对象的默认架构,配置为 ByteHouse 已创建的数据库名称。

    driver

    通信协议,需配置为固定取值:http

    host

    配置为 ByteHouse 服务器的公网域名,格式为:gateway-v2.tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com。您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中获取。

    port

    ByteHouse 服务器上要连接的端口号,HTTP 协议需配置为固定取值:8123。

    user

    登录 ByteHouse 数据库的用户名,需配置为固定值 bytehouse

    password

    登录 ByteHouse 数据库的密码,密码为 <API_Key>,可通过 ByteHouse 控制台>租户管理>连接信息中获取,详细获取方式请参见获取 API Key

    secure

    布尔标志(true/false),指示是否使用加密连接(HTTPS)。需设置为 True

    verify

    需设置为 false

    custom_settings

    配置需传递给连接器的其他 ByteHouse 连接设置。您可以设置计算组、最大执行时间等参数。示例如下:

    custom_settings:
        virtual_warehouse: {VIRTUAL_WAREHOUSE_ID}
        max_execution_time: 3600
    

    计算组 ID 可通过 ByteHouse 控制台 > 租户管理 > 参数设置 > 默认计算组获取。

  7. 使用文本编辑器打开 bytehouse_demo_project 项目目录,更新该项目中的 dbt_project.yml 文件,确保该文件中的 profile 参数值与您在步骤 6 中为系统配置的 dbt/profiles.yaml 名称保持一致。
    例如,系统中 dbt/profiles.yaml 文件配置名称为 dbt_bytehouse_profile,则项目中 dbt_project.yaml 文件应将 profile 的值保持一致。
    系统中的 dbt/profiles.yaml 文件配置如下,框选部分为名称:
    Image
    项目中的 dbt_projet.yaml 文件配置示例如下,框选部分为需更新的 profile 参数值:
    Image

  8. 测试 dbt 是否成功连接至 ByteHouse。如果连接成功,输出结果会显示 “All checks passed!”。

    dbt debug
    

    连接成功的输出结果示例如下:
    Image

使用示例

准备 ByteHouse 数据集

在 ByteHouse 中准备表和数据示例,用于测试 dbt 项目。您可以在 ByteHouse 控制台中执行以下 SQL 命令:

DROP DATABASE IF EXISTS bytehouse_demo;
CREATE DATABASE bytehouse_demo;

CREATE TABLE bytehouse_demo.customers (
    id UInt64,
    name String,
    email String,
    updated_at DateTime
) 
ENGINE = CnchMergeTree
PRIMARY KEY id
ORDER BY (id, updated_at)
PARTITION BY updated_at;

INSERT INTO bytehouse_demo.customers (id, name, email, updated_at) VALUES
    (1, 'John Smith', 'john.smith@example.com', '2023-01-15 08:30:00'),
    (2, 'Jane Doe', 'jane.doe@example.com', '2023-01-16 10:45:00'),
    (3, 'Robert Johnson', 'robert.j@example.com', '2023-01-16 14:20:00'),
    (4, 'Sarah Williams', 'sarah.w@example.com', '2023-01-17 09:15:00'),
    (5, 'Michael Brown', 'michael.b@example.com', '2023-01-18 11:30:00');

创建 DBT 源

在 dbt 项目 models 文件夹下创建 schema.yml 文件,将 customers 表定义为 dbt 源:

version: 2

sources:
  - name: customers_source
    schema: bytehouse_demo
    tables:
      - name: customers

创建视图物化

使用示例

在 dbt 项目 models 文件夹下创建 customers_summary_view.sql 文件,用于创建视图物化:

{{
  config(
    materialized = 'view'
  )
}}

SELECT
  id,
  name,
  email,
  updated_at
FROM {{ source('customers_source', 'customers') }}

参数说明如下:

参数

配置说明

materialized

该表的物化形式。

工作原理

dbt-bytehouse 内部将执行以下 SQL 命令,创建视图。

CREATE OR REPLACE VIEW `ecosystem`.`customers_summary_view` AS (
  SELECT
    id,
    name,
    email,
    updated_at
  FROM `bytehouse_demo`.`customers`
);

创建表物化

使用示例

在 dbt 项目 models 文件夹下创建 customers_summary_table.sql 文件,用于创建 table 物化:

{{
  config(
    materialized = 'table'
  )
}}

SELECT
  id,
  name,
  email,
  updated_at
FROM {{ source('customers_source', 'customers') }}

参数说明如下:

参数

配置说明

materialized

该表的物化形式。

工作原理

  1. dbt-bytehouse 内部将执行以下 SQL 命令,创建表。

    CREATE TABLE `bytehouse_demo`.`customers_summary_table`
    ENGINE = CnchMergeTree()
    ORDER BY (tuple())
    AS (
     SELECT
       id,
       name,
       email,
       updated_at
     FROM `bytehouse_demo`.`customers`
    );
    
  2. 在表已经存在的前提下,该模型将执行后续操作,dbt-bytehouse 将执行以下一整套 SQL 查询。

    -- 1. 使用模型声明创建一个临时表,该临时表将用于替换现有的表
    CREATE TABLE `bytehouse_demo`.`customers_summary_table__dbt_tmp`
    ENGINE = CnchMergeTree()
    ORDER BY (tuple())
    AS (
     SELECT
       id,
       name,
       email,
       updated_at
     FROM `bytehouse_demo`.`customers`
    );
    
    -- 2. 如果备份表存在,则删除备份表
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table__dbt_backup`;
    
    -- 3. 将现有的表重命名为备份表
    RENAME TABLE `bytehouse_demo`.`customers_summary_table` TO `bytehouse_demo`.`customers_summary_table__dbt_backup`;
    
    -- 4. 删除现有的表
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table`;
    
    -- 5. 将临时表重命名为所需的表名
    RENAME TABLE `bytehouse_demo`.`customers_summary_table__dbt_tmp` TO `bytehouse_demo`.`customers_summary_table`;
    
    -- 6. 删除备份表
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_table__dbt_backup`;
    

创建增量物化

使用示例

在 dbt 项目 models 文件夹下创建 customers_summary_incremental.sql 文件,用于创建增量物化。此处,您需要提供 dbt 的 unique_keyis_incremental 子句。

{{
  config(
    materialized='incremental',
    unique_key='id'
  )
}}

SELECT
  id,
  name,
  email,
  updated_at
FROM {{ source('customers_source', 'customers') }}
{% if is_incremental() %}
  WHERE updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}

参数说明如下:

参数

配置说明

materialized

该表的物化形式。

unique_key

唯一键表的 unique_key 值。

工作原理

  1. dbt-bytehouse 将执行以下 SQL 命令,创建增量表:

    CREATE TABLE `bytehouse_demo`.`customers_summary_incremental` 
    ENGINE = CnchMergeTree() 
    ORDER BY (tuple()) 
    AS (
      SELECT 
        id, 
        name, 
        email, 
        updated_at 
      FROM `bytehouse_demo`.`customers`
    );
    
  2. 在表已经存在的前提下,dbt-bytehouse 将继续执行以下查询:

    -- 1. 创建中间表 __dbt_new_data,包含已更新/新数据
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
    
    CREATE TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`
    ENGINE = CnchMergeTree()
    ORDER BY (tuple())
    AS (
      SELECT
        id,
        name,
        email,
        updated_at
      FROM `bytehouse_demo`.`customers`
      WHERE updated_at > (
        SELECT max(updated_at) 
        FROM `bytehouse_demo`.`customers_summary_incremental`
      )
    );
    
    -- 2. 创建临时表 __dbt_tmp,包含尚未更新的/新的数据
    CREATE TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_tmp`
    AS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
    
    INSERT INTO `bytehouse_demo`.`customers_summary_incremental__dbt_tmp` 
      ("id", "name", "email", "updated_at")
    SELECT 
      "id", 
      "name", 
      "email", 
      "updated_at"
    FROM `bytehouse_demo`.`customers_summary_incremental`
    WHERE (id) NOT IN (
      SELECT id
      FROM `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`
    );
    
    -- 3. 将新的/已更新的记录从 __dbt_new_data 表插入至临时表,此时,临时表 __dbt_tmp 包含全部记录
    INSERT INTO `bytehouse_demo`.`customers_summary_incremental__dbt_tmp` 
      ("id", "name", "email", "updated_at")
    SELECT 
      "id", 
      "name", 
      "email", 
      "updated_at"
    FROM `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
    
    -- 4. 删除中间表 __dbt_new_data
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_new_data`;
    
    -- 5. 如果 backup 表已存在,则删除。backup 表用于存储现有表数据
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_backup`;
    
    -- 6. 使用现有表数据创建 backup 表
    RENAME TABLE `bytehouse_demo`.`customers_summary_incremental` 
    TO `bytehouse_demo`.`customers_summary_incremental__dbt_backup`;
    
    -- 7. 删除现有表
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental`;
    
    -- 8. 将临时表重命名为最终表名称
    RENAME TABLE `bytehouse_demo`.`customers_summary_incremental__dbt_tmp` 
    TO `bytehouse_demo`.`customers_summary_incremental`;
    
    -- 9. 最后,删除 backup 表,后续将不再用到该表,backup 表仅用于故障恢复和原子性目的
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental__dbt_backup`;
    

使用 append 策略创建增量物化

使用示例

在 dbt 项目 models 文件夹下创建 customers_summary_incremental_append. sql 文件,使用 append 策略创建增量物化:

{{
  config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='append'
  )
}}

SELECT
  id,
  name,
  email,
  updated_at
FROM {{ source('customers_source', 'customers') }}
{% if is_incremental() %}
  WHERE updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}

参数说明如下:

参数

配置说明

materialized

该表的物化形式。

unique_key

唯一键表的 unique_key 值。

incremental_strategy

增量策略。

工作原理

  1. dbt-bytehouse 内部将执行以下 SQL 命令,创建增量表:

    CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_append` 
    ENGINE = CnchMergeTree() 
    ORDER BY (tuple()) 
    AS (
      SELECT 
        id, 
        name, 
        email, 
        updated_at 
      FROM `bytehouse_demo`.`customers`
    );
    
  2. 如果表已存在,dbt-bytehouse 将继续执行以下 INSERT 查询:

    INSERT INTO `bytehouse_demo`.`customers_summary_incremental_append` 
      ("id", "name", "email", "updated_at") 
    SELECT 
      id, 
      name, 
      email, 
      updated_at 
    FROM `bytehouse_demo`.`customers` 
    WHERE updated_at > (
      SELECT max(updated_at) 
      FROM `bytehouse_demo`.`customers_summary_incremental_append`
    );
    

使用 insert_overwrite 策略创建增量物化

使用示例

在 dbt 项目 models 文件夹下创建 customers_summary_incremental_override.sql 文件,使用 insert_overwrite 策略创建增量物化:

{{
  config(
    materialized='incremental',
    partition_by='updated_at',
    incremental_strategy='insert_overwrite'
  )
}}

SELECT
  id,
  name,
  email,
  updated_at
FROM {{ source('customers_source', 'customers') }}
{% if is_incremental() %}
  WHERE updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}

参数说明如下:

参数

配置说明

materialized

该表的物化形式。

partition_by

该表的分区列。

incremental_strategy

增量策略。

工作原理

  1. dbt-bytehouse 内部将执行以下 SQL 来创建增量表:

    CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_override` 
    ENGINE = CnchMergeTree() 
    ORDER BY (tuple()) 
    PARTITION BY (updated_at)
    AS (
        SELECT 
            id, 
            name, 
            email, 
            updated_at 
        FROM `bytehouse_demo`.`customers`
    )
    
  2. 如果表已存在,dbt-bytehouse 将继续执行以下 INSERT 查询:

    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`;
    
    CREATE TABLE `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`
    ENGINE = CnchMergeTree()
    ORDER BY (tuple())
    PARTITION BY (updated_at)
    AS (
        SELECT
            id,
            name,
            email,
            updated_at
        FROM `bytehouse_demo`.`customers`
        WHERE updated_at > (SELECT max(updated_at) FROM `bytehouse_demo`.`customers_summary_incremental_override`)
    );
    
    SELECT DISTINCT partition_id
    FROM system.cnch_parts
    WHERE part_type <= 2
    AND database = 'bytehouse_demo'
    AND table = 'customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc';
    
    ALTER TABLE `bytehouse_demo`.`customers_summary_incremental_override`
    REPLACE PARTITION ID '1705276800'
    FROM `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`;
    
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_distributed_new_data`;
    
    DROP TABLE IF EXISTS `bytehouse_demo`.`customers_summary_incremental_override__dbt_new_data_4992e4f4_05fc_42d1_b98f_860e4006d3cc`;
    

使用 seeds

  1. 使用如下内容,在 dbt 项目 seeds 文件夹下创建 country_codes.csv 文件:

    country_code,country_name
    US,United States
    CA,Canada
    GB,United Kingdom
    
  2. 执行 dbt seed 命令:

    dbt seed
    

    该命令生成以下 ByteHouse 查询:

    INSERT INTO `bytehouse_demo`.`country_codes` ("country_code", "country_name")
    FORMAT CSV
    US,United States
    CA,Canada
    GB,United Kingdom
    

(可选)使用 ByteHouse 唯一键

ByteHouse 唯一键表为指定了唯一键(Unique Key)的 CnchMergeTree 表,详情请参见唯一键表
dbt-bytehouse 支持创建物化视图时使用 ByteHouse 唯一键表,如需要使用,您可以在模型配置文件 models/xxx.sql 中额外设置 cnch_unique_key

{{ config(order_by='(id)', cnch_unique_key='(id)', materialized='table') }}

参数说明如下:

参数

配置说明

order_by

该表的排序形式。

cnch_unique_key

ByteHouse Unique 表的 key 列。

materialized

该表的物化形式。