You need to enable JavaScript to run this app.
ByteHouse云数仓版

ByteHouse云数仓版

复制全文
数据导入/导出
数据导入工具
复制全文
数据导入工具

1 工具概述

1.1 脚本信息

脚本名称

更新日期

功能描述

import.py

import.py
未知大小

2026-01-09

支持记录已导入文件,下次运行时可自动跳过已导入文件,提升导入效率

importer.py

importer.py
未知大小

2025-12-29

基于Python开发,实现按文件维度的批量导入功能,支撑基础导入场景

1.2 核心用途

从对象存储(火山引擎TOS / AWS S3)按目录批量遍历Parquet格式数据文件,自动生成并执行 INSERT INTO ... SELECT ... FROM CnchS3(...) 语句,将数据高效导入至ByteHouse 目标表中,适用于大批量Parquet数据的批量导入场景。

1.3 核心特点

  • 多线程并发导入:支持自定义并发数,充分利用计算资源,提升导入效率;
  • 失败自动重试:导入过程中若出现失败,将按配置自动重试,降低人工干预成本;
  • 失败SQL记录:所有导入失败的SQL语句将自动写入指定文件,便于后续排查问题、二次重跑;
  • 灵活自定义映射:支持列类型手动修正、SELECT表达式自定义,适配Parquet文件与ByteHouse表结构不匹配场景。

2 工作原理概览

工具通过规范化流程实现批量数据导入,整体逻辑清晰、可追溯,具体步骤如下:

  1. 读取表清单:从配置文件 tables.txt 中读取待导入的表清单,支持源表与ByteHouse目标表的自定义映射配置;
  2. 拼接存储目录:根据 config.ini 中配置的 import.url(对象存储基础路径)和 suffix_path_pattern(表路径后缀模板),自动拼接出每张表对应的对象存储目录路径;
  3. 遍历文件列表:调用对应对象存储厂商的List API(TOS使用TOS SDK,AWS S3使用boto3),列出目标目录下的所有文件key,自动过滤掉目录本身和SUCCESS标识文件(避免无效导入);
  4. 生成导入SQL:为每个符合条件的Parquet文件,自动生成一条标准导入SQL,格式如下:

INSERT INTO bh_db.bh_table``
SELECT <select_expr>
FROM CnchS3('<file_url>', '<schema_expr>', 'Parquet', 'auto', '<ak>', '<sk>')
SETTINGS <settings>;

说明

<select_expr> 为自定义查询表达式,默认自动生成,也可通过配置文件定义,<schema_expr> 为列类型映射表达式,/ 为对象存储访问密钥, 为SQL执行参数。

  1. 并发执行与重试:以配置文件中 workers 参数指定的并发数,多线程执行导入 SQL;若某条 SQL 执行失败,将按默认规则重试,最终将所有失败的 SQL 语句写入 failed_sqls.sql 文件,便于后续排查。

3 运行环境与依赖

3.1 Python 版本要求

推荐使用 Python 3.8 及以上版本,确保脚本兼容性和运行稳定性,低版本Python可能存在依赖包安装失败或功能异常问题。

3.2 依赖包安装

工具运行依赖对象存储SDK(TOS、AWS S3)和 ByteHouse 驱动,需提前执行以下命令安装依赖包:

pip install tos boto3 clickhouse-driver

说明

tos 包用于访问火山引擎 TOS 对象存储,boto3包用于访问 AWS S3 对象存储,clickhouse-driver用于与ByteHouse 建立连接并执行 SQL。

4 目录与文件说明

脚本运行目录下需包含以下文件,其中必填文件需提前配置完成,可选文件根据实际场景按需配置:

.
├── import.py                         # 主脚本
├── config.ini                        # 配置文件(必填)
├── tables.txt                        # 表清单(必填)
├── custom_s3_table_schema_config.txt # 可选:列类型覆盖/修正
├── custom_select_expr_config.txt     # 可选:SELECT 表达式覆盖
└── failed_sqls.sql                   # 运行后生成:失败 SQL 记录
  • import.py:工具主执行脚本,负责整个导入流程的调度与执行(核心文件);
  • config.ini:核心配置文件(必填),存储 ByteHouse 连接信息、对象存储访问信息、导入参数等;
  • tables.txt:待导入表清单(必填),配置源表与 ByteHouse 目标表的映射关系;
  • custom_s3_table_schema_config.txt:可选配置文件,用于修正 Parquet 文件列类型与 ByteHouse 表列类型不匹配的问题;
  • custom_select_expr_config.txt:可选配置文件,用于导入时对特定列进行数据转换、清洗、默认值填充等操作;
  • failed_sqls.sql:运行后自动生成的文件,记录所有导入失败的 SQL 语句,用于问题排查和二次重跑。

