ByteHouse 云数仓版 Spark Connector 连接器专门用于通过 Spark 将数据加载到 ByteHouse 云数仓版。本文将介绍通过 Spark SQL、EMR 支持的 Servless Spark 以及 Spark Jar 三种方式连接 ByteHouse 并处理数据。
请按照下面的方法,在程序中配置以下驱动的依赖项。
对于要使用 Spark connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml
文件中。
添加如下依赖。
Spark 版本 | 依赖 | 下载 |
---|---|---|
Spark 3.5 |
| |
Spark 3.3 |
|
将以下存储库添加到 pom.xml
文件:
<repository> <id>bytedance</id> <name>ByteDance Public Repository</name> <url>https://artifact.bytedance.com/repository/releases</url> </repository>
连接至 ByteHouse 所需的通用参数及配置说明如下。更多连接参数的配置说明请参见下文的配置参数章节。
参数 | 配置说明 |
---|---|
| 配置为 ByteHouse 的公网连接域名。您可以在 ByteHouse 控制台的 租户管理 > 基本信息>网络信息 中查看对应信息。详情请参见 步骤二:配置网络信息。 |
| 即 |
| 配置为需连接的 ByteHouse 的数据库名。您可通过 ByteHouse 控制台 > 数据库,查看并复制需连接的数据库名称。 |
| 配置为 ByteHouse 计算组名,您可通过 ByteHouse 控制台 > 租户管理 > 参数设置,查看默认计算组。 |
在本地测试前,请将以下 jar 包加载到 ${SPARK_HOME/jars}
,或者在提交命令时使用 --jars
指定 Jar 的本地路径。
使用时请将 CLICKHOUSE_HOST、CLICKHOUSE_PASSWORD、CLICKHOUSE_DATABASE、CLICKHOUSE_VW 替换为实际值,获取方式请参见连接参数配置说明。
export SPARK_LOCAL_IP=localhost export SPARK_HOME=/opt/tiger/spark export CLICKHOUSE_HOST=bytehouse-{REGION}.volces.com export CLICKHOUSE_PASSWORD=<your-api-key> export CLICKHOUSE_DATABASE=<database> export CLICKHOUSE_VW=<your-virtual-warehouse> $SPARK_HOME/bin/spark-sql \ --conf spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog \ --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \ --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \ --conf spark.sql.catalog.clickhouse.database=${CLICKHOUSE_DATABASE} \ --conf spark.sql.catalog.clickhouse.option.vw=${CLICKHOUSE_VW} \ --conf spark.driver.bindAddress=127.0.0.1 \ --conf spark.bytehouse.write.batchSize=50000 \ --conf spark.bytehouse.write.format=jdbc \ --jars clickhouse-jdbc-0.4.6-all.jar,driver-java-1.1.75-all.jar,clickhouse-spark-runtime-3.5_2.12-0.8.0.11.jar Bash
参数 | 配置说明 |
---|---|
| 配置为实际使用的 jar 文件路径。 |
使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明。
import org.apache.spark.sql.SparkSession object Main { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("App") .master("local") .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog") .config("spark.sql.catalog.clickhouse.host", "<your-host>") .config("spark.sql.catalog.clickhouse.protocol", "http") .config("spark.sql.catalog.clickhouse.password", "<your-api-key>") .config("spark.sql.catalog.clickhouse.database", "<database>") .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>") .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.max_execution_time", "108000000") .config("spark.bytehouse.write.batchSize", "50000") .config("spark.bytehouse.write.format", "jdbc") .getOrCreate() val sql = "select * from clickhouse.xx.xx" val df = spark.sql(sql) spark.stop() } }
使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明。
from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("App")\ .master("local")\ .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog")\ .config("spark.sql.catalog.clickhouse.host", "<your-host>")\ .config("spark.sql.catalog.clickhouse.protocol", "http")\ .config("spark.sql.catalog.clickhouse.password", "<your-api-key>")\ .config("spark.sql.catalog.clickhouse.database", "<database>")\ .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>")\ .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.max_execution_time", "108000000")\ .config("spark.bytehouse.write.batchSize", "50000")\ .config("spark.bytehouse.write.format", "jdbc")\ .config("spark.jars", "clickhouse-jdbc-0.4.6-all.jar,clickhouse-spark-runtime-3.5_2.12-0.8.0.11.jar").\ getOrCreate() sql = "select count(*) from clickhouse.xxx.xxx" df = spark.sql(sql) df.show() spark.stop()
Servless Spark 方式适用于火山引擎 EMR(E-MapReduce)服务。您可以参考下面的命令,基于 Servless Spark 方式连接到 ByteHouse。Serverless Spark 作业开发指请参见 Serverless Spark 作业开发指南相关文档。
您可以通过 Serverless Spark 的方式连接 ByteHouse 和 火山引擎 EMR(E-MapReduce)服务,支持通过 Spark SQL、Spark Jar、PySpark 三种方式连接。
您可以参考下面的命令,基于 Spark SQL CLI 连接到 ByteHouse。Spark SQL 作业开发指南详情请参见 Spark SQL 作业开发指南。
set serverless.spark.analysis=true; set spark.hadoop.fs.tos.skip.resolve = true; set las.cross.vpc.access.enabled = true; set las.cross.vpc.vpc.id = <your-vpc-id>; -- 使用可用区 B 子网 set las.cross.vpc.subnet.id = <your-subnet-id>; set las.cross.vpc.security.group.id = <your-securitygroup-id>; set las.cross.vpc.accountId=<your-account-id>; set las.cluster.type = vke; set emr.serverless.spark.only.parse.enabled = true; set serverless.spark.access.key=<your-tos-ak>; set serverless.spark.secret.key=<your-tos-sk>; set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog; set spark.sql.catalog.clickhouse.host=tenant-<your-account-id>-{REGION}.bytehouse.ivolces.com; set spark.sql.catalog.clickhouse.protocol=http; set spark.sql.catalog.clickhouse.password=<your-api-key>; set spark.sql.catalog.clickhouse.database=<database>; set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>; set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-java-1.1.75-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.11.jar"}]; use clickhouse; -- 查询 hive -- select * from spark_catalog.hive_db.hive_table; -- select * from people_t_copy; insert into people_t_copy select * from people_t; SQL
参数 | 配置说明 |
---|---|
las.cross.vpc | 配置为 VPC 网络信息,您可以在 ByteHouse 控制台中租户管理 > 基本信息页面的网络信息模块,单击私网域名后的详情,查看使用的 VPC。 注意 配置子网时,需配置为可用区 B 的子网。 |
serverless.spark.access.key、serverless.spark.secret.key | 配置为AK/SK(access key、secret key),您可参见 获取Access Key。 |
spark.sql.catalog.clickhouse |
|
las.spark.jar.depend.jars | 配置为实际使用的 jar 的 tos 路径。 |
其他 | 更多连接参数的配置说明请参见下文的配置参数章节。 |
注意
set 参数预期格式为 ”SET“,”SET key“,或 “SET key=value” 。如果键(key)中需要包含特殊字符,或者值(value)中包含分号(;),可使用反引号(``)包裹。
您可以参考下面的命令,基于 Spark Jar 方式连接到 ByteHouse。Spark Jar 作业开发指南详情请参见 Spark Jar 作业开发指南。
使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明。
import org.apache.spark.sql.SparkSession object Main { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("App") .master("local") .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog") .config("spark.sql.catalog.clickhouse.host", "<your-host>") .config("spark.sql.catalog.clickhouse.protocol", "http") .config("spark.sql.catalog.clickhouse.password", "<your-api-key>") .config("spark.sql.catalog.clickhouse.database", "<database>") .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>") .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000") .config("spark.sql.catalog.clickhouse.option.max_execution_time", "108000000") .config("spark.bytehouse.write.batchSize", "50000") .config("spark.bytehouse.write.format", "jdbc") .getOrCreate() val sql = "select * from clickhouse.xx.xx" val df = spark.sql(sql) spark.stop() } }
您可以参考下面的命令,基于 PySpark 方式连接到 ByteHouse。PySpark 作业开发指南详情请参见 PySpark 作业开发指南。
使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明。
from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("App")\ .master("local")\ .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog")\ .config("spark.sql.catalog.clickhouse.host", "<your-host>")\ .config("spark.sql.catalog.clickhouse.protocol", "http")\ .config("spark.sql.catalog.clickhouse.password", "<your-api-key>")\ .config("spark.sql.catalog.clickhouse.database", "<database>")\ .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>")\ .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000")\ .config("spark.sql.catalog.clickhouse.option.max_execution_time", "108000000")\ .config("spark.bytehouse.write.batchSize", "50000")\ .config("spark.bytehouse.write.format", "jdbc")\ getOrCreate() sql = "select count(*) from clickhouse.xxx.xxx" df = spark.sql(sql) df.show() spark.stop()
参数 | 是否必选 | 默认值 | 数据类型 | 描述 | 起始版本 |
---|---|---|---|---|---|
spark.sql.catalog.${catalog}.host | ✅ | 无默认值 | string | 格式为 | 0.8.0.2 |
spark.sql.catalog.${catalog}.http_port | 否 | 19000 | int | 保持默认值即可。 | 0.8.0.2 |
spark.sql.catalog.${catalog}.protocol | 否 | http | string | 保持默认值即可。 | 0.8.0.2 |
spark.sql.catalog.${catalog}.user | 否 | bytehouse | string | 保持默认值即可。 | 0.8.0.2 |
spark.sql.catalog.${catalog}.password | ✅ | 无默认值 | string | 配置为 ByteHouse 的 API Key,您可以参考获取 API Key章节获取 API Key 信息。 | 0.8.0.2 |
spark.sql.catalog.${catalog}.database | 否 | default | string | 配置默认数据库名,配置后,后续使用 Spark SQL 时,查询此数据库中的表数据时可接使用表名来查询,无需使用* | 0.8.0.2 |
spark.sql.catalog.${catalog}.gateway_version | 否 | auto | string | v1, v2 或 auto。若网关为 auto,则自动选择 v1 或 v2。 | 0.8.0.10 |
spark.sql.catalog.${catalog}.timezone | 否 | server | string | 使用的时区,支持配置为:server, client, UTC+3, Asia/Shanghai 等。 | 0.8.0.2 |
spark.sql.catalog.${catalog}.dedup_key_mode | 否 | 无默认值 | string | 处理数据中的重复键值的策略,支持:
| 0.8.0.3 |
spark.sql.catalog.${catalog}.option | ✅ | 无默认值 | map | 其中 vw 参数是必选项,配置如下: | 0.8.0.10 |
spark.bytehouse.write.batchSize | 否 | 10000 | int | 0.8.0.2 | |
spark.bytehouse.write.maxRetry | 否 | 30 | int | 写入失败时,最大重试次数 | 0.8.0.3 |
spark.bytehouse.write.retryInterval | 否 | 10s | time | 写入失败重试时间间隔 | 0.8.0.3 |
spark.bytehouse.write.retryExponentialBackoffEnabled | 否 | true | boolean | 在写入操作期间,支持对重试间隔启用指数退避机制。 | 0.8.0.9 |
spark.bytehouse.write.sharding-strategy | 否 | 无默认值 | string | ClickHouse 分布式分片策略。支持值为:
| 0.8.0.9 |
spark.bytehouse.write.sharding-key | 否 | 无默认值 | string | 哈希分区键,可以由多个字段组成,用逗号分隔。 | 0.8.0.9 |
spark.bytehouse.write.sharding-expression | 否 | 无默认值 | string | 哈希分区的表达式。如果设置了哈希分区的表达式,则所有涉及的字段名也必须列在 | 0.8.0.9 |
spark.bytehouse.read.byPartition | 否 | 无默认值 | boolean | 如果为 true,将按分区拆分查询 | 0.8.0.9 |
spark.bytehouse.read.partition.lower | 否 | 无默认值 | string | 按分区扫描时的下限范围(包含下限值)。 | 0.8.0.9 |
spark.bytehouse.read.partition.upper | 否 | 无默认值 | string | 按分区扫描时的上限范围(包含上限值)。 | 0.8.0.9 |
spark.bytehouse.read.byBucket | 否 | 无默认值 | boolean | 如果为 true,将按 bucket 拆分查询 | 0.8.0.9 |
spark.bytehouse.read.maxRetry | 否 | 30 | int | 读取失败时最大重试次数 | 0.8.0.9 |
spark.bytehouse.read.retryInterval | 否 | 10s | time | 读取失败重试时间间隔 | 0.8.0.9 |
spark.bytehouse.read.retryExponentialBackoffEnabled | 否 | true | boolean | 在读操作期间为重试间隔启用指数退避,即配置为 true 后,读取操作遇到错误或失败时,系统将按照指数增长的时间间隔来进行重试。 | 0.8.0.9 |
您可通过以下参数设置超时时间:
set spark.sql.catalog.clickhouse.option.socket_timeout=60000; set spark.sql.catalog.clickhouse.option.query_timeout=60000; set spark.sql.catalog.clickhouse.option.connect_timeout=60000; set spark.sql.catalog.clickhouse.option.receive_timeout=60000; set spark.sql.catalog.clickhouse.option.send_timeout=60000; set spark.sql.catalog.clickhouse.option.max_execution_time=10800000;
如果您需要添加自定义参数,可按照 spark.sql.catalog.clickhouse.option.xxx
样式添加。
set spark.sql.catalog.clickhouse.option.xxx=xxx;
Spark 支持并行读 bucket 表和分区表,也支持同时开启 bucket 并行读和分区并行读,并行度为两者乘积。
如果读取的表为 bucket 表,则支持开启按 bucket 并行读取,并行度为 bucket 个数。本示例中的建表语句表示并行度为 32。
CREATE TABLE IF NOT EXISTS test.createTable( id UInt32, name String DEFAULT '' ) ENGINE=CnchMergeTree ORDER BY id CLUSTER BY EXPRESSION cityHash64(id) % 32 INTO 32 BUCKETS
# 开启按 bucket 并行读 set spark.bytehouse.read.byBucket=true;
如果读取的表为分区表,则支持开启按分区并行读取。
建表语句示例如下:
CREATE TABLE IF NOT EXISTS test.createTable( id UInt32, name String DEFAULT '', date Date ) ENGINE=CnchMergeTree ORDER BY id PARTITION BY date
假设需要查询 date 在 [2025-01-01, 2025-01-10] 时间范围内的数据,则可以按照如下配置,并行度为 10。
set spark.bytehouse.read.byPartition=true; set spark.bytehouse.read.partition.lower=20250101; set spark.bytehouse.read.partition.upper=20250110;
Spark 支持 bucket 表写入,以下示例展示了创建表 test.createTable
并通过配置 driver 参数开启写入数据功能。
CREATE TABLE IF NOT EXISTS test.createTable( id UInt32, name String DEFAULT '' ) ENGINE=CnchMergeTree ORDER BY id CLUSTER BY EXPRESSION cityHash64(id) % 32 INTO 32 BUCKETS
set spark.bytehouse.write.sharding-strategy=HASH; set spark.bytehouse.write.sharding-key=id; set spark.bytehouse.write.sharding-expression=`cityHash64(id) % 32`;
场景 | 是否支持 | Spark SQL 语法 | ByteHouse 云数仓版 SQL 语法 |
---|---|---|---|
listTables | ✅ | show tables | SHOW TABLES FROM |
loadTable | ✅ | show create table xx | SHOW TABLES FROM |
createTable | ❌ | CREATE TABLE [IF NOT EXISTS] [tableIdentifier] [UUID uuid] | |
dropTable | ✅ | drop table | DROP TABLE |
renameTable | ✅ | rename table xx to xxx | RENAME TABLE |
listNamespaces | ✅ | show databases | SHOW DATABASES |
createNamespace | ✅ | create database xx | CREATE DATABASE |
dropNamespace | ✅ | drop database xx | DROP DATABASE |
数据类型 | 读取 | 写入 | |
---|---|---|---|
Integer types | UInt8 | ✅ | ✅ |
UInt16 | ✅ | ✅ | |
UInt32 | ✅ | ✅ | |
UInt64 | ✅ | ✅ | |
Int8 | ✅ | ✅ | |
Int16 | ✅ | ✅ | |
Int32 | ✅ | ✅ | |
Int64 | ✅ | ✅ | |
Floating-point numbers | Float32 | ✅ | ✅ |
Float64 | ✅ | ✅ | |
Decimal | ✅ | ✅ | |
Decimal32 | ✅ | ✅ | |
Decimal64 | ✅ | ✅ | |
Boolean | bool | ✅ | ✅ |
Strings | String | ✅ | ✅ |
FixedString | ✅ | ✅ | |
Dates | Date | ✅ | ✅ |
DateTime | ✅ | ✅ | |
DateTime64 | ✅ | ✅ | |
UUID | UUID | ✅ | ✅ |
Enum | Enum8 | ✅ | ✅ |
Enum16 | ✅ | ✅ | |
Arrays | Array(T) | ✅ | ✅ |
Maps | Map(K, V) | ✅ | ✅ |