
Too few rows and inconsistent results when building a table in ClickHouse with dbt Core

Problem description

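I use dbt to create/full-refresh tables in ClickHouse. When no engine is specified, ReplacingMergeTree is used by default, and materialized: table is set in dbt_project.yml. I now need to combine two tables:

  • fct_offline_score_combined: about 200 million rows
  • int_online_unified: about 780,000 rows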

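The two are combined with UNION DISTINCT into the target table fct_unified_score, but every time the dbt model runs, the target table ends up with fewer rows than expected, and the count differs from run to run. When I tried changing the table engine and using the FINAL clause on the temporary table, I got an error saying that the MergeTree engine does not support FINAL. I suspect the load fails because of insufficient memory, but most runs log no error. Is it possible to configure dbt to split a single insert into batches?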

Model code

with offline as (
    select * from {{ref('fct_offline_score_combined')}}
),
online as (
    select 
        _id,
        koboformid as kobo_form_id,
        studentId as student_id,
        schoolUdise as udise,
        schoolName as school_name,
        first_name,
        last_name,
        CASE
            WHEN trim(BOTH ' ' FROM gender) = '' THEN 'No data'
            WHEN UPPER(trim(BOTH ' ' FROM gender)) = 'MALE' THEN 'MALE'
            WHEN UPPER(trim(BOTH ' ' FROM gender)) = 'FEMALE' THEN 'FEMALE'
            ELSE 'OTHER'
        END AS gender,
        CASE 
            WHEN assesmentType = '' then 'No data'
            ELSE UPPER(assesmentType)
        END as assessment_type,
        CASE 
            WHEN subject = '' then 'No data'
            ELSE UPPER(subject)
        END as subject,
        CASE 
            WHEN state_id = '' then 'No data'
            ELSE UPPER(state_id)
        END as state_id,
        CASE 
            WHEN programId = '' then 'No data'
            ELSE programId
        END as program_id,
        assessment_id,
        questionId as question_id,
        cast(question_no as Nullable(Float64)) as question_no,
        cast(level_no as Nullable(Float64)) as level_no,
        question_name,
        cast(score as Nullable(Float64)) as score,
        cast(max_score as Nullable(Float64)) as max_score,
        cast(studentLevel as Nullable(Float64)) as student_level,
        CASE 
            WHEN treatment = '' then 'No data'
            ELSE concat(upper(substring(treatment, 1, 1)), lower(substring(treatment, 2)))
        END AS treatment,
        denomination,
        cast(startDate as String) as program_date,
        start_time,
        end_time
    from {{ref('int_online_unified')}}
),
main as (
    select * from offline
    union DISTINCT
    select * from online
)
select * from main

ClickHouse default settings (from dbt.log)

create table `analytics`.`fct_unified_score__dbt_backup`
  engine = MergeTree()
  order by (tuple())
  SETTINGS replicated_deduplication_window=0
  empty
as (
  with offline as (
    select * from `analytics`.`fct_offline_score_combined`
  ),

Analysis and solution

1. Pinpointing the core problem

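  • Engine and sorting key: the log shows that dbt created the backup table as a plain MergeTree() with ORDER BY tuple(), that is, with no sorting key. UNION DISTINCT is evaluated inside the SELECT, so the target table's sorting key has no effect on it, and a plain MergeTree never removes rows during background merges. If the model does end up as a ReplacingMergeTree, however, rows with equal sorting keys are collapsed whenever parts happen to merge, and with an empty key every row compares equal. That would produce exactly the symptom described: too few rows, and a count that depends on how far background merges have progressed.
  • Memory pressure from UNION DISTINCT on large tables: deduplicating about 200 million + 780,000 rows across every column requires a large in-memory set of the rows seen so far. When a query exceeds max_memory_usage, ClickHouse aborts it with a MEMORY_LIMIT_EXCEEDED exception instead of silently dropping part of the data, so a memory problem should show up as a failed INSERT in system.query_log rather than as a short table.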
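A toy Python model (not ClickHouse itself) makes the sorting-key point concrete: it counts the rows of a hypothetical ReplacingMergeTree as background merges progress, once with an empty sorting key and once with (_id, question_id). It assumes there is no version column, so a merge keeps the last row seen per key; the rows, the part layout, and the helper names are all hypothetical.

```python
# Toy model of ReplacingMergeTree background merges; not ClickHouse code.
# Assumptions: with no version column, a merge keeps the last row seen for
# each sorting-key value, and count() without FINAL sees every row of every
# current part, duplicates included.
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]
Part = List[Row]


def merge_parts(parts: List[Part], key: Callable[[Row], Tuple]) -> Part:
    """Merge several parts into one, keeping the last row per key."""
    latest: Dict[Tuple, Row] = {}
    for part in parts:
        for row in part:
            latest[key(row)] = row  # a later row replaces an earlier one
    return list(latest.values())


def visible_count(parts: List[Part]) -> int:
    """What count() without FINAL returns: the sum over all parts."""
    return sum(len(part) for part in parts)


def empty_key(row: Row) -> Tuple:
    return ()  # ORDER BY tuple(): every row has the same (empty) key


def id_question_key(row: Row) -> Tuple:
    return (row["_id"], row["question_id"])


# Four single-row parts holding four different rows (hypothetical data).
parts: List[Part] = [
    [{"_id": 1, "question_id": "q1"}],
    [{"_id": 1, "question_id": "q2"}],
    [{"_id": 2, "question_id": "q1"}],
    [{"_id": 2, "question_id": "q2"}],
]

before = visible_count(parts)
partial_empty = visible_count([merge_parts(parts[:2], empty_key)] + parts[2:])
full_empty = visible_count([merge_parts(parts, empty_key)])
full_keyed = visible_count([merge_parts(parts, id_question_key)])
print(before, partial_empty, full_empty, full_keyed)  # 4 3 1 4
```

With the empty key the count falls from 4 to 3 to 1 depending only on how many merges have run, while the (_id, question_id) key keeps all four distinct rows. ClickHouse may also apply the same replacement to each inserted block (see the optimize_on_insert setting), in which case an empty key can lose rows already at insert time.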

2. Fixes

(1) Configure the table engine and sorting key explicitly

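Add a config block at the top of the model file that specifies ReplacingMergeTree and a sorting key that uniquely identifies a row (for example the combination of _id and question_id; adjust it to your data):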

{{ config(
    engine='ReplacingMergeTree()',
    order_by='(_id, question_id)',
    materialized='table'
) }}

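ReplacingMergeTree removes rows that share the same sorting key, but only when background merges combine parts, which happens at an unspecified time; until then, queries without FINAL still see the duplicates. It also deduplicates by sorting key rather than by whole row as UNION DISTINCT does, so it can take over the query-time deduplication (and that query's memory cost) only if (_id, question_id) really identifies a row and eventual deduplication is acceptable.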
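The difference between the two kinds of deduplication shows up in a small Python sketch with hypothetical rows: whole-row deduplication, as UNION DISTINCT does it, keeps both versions of a question that was scored differently in the two sources, while deduplication by (_id, question_id), as a fully merged ReplacingMergeTree without a version column does it, keeps only the last one. The function names are illustrative, not ClickHouse APIs.

```python
# Whole-row dedup (UNION DISTINCT) versus sorting-key dedup
# (ReplacingMergeTree after a full merge, no version column).
# Hypothetical data; a toy model, not ClickHouse code.
from typing import Dict, List, Tuple

Row = Tuple[int, str, float]  # (_id, question_id, score)

offline: List[Row] = [(1, "q1", 2.0), (1, "q2", 3.0)]
online: List[Row] = [(1, "q1", 2.0), (1, "q2", 4.0)]  # q2 has another score


def union_distinct(a: List[Row], b: List[Row]) -> List[Row]:
    """Keep each distinct whole row once, in first-seen order."""
    seen = set()
    out: List[Row] = []
    for row in a + b:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out


def replacing_by_key(rows: List[Row]) -> List[Row]:
    """Keep the last row for each (_id, question_id) key."""
    latest: Dict[Tuple[int, str], Row] = {}
    for row in rows:
        latest[(row[0], row[1])] = row
    return list(latest.values())


distinct_rows = union_distinct(offline, online)
keyed_rows = replacing_by_key(offline + online)
print(len(distinct_rows), len(keyed_rows))  # 3 2
```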

(2) Replace UNION DISTINCT with key-based deduplication

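UNION DISTINCT hashes every column of every row and keeps a set of all distinct rows seen, which is costly at more than 200 million rows. If (_id, question_id) identifies a row, use UNION ALL and keep one row per key with ClickHouse's LIMIT BY clause, which only has to hash the two key columns:

main as (
    select * from offline
    union all
    select * from online
)
select * from main
limit 1 by _id, question_id -- keep one row per unique key

This is not equivalent to UNION DISTINCT: rows that share (_id, question_id) but differ in other columns collapse into one, and without an ORDER BY it is not defined which of them survives. SELECT DISTINCT * FROM (select * from offline union all select * from online) is equivalent to UNION DISTINCT and carries the same cost.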
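Deduplicating by key instead of by whole row means choosing one row per (_id, question_id). ClickHouse's LIMIT 1 BY keeps the first row it encounters for each key (and the any() aggregate likewise returns the first value encountered), so the survivor depends on read order. The Python sketch below uses a hypothetical helper, not a ClickHouse API, and feeds the same rows in two orders to show that the surviving score for (1, "q2") changes.

```python
# Toy model of LIMIT 1 BY: keep the first row seen for each key.
# Hypothetical helper and data; not ClickHouse code.
from typing import Callable, Hashable, Iterable, List, Tuple

Row = Tuple[int, str, float]  # (_id, question_id, score)


def limit_1_by(rows: Iterable[Row], key: Callable[[Row], Hashable]) -> List[Row]:
    """Return the first row encountered for each key, in arrival order."""
    seen = set()
    out: List[Row] = []
    for row in rows:
        k = key(row)
        if k not in seen:
            seen.add(k)
            out.append(row)
    return out


def by_key(row: Row) -> Hashable:
    return (row[0], row[1])  # (_id, question_id)


offline: List[Row] = [(1, "q1", 2.0), (1, "q2", 3.0)]
online: List[Row] = [(1, "q2", 4.0)]

a = limit_1_by(offline + online, by_key)
b = limit_1_by(online + offline, by_key)  # same rows, other arrival order
print(a)  # [(1, 'q1', 2.0), (1, 'q2', 3.0)]
print(b)  # [(1, 'q2', 4.0), (1, 'q1', 2.0)]
```

In ClickHouse the usual way to make the choice deterministic is to sort before LIMIT BY, for example select * from main order by _id, question_id, score desc limit 1 by _id, question_id to keep the highest score; which row is correct is a business decision, and sorting 200 million rows has its own memory and disk cost.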

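(3) Control memory with ClickHouse settings, not dbt batching

dbt-clickhouse builds a table model on the server with INSERT INTO ... SELECT, so the rows never pass through dbt and there is no client-side batch size to tune; clickhouse_batch_size and clickhouse_insert_strategy are not documented dbt-clickhouse configs. The limits that apply are ClickHouse's own query settings, such as max_memory_usage. Recent dbt-clickhouse versions accept a query_settings config that passes such settings to the model's INSERT (check the adapter README for your version); the value below is only an example. Global configuration in dbt_project.yml:

# dbt_project.yml global configuration
models:
  your_project_name:
    +materialized: table
    +query_settings:
      max_memory_usage: 20000000000  # bytes; example value only

Or per model:

{{ config(
    materialized='table',
    query_settings={'max_memory_usage': 20000000000}
) }}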

(4) Using the FINAL clause correctly

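FINAL only works with engines that merge rows, such as ReplacingMergeTree, and it belongs in the query rather than in the table definition; on a plain MergeTree it fails with the error described in the question. To see fully deduplicated results before background merges have finished, query the target table with: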

SELECT * FROM fct_unified_score FINAL

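Note: FINAL merges rows while reading and makes queries slower. Background merges run at unspecified times and are not guaranteed to remove every duplicate; if the stored data itself must be deduplicated, OPTIMIZE TABLE fct_unified_score FINAL forces a merge, which is expensive on a table of this size.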
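FINAL is what turns the row count into a stable number. In the toy Python model below (hypothetical parts and helper names, with the same last-row-wins assumption as a ReplacingMergeTree without a version column), a plain count() changes as merges progress, while a count taken as FINAL would, by merging at read time, stays the same.

```python
# Toy model: count() without FINAL depends on merge progress,
# count() with FINAL does not. Not ClickHouse code.
from typing import Dict, List, Tuple

Row = Tuple[int, str, float]  # (_id, question_id, score)
Part = List[Row]


def merge(parts: List[Part]) -> Part:
    """Merge parts, keeping the last row per (_id, question_id)."""
    latest: Dict[Tuple[int, str], Row] = {}
    for part in parts:
        for row in part:
            latest[(row[0], row[1])] = row
    return list(latest.values())


def plain_count(parts: List[Part]) -> int:
    """Like SELECT count() FROM t: every row in every part."""
    return sum(len(part) for part in parts)


def final_count(parts: List[Part]) -> int:
    """Like SELECT count() FROM t FINAL: merge at read time first."""
    return len(merge(parts))


p1: Part = [(1, "q1", 2.0), (1, "q2", 3.0)]
p2: Part = [(1, "q2", 4.0), (2, "q1", 1.0)]
p3: Part = [(2, "q1", 1.5)]

states = {
    "no merges yet": [p1, p2, p3],
    "p1 and p2 merged": [merge([p1, p2]), p3],
    "fully merged": [merge([p1, p2, p3])],
}
for name, parts in states.items():
    print(f"{name}: plain={plain_count(parts)} final={final_count(parts)}")
```

The plain count reads 5, 4, and 3 across the three states, while the FINAL-style count is 3 in all of them.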

3. Further suggestions

  • fct_offline_score_combined预先按排序键分区、排序,减少合并计算量。
  • 通过SELECT * FROM system.metrics WHERE metric = 'memory_usage'监控ClickHouse内存使用,确认是否存在内存不足问题。
  • EXPLAIN分析Union DISTINCT执行计划,定位性能瓶颈。

The question was originally posted on Stack Exchange by Hemanth P.