5 快速开始

快速上手需完成如下核心配置(tables.txt、config.ini),可选配置根据实际需求补充,配置完成后即可启动导入。

5.1 配置 tables.txt(表清单)

该文件用于配置待导入的表清单,支持两种配置方式,每行配置一张表,注释行以#开头:

方式1:源表与目标表同库同名

当对象存储中的源表(用于定位数据路径)与ByteHouse中的目标表(用于写入数据)库名、表名完全一致时,直接填写源表名即可:

source_db1.source_table1
source_db2.source_table2
...

方式2:源表与目标表自定义映射

当需要将源表数据导入至ByteHouse中不同库、不同名的目标表时,使用:分隔源表与目标表:

db1.table1:target_db1.target_table1
db2.table2:target_db2.target_table2
...

注意

source_db.source_table 用于定位对象存储中的数据路径和元数据key;target_db.target_table 是实际写入数据的ByteHouse表,需提前在ByteHouse中创建完成。

5.2 配置 config.ini(核心配置)

该文件分为[bytehouse](ByteHouse连接配置)和[import](导入参数配置)两个模块,以下分别提供 TOS 和 AWS S3 场景的完整配置示例,尖括号<>中的内容需替换为实际值。

5.2.1 TOS 对象存储配置示例

[bytehouse]
host = <bytehouse-host>          # ByteHouse公网/私网域名
api_token = <api_token>          # ByteHouse账户API Key
vw_id = <virtual_warehouse>      # 导入使用的计算组ID

[import]
ak = <access_key>                # TOS访问AK
sk = <secret_key>                # TOS访问SK
bucket = <bucket_name>           # TOS存储桶名称
url = https://<bucket_name>.tos-s3-<region>.volces.com/<prefix>  # TOS基础路径
suffix_path_pattern = /{db}/{table}/  # 表路径后缀模板
format = Parquet                  # 数据文件格式
compression = auto                # 压缩格式自动检测
workers = 32                      # 导入并发度
sql_settings = ""                 # 额外SQL执行参数
continue_on_failure = false       # 失败时是否继续下一个表
enable_skip_imported_files = false # 是否跳过已导入文件
dry_run = false                   # 是否开启Dry-Run模式(仅打印SQL不执行)

5.2.2 AWS S3 对象存储配置示例

[bytehouse]
host = <bytehouse-host>          # ByteHouse公网/私网域名
api_token = <api_token>          # ByteHouse账户API Key
vw_id = <virtual_warehouse>      # 导入使用的计算组ID

[import]
ak = <access_key>                # AWS S3访问AK
sk = <secret_key>                # AWS S3访问SK
bucket = <bucket_name>           # AWS S3存储桶名称
url = https://<bucket_name>.s3.<region>.amazonaws.com/<prefix>  # S3基础路径
suffix_path_pattern = /{db}/{table}/  # 表路径后缀模板
format = Parquet                  # 数据文件格式
compression = auto                # 压缩格式自动检测
workers = 32                      # 导入并发度
sql_settings = ""                 # 额外SQL执行参数
continue_on_failure = false       # 失败时是否继续下一个表
dry_run = false                   # 是否开启Dry-Run模式(仅打印SQL不执行)

5.3 配置项详细说明

配置模块

配置项

是否必填

默认值

详细说明

bytehouse(ByteHouse连接配置)

host

ByteHouse的公网/私网域名,选择规则:

  1. 推荐公网域名:
  • TOS与ByteHouse不同地域。
  • AWS S3/MinIO未搭建专线。
  1. 推荐私网域名:
  • TOS与ByteHouse同地域。
  • AWS S3/MinIO已搭建专线。

说明

可参考获取 ByteHouse 连接信息,获取 ByteHouse 账户的连接地址。Host 格式一般如下:
公网:tenant-{account_id}-{REGION}-{env}-public.bytehouse.volces.com
私网:tenant-{account_id}-{REGION}-{env}.bytehouse.ivolces.com

api_token

ByteHouse账户的API Key,用于身份验证,获取方式参考ByteHouse官方文档“获取API Key”章节。

vw_id

导入数据所使用的ByteHouse计算组ID,可登录ByteHouse控制台,在“计算组”页面查看并复制。

