脚本名称 | 更新日期 | 功能描述 |
|---|---|---|
import.py | 2026-01-09 | 支持记录已导入文件,下次运行时可自动跳过已导入文件,提升导入效率 |
importer.py | 2025-12-29 | 基于Python开发,实现按文件维度的批量导入功能,支撑基础导入场景 |
从对象存储(火山引擎TOS / AWS S3)按目录批量遍历Parquet格式数据文件,自动生成并执行 INSERT INTO ... SELECT ... FROM CnchS3(...) 语句,将数据高效导入至ByteHouse 目标表中,适用于大批量Parquet数据的批量导入场景。
工具通过规范化流程实现批量数据导入,整体逻辑清晰、可追溯,具体步骤如下:
tables.txt 中读取待导入的表清单,支持源表与ByteHouse目标表的自定义映射配置;config.ini 中配置的 import.url(对象存储基础路径)和 suffix_path_pattern(表路径后缀模板),自动拼接出每张表对应的对象存储目录路径;INSERT INTO bh_db.bh_table``SELECT <select_expr>FROM CnchS3('<file_url>', '<schema_expr>', 'Parquet', 'auto', '<ak>', '<sk>')SETTINGS <settings>;
说明
<select_expr> 为自定义查询表达式,默认自动生成,也可通过配置文件定义,<schema_expr> 为列类型映射表达式,
workers 参数指定的并发数,多线程执行导入 SQL;若某条 SQL 执行失败,将按默认规则重试,最终将所有失败的 SQL 语句写入 failed_sqls.sql 文件,便于后续排查。推荐使用 Python 3.8 及以上版本,确保脚本兼容性和运行稳定性,低版本Python可能存在依赖包安装失败或功能异常问题。
工具运行依赖对象存储SDK(TOS、AWS S3)和 ByteHouse 驱动,需提前执行以下命令安装依赖包:
pip install tos boto3 clickhouse-driver
说明
tos 包用于访问火山引擎 TOS 对象存储,boto3包用于访问 AWS S3 对象存储,clickhouse-driver用于与ByteHouse 建立连接并执行 SQL。
脚本运行目录下需包含以下文件,其中必填文件需提前配置完成,可选文件根据实际场景按需配置:
. ├── import.py # 主脚本 ├── config.ini # 配置文件(必填) ├── tables.txt # 表清单(必填) ├── custom_s3_table_schema_config.txt # 可选:列类型覆盖/修正 ├── custom_select_expr_config.txt # 可选:SELECT 表达式覆盖 └── failed_sqls.sql # 运行后生成:失败 SQL 记录
import.py:工具主执行脚本,负责整个导入流程的调度与执行(核心文件);config.ini:核心配置文件(必填),存储 ByteHouse 连接信息、对象存储访问信息、导入参数等;tables.txt:待导入表清单(必填),配置源表与 ByteHouse 目标表的映射关系;custom_s3_table_schema_config.txt:可选配置文件,用于修正 Parquet 文件列类型与 ByteHouse 表列类型不匹配的问题;custom_select_expr_config.txt:可选配置文件,用于导入时对特定列进行数据转换、清洗、默认值填充等操作;failed_sqls.sql:运行后自动生成的文件,记录所有导入失败的 SQL 语句,用于问题排查和二次重跑。快速上手需完成如下核心配置(tables.txt、config.ini),可选配置根据实际需求补充,配置完成后即可启动导入。
该文件用于配置待导入的表清单,支持两种配置方式,每行配置一张表,注释行以#开头:
当对象存储中的源表(用于定位数据路径)与ByteHouse中的目标表(用于写入数据)库名、表名完全一致时,直接填写源表名即可:
source_db1.source_table1 source_db2.source_table2 ...
当需要将源表数据导入至ByteHouse中不同库、不同名的目标表时,使用:分隔源表与目标表:
db1.table1:target_db1.target_table1 db2.table2:target_db2.target_table2 ...
注意
source_db.source_table 用于定位对象存储中的数据路径和元数据key;target_db.target_table 是实际写入数据的ByteHouse表,需提前在ByteHouse中创建完成。
该文件分为[bytehouse](ByteHouse连接配置)和[import](导入参数配置)两个模块,以下分别提供 TOS 和 AWS S3 场景的完整配置示例,尖括号<>中的内容需替换为实际值。
[bytehouse] host = <bytehouse-host> # ByteHouse公网/私网域名 api_token = <api_token> # ByteHouse账户API Key vw_id = <virtual_warehouse> # 导入使用的计算组ID [import] ak = <access_key> # TOS访问AK sk = <secret_key> # TOS访问SK bucket = <bucket_name> # TOS存储桶名称 url = https://<bucket_name>.tos-s3-<region>.volces.com/<prefix> # TOS基础路径 suffix_path_pattern = /{db}/{table}/ # 表路径后缀模板 format = Parquet # 数据文件格式 compression = auto # 压缩格式自动检测 workers = 32 # 导入并发度 sql_settings = "" # 额外SQL执行参数 continue_on_failure = false # 失败时是否继续下一个表 enable_skip_imported_files = false # 是否跳过已导入文件 dry_run = false # 是否开启Dry-Run模式(仅打印SQL不执行)
[bytehouse] host = <bytehouse-host> # ByteHouse公网/私网域名 api_token = <api_token> # ByteHouse账户API Key vw_id = <virtual_warehouse> # 导入使用的计算组ID [import] ak = <access_key> # AWS S3访问AK sk = <secret_key> # AWS S3访问SK bucket = <bucket_name> # AWS S3存储桶名称 url = https://<bucket_name>.s3.<region>.amazonaws.com/<prefix> # S3基础路径 suffix_path_pattern = /{db}/{table}/ # 表路径后缀模板 format = Parquet # 数据文件格式 compression = auto # 压缩格式自动检测 workers = 32 # 导入并发度 sql_settings = "" # 额外SQL执行参数 continue_on_failure = false # 失败时是否继续下一个表 dry_run = false # 是否开启Dry-Run模式(仅打印SQL不执行)
配置模块 | 配置项 | 是否必填 | 默认值 | 详细说明 |
|---|---|---|---|---|
bytehouse(ByteHouse连接配置) | host | 是 | 无 | ByteHouse的公网/私网域名,选择规则:
说明 可参考获取 ByteHouse 连接信息,获取 ByteHouse 账户的连接地址。Host 格式一般如下: |
api_token | 是 | 无 | ByteHouse账户的API Key,用于身份验证,获取方式参考ByteHouse官方文档“获取API Key”章节。 | |
vw_id | 是 | 无 | 导入数据所使用的ByteHouse计算组ID,可登录ByteHouse控制台,在“计算组”页面查看并复制。 | |
import(导入参数配置) | ak | 是 | 无 | ByteHouse CnchS3外表访问对象存储桶的Access Key ID(AK);若使用TOS,填写IAM账户的AK,获取方式参考“Access Key(密钥)管理”文档。 |
sk | 是 | 无 | ByteHouse CnchS3外表访问对象存储桶的Secret Access Key(SK);若使用TOS,填写IAM账户的SK,获取方式同AK。 | |
bucket | 是 | 无 | 对象存储桶的名称,需确保该存储桶与ByteHouse计算组网络可达。 | |
url | 是 | 无 | 对象存储中数据文件的基础路径前缀,格式需符合对应厂商规范。
| |
suffix_path_pattern | 是 | /{db}/{table}/ | 表数据目录的后缀模板,脚本将自动用源表的{db}(库名)和{table}(表名)替换模板中的占位符,与url拼接后形成最终的表数据目录路径。 | |
format | 是 | Parquet | 数据文件格式,默认且推荐使用Parquet(高效压缩、列存优化),同时支持CSV、JSONEachRow等格式。 | |
compression | 是 | auto | 文件压缩格式,设置为auto时,系统将自动检测文件压缩类型,无需手动指定。 | |
workers | 是 | 1 | 导入并发度,需根据计算组(vw)规格调整:L及以上规格推荐设置为32,其他规格推荐设置为8或16,避免并发过高导致ByteHouse资源耗尽。 | |
sql_settings | 否 | "" | 导入SQL需额外配置的执行参数,例如: | |
continue_on_failure | 否 | false | 当某张表导入出现失败时,是否跳过该表,继续执行下一张表的导入;建议生产环境设置为false,便于及时发现问题。 | |
shuffle_file_list | 否 | false | 是否打乱文件列表的顺序后再导入;针对ByteHouse Unique表,打乱顺序可减少数据去重压力(降低同一分区数据并发导入的概率)。 | |
enable_skip_imported_files | 否 | false | 是否启用已导入文件跳过模式:开启后,脚本会在当前目录生成 | |
dry_run | 是 | false | 是否开启Dry-Run(测试)模式:开启后,脚本仅打印生成的导入SQL,不实际执行导入操作,用于验证配置和SQL的正确性。 |
脚本内部已默认配置部分SQL执行参数,用户在config.ini中配置的sql_settings参数优先级更高,将与默认参数合并后作为最终执行参数。默认参数如下:
SQL_DEFAULT_SETTINGS = { "input_format_parquet_allow_missing_columns": "1", "max_execution_time": "3000", "max_partitions_per_insert_block": "7500" }
当Parquet文件列类型与ByteHouse表列类型不匹配,或需要对数据进行清洗转换时,可通过以下两个可选配置文件实现自定义映射。
适用场景:解决Parquet文件中列类型与ByteHouse目标表列类型不匹配导致的导入失败问题,手动声明列类型映射关系。
配置格式(示例):
db1.table1: col1 --> Nullable(String) | col2 --> Nullable(Int32)
格式说明:
: 左侧为源库表名(db1.table1),需与tables.txt中的源表名一致;| 用于分隔同一表中的多个列类型配置;--> 左侧为Parquet文件中的列名,右侧为需要传给CnchS3的类型表达式(需与ByteHouse表列类型匹配)。核心原理:CnchS3外表的字段类型默认与ByteHouse目标表一致,当与Parquet文件列类型不匹配时,会导致读取失败;通过该配置手动修正类型,可确保数据正常读取,后续可通过SELECT表达式进一步处理数据。
适用场景:导入过程中对特定列进行数据转换、清洗、默认值填充等操作,适配业务数据需求。
配置格式(示例):
# 将db1.table1的col1字段用splitByString函数处理,返回Array(String)类型 db1.table1: col1 --> splitByString('@<@', `col1`) as `col1`
说明
配置规则与列类型修正文件一致,-->右侧为ByteHouse支持的SQL表达式,可实现字段拆分、空值填充、类型转换等功能。
工具支持后台运行(推荐生产环境)和前台运行(推荐测试环境),可根据实际场景选择,运行前需确保所有必填配置已完成。
后台运行可避免终端关闭导致脚本中断,同时将运行日志输出至指定文件,便于后续查看进度和排查问题:
nohup python import.py > output.log 2>&1 & # 实时查看运行日志 tail -F output.log
仅推荐在开启dry_run=true(测试模式)时使用,便于实时查看生成的SQL语句:
python import.py
脚本运行过程中会输出实时进度,运行结束后会生成失败SQL记录文件,便于用户跟踪导入状态和排查问题。
.,用于快速识别实时导入进度;... .......... 12.50% (250/2000) average import time per file: [8 seconds] ...
说明
250为已导入文件数,2000为总文件数,12.50%为当前导入进度,平均每个文件导入时间为8秒。
当某张表所有文件导入完成,会打印表导入完成提示,可通过finished importing关键字快速筛选成功导入的表,示例如下:
... ... finished importing [db1.table1] ...
若导入过程中出现SQL执行失败,脚本会自动将失败的SQL语句写入当前目录的failed_sqls.sql文件中,每个失败SQL会附带失败时间和错误提示,便于用户手工排查问题、二次重跑。
针对导入过程中常见的报错场景,提供以下排查方向和解决方案,快速定位并解决问题。
import.url格式是否符合对应对象存储规范(TOS格式:https://<bucket>.tos-s3-<region>.<domain>/<prefix>);ListBucket权限(用于遍历目录下的文件)。GetObject权限(用于读取Parquet文件内容);sql_settings中添加参数input_format_parquet_allow_missing_columns=1,允许跳过缺失的字段;custom_s3_table_schema_config.txt修正Parquet文件与ByteHouse表的列类型映射;custom_select_expr_config.txt对字段进行类型转换、空值填充,适配表结构。workers并发数,避免ByteHouse计算组资源耗尽,导致单文件导入延迟;结合生产环境实际使用场景,提供以下最佳实践,提升导入效率、降低失败率、简化运维成本。
workers参数不要盲目拉高,建议从4、8、16、32逐步测试,结合ByteHouse计算组资源、并发上限和导入失败率调整;L及以上规格计算组可直接设置为32;custom_select_expr_config.txt配置SQL表达式做兜底处理,避免导入失败;dry_run=true模式,验证生成的SQL语句是否符合预期、配置是否正确,避免批量导入失败;enable_skip_imported_files=true,实现增量导入,避免重复导入已处理文件,节省资源和时间。