import(导入参数配置)

ak

ByteHouse CnchS3外表访问对象存储桶的Access Key ID(AK);若使用TOS,填写IAM账户的AK,获取方式参考“Access Key(密钥)管理”文档。

sk

ByteHouse CnchS3外表访问对象存储桶的Secret Access Key(SK);若使用TOS,填写IAM账户的SK,获取方式同AK。

bucket

对象存储桶的名称,需确保该存储桶与ByteHouse计算组网络可达。

url

对象存储中数据文件的基础路径前缀,格式需符合对应厂商规范。

  • 从TOS导入:
# 公网
https://{bucket}.tos-s3-{region}.volces.com/some-dir
# 私网
https://{bucket}.tos-s3-{region}.ivolces.com/some-dir
  • 从其他对象存储服务导入:
# AWS S3
https://{bucket}.s3.{region}.amazonaws.com/some-dir

suffix_path_pattern

/{db}/{table}/

表数据目录的后缀模板,脚本将自动用源表的{db}(库名)和{table}(表名)替换模板中的占位符,与url拼接后形成最终的表数据目录路径。

format

Parquet

数据文件格式,默认且推荐使用Parquet(高效压缩、列存优化),同时支持CSV、JSONEachRow等格式。

compression

auto

文件压缩格式,设置为auto时,系统将自动检测文件压缩类型,无需手动指定。

workers

1

导入并发度,需根据计算组(vw)规格调整:L及以上规格推荐设置为32,其他规格推荐设置为8或16,避免并发过高导致ByteHouse资源耗尽。

sql_settings

""

导入SQL需额外配置的执行参数,例如:max_execution_time=3600,max_partitions_per_insert_block=1000(设置SQL超时时间和每块插入分区数)。

continue_on_failure

false

当某张表导入出现失败时,是否跳过该表,继续执行下一张表的导入;建议生产环境设置为false,便于及时发现问题。

shuffle_file_list

false

是否打乱文件列表的顺序后再导入;针对ByteHouse Unique表,打乱顺序可减少数据去重压力(降低同一分区数据并发导入的概率)。

enable_skip_imported_files

false

是否启用已导入文件跳过模式:开启后,脚本会在当前目录生成import-state文件夹,为每张已导入的表创建命名为<db>.<table>.record的 state 文件,记录已导入文件的key和导入时间;下次运行时,将自动过滤掉已导入文件,实现增量导入。

dry_run

false

是否开启Dry-Run(测试)模式:开启后,脚本仅打印生成的导入SQL,不实际执行导入操作,用于验证配置和SQL的正确性。

脚本内部已默认配置部分SQL执行参数,用户在config.ini中配置的sql_settings参数优先级更高,将与默认参数合并后作为最终执行参数。默认参数如下:

SQL_DEFAULT_SETTINGS = {
 "input_format_parquet_allow_missing_columns": "1",
 "max_execution_time": "3000",
 "max_partitions_per_insert_block": "7500"
}

5.4 自定义配置(可选)

当Parquet文件列类型与ByteHouse表列类型不匹配,或需要对数据进行清洗转换时,可通过以下两个可选配置文件实现自定义映射。

5.4.1 custom_s3_table_schema_config.txt(列类型修正)

适用场景:解决Parquet文件中列类型与ByteHouse目标表列类型不匹配导致的导入失败问题,手动声明列类型映射关系。
配置格式(示例):

db1.table1: col1 --> Nullable(String) | col2 --> Nullable(Int32)

格式说明

  • : 左侧为源库表名(db1.table1),需与tables.txt中的源表名一致;
  • | 用于分隔同一表中的多个列类型配置;
  • --> 左侧为Parquet文件中的列名,右侧为需要传给CnchS3的类型表达式(需与ByteHouse表列类型匹配)。

核心原理:CnchS3外表的字段类型默认与ByteHouse目标表一致,当与Parquet文件列类型不匹配时,会导致读取失败;通过该配置手动修正类型,可确保数据正常读取,后续可通过SELECT表达式进一步处理数据。

5.4.2 custom_select_expr_config.txt(数据转换)

适用场景:导入过程中对特定列进行数据转换、清洗、默认值填充等操作,适配业务数据需求。
配置格式(示例):

# 将db1.table1的col1字段用splitByString函数处理,返回Array(String)类型
db1.table1: col1 --> splitByString('@<@', `col1`) as `col1`

说明

配置规则与列类型修正文件一致,-->右侧为ByteHouse支持的SQL表达式,可实现字段拆分、空值填充、类型转换等功能。

6 运行方式

工具支持后台运行(推荐生产环境)和前台运行(推荐测试环境),可根据实际场景选择,运行前需确保所有必填配置已完成。

6.1 正式运行(生产环境推荐)

后台运行(推荐)

后台运行可避免终端关闭导致脚本中断,同时将运行日志输出至指定文件,便于后续查看进度和排查问题:

nohup python import.py > output.log 2>&1 &

# 实时查看运行日志
tail -F output.log

前台运行(不推荐)

仅推荐在开启dry_run=true(测试模式)时使用,便于实时查看生成的SQL语句:

python import.py

7 输出与日志说明

脚本运行过程中会输出实时进度,运行结束后会生成失败SQL记录文件,便于用户跟踪导入状态和排查问题。

7.1 导入进度输出

  • 单个文件导入成功:打印一个.,用于快速识别实时导入进度;
  • 批量进度提示:每成功导入10个文件,或导入完最后一批文件时,打印一次详细进度,示例如下:
...
.......... 12.50% (250/2000) average import time per file: [8 seconds]
...

说明

250为已导入文件数,2000为总文件数,12.50%为当前导入进度,平均每个文件导入时间为8秒。

7.2 表导入成功提示

当某张表所有文件导入完成,会打印表导入完成提示,可通过finished importing关键字快速筛选成功导入的表,示例如下:

...
... finished importing [db1.table1]
...

7.3 失败SQL记录

若导入过程中出现SQL执行失败,脚本会自动将失败的SQL语句写入当前目录的failed_sqls.sql文件中,每个失败SQL会附带失败时间和错误提示,便于用户手工排查问题、二次重跑。

8 常见问题排查

针对导入过程中常见的报错场景,提供以下排查方向和解决方案,快速定位并解决问题。

8.1 报错:无法list对象存储文件 / vendor识别失败

  • 排查1:检查import.url格式是否符合对应对象存储规范(TOS格式:https://<bucket>.tos-s3-<region>.<domain>/<prefix>);
  • 排查2:检查配置的AK/SK是否拥有对象存储的ListBucket权限(用于遍历目录下的文件)。

8.2 导入报AccessDenied / 403

  • 排查1:确认配置的AK/SK拥有对象存储的GetObject权限(用于读取Parquet文件内容);
  • 排查2:检查对象存储是否启用了额外的访问控制(如IP白名单、VPC终端节点等),确保ByteHouse计算组能够正常访问对象存储。

8.3 导入报Parquet字段缺失 / 类型不匹配

  • 解决方案1:在sql_settings中添加参数input_format_parquet_allow_missing_columns=1,允许跳过缺失的字段;
  • 解决方案2:通过custom_s3_table_schema_config.txt修正Parquet文件与ByteHouse表的列类型映射;
  • 解决方案3:通过custom_select_expr_config.txt对字段进行类型转换、空值填充,适配表结构。

8.4 某文件导入非常慢

  • 解决方案1:调小workers并发数,避免ByteHouse计算组资源耗尽,导致单文件导入延迟;
  • 解决方案2:若单文件体积过大(超过1GB),建议上游将Parquet文件切分为256MB~1GB的小文件,降低单文件导入压力和重试成本。

9 最佳实践建议

结合生产环境实际使用场景,提供以下最佳实践,提升导入效率、降低失败率、简化运维成本。

  1. 控制文件大小:优先保证每个Parquet文件大小在256MB~1GB之间,既避免单文件过大导致的重试成本高、导入慢问题,也能充分利用多线程并发导入的优势,提升整体导入效率;
  2. 合理设置并发度workers参数不要盲目拉高,建议从4、8、16、32逐步测试,结合ByteHouse计算组资源、并发上限和导入失败率调整;L及以上规格计算组可直接设置为32;
  3. 脏数据兜底处理:针对数据中的脏数据、NULL值、DateTime溢出等问题,优先通过custom_select_expr_config.txt配置SQL表达式做兜底处理,避免导入失败;
  4. 先测试后执行:正式导入前,开启dry_run=true模式,验证生成的SQL语句是否符合预期、配置是否正确,避免批量导入失败;
  5. 增量导入优化:对于周期性导入场景,启用enable_skip_imported_files=true,实现增量导入,避免重复导入已处理文件,节省资源和时间。
最近更新时间:2026.03.20 17:54:51
这个页面对您有帮助吗?
有用
有用
无用
无用