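The ByteHouse Enterprise Edition Spark Connector is designed for loading data into ByteHouse Enterprise Edition through Spark.
This document describes two ways to connect to ByteHouse and process data: Spark SQL, and Serverless Spark as supported by EMR.
Configure the following driver dependencies in your program as described below.
For a Maven project that is compiled against the Spark Connector, add the following dependencies to the project's pom.xml file.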

    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.4.6</version>
    </dependency>
    <dependency>
      <groupId>com.bytedance.bytehouse-ce</groupId>
      <artifactId>clickhouse-spark-runtime-3.5_2.12</artifactId>
      <version>0.8.1.1</version>
    </dependency>

Then add the following repository to the pom.xml file:

    <repository>
      <id>bytedance</id>
      <name>ByteDance Public Repository</name>
      <url>https://artifact.bytedance.com/repository/releases</url>
    </repository>

You can connect to ByteHouse from the Spark SQL CLI by using the command below.
Before running it, replace the following:

- The values of CLICKHOUSE_HOST and CLICKHOUSE_PASSWORD in the export fields. See Obtain ByteHouse Connection Information for how to get them.
- The database name CLICKHOUSE_DATABASE and the compute group (VW) name CLICKHOUSE_VW in the export fields.
- The jar paths in the --jars option, which must point to the jar files you actually use.

    export SPARK_LOCAL_IP=localhost
    export SPARK_HOME=/opt/tiger/spark
    export CLICKHOUSE_HOST=<your-host>
    export CLICKHOUSE_HTTP_PORT=8123
    export CLICKHOUSE_USER=<your-username>
    export CLICKHOUSE_PASSWORD='<your-password>'

    $SPARK_HOME/bin/spark-sql \
      --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \
      --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
      --conf spark.sql.catalog.clickhouse.protocol=http \
      --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
      --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
      --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
      --conf spark.sql.catalog.clickhouse.database=default \
      --conf spark.sql.catalog.clickhouse.shard-discovery.kind=CE_API_CLUSTERS \
      --conf spark.sql.catalog.clickhouse.bytehouse-ce.api.account-id=xx \
      --conf spark.sql.catalog.clickhouse.clickhouse.cluster=xx \
      --conf spark.driver.bindAddress=127.0.0.1 \
      --conf spark.clickhouse.write.distributed.convertLocal=true \
      --conf spark.clickhouse.write.useShardsWriter=true \
      --conf spark.clickhouse.write.format=json \
      --conf spark.clickhouse.read.filterByPartition=false \
      --jars clickhouse-jdbc-0.4.6-all.jar,clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar

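
When the CLI is launched from a script or a scheduler, the same invocation can be assembled programmatically. The following is a minimal Python sketch and not part of the connector: the helper names `env_or_default` and `build_spark_sql_argv` are hypothetical, and only the connection keys shown in the command above are filled in. Any other settings (shard discovery, write options, and so on) are assumed to be passed through unchanged via `extra_conf`.

```python
import os
import shlex


def env_or_default(name, default):
    """Mimic the shell expansion ${NAME:-default}: fall back when unset or empty."""
    return os.environ.get(name) or default


def build_spark_sql_argv(spark_home, jars, extra_conf=None):
    """Build the spark-sql argument list (hypothetical helper, illustration only)."""
    catalog = "spark.sql.catalog.clickhouse"
    conf = {
        catalog: "xenon.clickhouse.ClickHouseCatalog",
        f"{catalog}.host": env_or_default("CLICKHOUSE_HOST", "127.0.0.1"),
        f"{catalog}.protocol": "http",
        f"{catalog}.http_port": env_or_default("CLICKHOUSE_HTTP_PORT", "8123"),
        f"{catalog}.user": env_or_default("CLICKHOUSE_USER", "default"),
        f"{catalog}.password": env_or_default("CLICKHOUSE_PASSWORD", ""),
        f"{catalog}.database": "default",
    }
    # Caller-supplied settings are appended as-is and may override the above.
    conf.update(extra_conf or {})

    argv = [os.path.join(spark_home, "bin", "spark-sql")]
    for key, value in conf.items():
        argv += ["--conf", f"{key}={value}"]
    argv += ["--jars", ",".join(jars)]
    return argv


if __name__ == "__main__":
    argv = build_spark_sql_argv(
        "/opt/tiger/spark",
        ["clickhouse-jdbc-0.4.6-all.jar",
         "clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar"],
        {"spark.clickhouse.write.format": "json"},
    )
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(argv))
```

Returning a list instead of a single string keeps the password and other values from being re-split by the shell if the list is handed to `subprocess.run`.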
The Serverless Spark method applies to the Volcano Engine EMR (E-MapReduce) service. To connect this way, download the driver and upload it to TOS so that it can be loaded from there.
You can connect to ByteHouse through Serverless Spark by using the statements below.
Before running them, replace the following:

- The VPC network information in the set statements. You can find it in the ByteHouse console under Cluster Management > Cluster Details > Basic Information.
- The AK/SK (access key and secret key) in the set statements. See Obtain Access Key.
- The public or private HOST address and the account and password in the set statements. See Obtain Cluster Connection Information. Also set the database name and the VW (compute group) name.

    set serverless.spark.analysis=true;
    set spark.hadoop.fs.tos.skip.resolve = true;
    set las.cross.vpc.access.enabled = true;
    set las.cross.vpc.vpc.id = <your-vpc-id>;
    -- Use the subnet in availability zone B
    set las.cross.vpc.subnet.id = <your-subnet-id>;
    set las.cross.vpc.security.group.id = <your-securitygroup-id>;
    set las.cross.vpc.accountId=<your-account-id>;
    set las.cluster.type = vke;
    set emr.serverless.spark.only.parse.enabled = true;
    set serverless.spark.access.key=<your-tos-ak>;
    set serverless.spark.secret.key=<your-tos-sk>;
    set spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog;
    set spark.sql.catalog.clickhouse.host=<host>;
    set spark.sql.catalog.clickhouse.protocol=http;
    set spark.sql.catalog.clickhouse.http_port=8123;
    set spark.sql.catalog.clickhouse.user=<your-username>;
    set spark.sql.catalog.clickhouse.password=<your-password>;
    set spark.sql.catalog.clickhouse.database=<your-database>;
    set spark.sql.catalog.clickhouse.shard_discovery.kind=CE_API_CLUSTERS;
    set spark.sql.catalog.clickhouse.bytehouse_ce.api.account_id=<your-account-id>;
    set spark.sql.catalog.clickhouse.cluster=<your-cluster-name>;
    set spark.sql.catalog.clickhouse.option.socket_timeout=300000;
    set spark.sql.catalog.clickhouse.option.connect_timeout=30000;
    set spark.sql.catalog.clickhouse.option.custom_settings=max_execution_time=3000;
    set spark.clickhouse.write.format=json;
    set spark.clickhouse.read.filterByPartition=false;
    set spark.clickhouse.write.distributed.convertLocal=true;
    set spark.clickhouse.write.useShardsWriter=true;
    set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar"}];

    use clickhouse;
    insert into people_t_copy select * from people_t;

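
The `las.spark.jar.depend.jars` value is a compact JSON array with one object per uploaded jar, which is easy to mistype by hand. As a rough illustration, the Python sketch below renders that value and a block of `set` statements from plain data. The function names `depend_jars_value` and `render_set_statements` are hypothetical, and the object layout (`schema` left empty, `fileName` holding the TOS path) is inferred only from the example statement above.

```python
import json


def depend_jars_value(tos_paths):
    """Render the jar list in the compact JSON form used in the example above."""
    entries = [{"schema": "", "fileName": path} for path in tos_paths]
    # Compact separators reproduce the spacing-free form shown in the document.
    return json.dumps(entries, separators=(",", ":"))


def render_set_statements(settings):
    """Turn a mapping of setting names to values into `set key=value;` lines."""
    return "\n".join(f"set {key}={value};" for key, value in settings.items())


if __name__ == "__main__":
    jars = [
        "tos://test-bh-tos/clickhouse-jdbc-0.4.6-all.jar",
        "tos://test-bh-tos/clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar",
    ]
    settings = {
        "spark.sql.catalog.clickhouse": "xenon.clickhouse.ClickHouseCatalog",
        "spark.sql.catalog.clickhouse.protocol": "http",
        "spark.sql.catalog.clickhouse.http_port": 8123,
        "las.spark.jar.depend.jars": depend_jars_value(jars),
    }
    print(render_set_statements(settings))
```

Generating the statements from one mapping also makes it harder to leave a placeholder such as `<host>` unreplaced, since every value has to be supplied explicitly.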

Parameter | Required | Default | Data type | Description |
---|---|---|---|---|
spark.sql.catalog.${catalog} | ✅ | | string | |
spark.sql.catalog.${catalog}.host | ✅ | | string | For example, xx.bytehouse-ce.ivolces.com |
spark.sql.catalog.${catalog}.http_port | | 8123 | int | |
spark.sql.catalog.${catalog}.protocol | | http | string | |
spark.sql.catalog.${catalog}.user | ✅ | | string | |
spark.sql.catalog.${catalog}.password | ✅ | | string | |
spark.sql.catalog.${catalog}.database | | default | string | |
spark.sql.catalog.${catalog}.timezone | | server | string | server, client, UTC+3, Asia/Shanghai, etc. |
spark.sql.catalog.${catalog}.shard_discovery.kind | ✅ | | string | CE_API_CLUSTERS |
spark.sql.catalog.${catalog}.shard_discovery.host | | | string | Defaults to spark.sql.catalog.${catalog}.host |
spark.sql.catalog.${catalog}.shard_discovery.port | | 80 | int | |
spark.sql.catalog.${catalog}.bytehouse_ce.api.account_id | ✅ | | | |
spark.sql.catalog.${catalog}.cluster | ✅ | | string | Cluster name |
spark.sql.catalog.${catalog}.option | ✅ | | map | |
spark.bytehouse.write.batchSize | | 10000 | int | |
spark.clickhouse.write.repartitionByPartition | | true | bool | Whether to repartition data by the ClickHouse partition key before writing, so that it matches the ClickHouse data distribution |
spark.clickhouse.write.distributed.useClusterNodes | | true | bool | When writing to a distributed table, write to all nodes of the cluster |
spark.clickhouse.write.distributed.convertLocal | | false | bool | When writing to a distributed table, write to the local tables instead |
spark.clickhouse.read.distributed.convertLocal | | true | bool | When reading a distributed table, read from the local tables instead |
spark.clickhouse.read.filterByPartition | | true | bool | When reading a table, filter by _partition_id or by partition value |
spark.clickhouse.write.localSortByPartition | | true | bool | If true, sort locally by partition before writing |
spark.clickhouse.write.localSortByKey | | true | bool | If true, sort locally by the sort key before writing |
spark.clickhouse.write.format | | json | string | JSON format |
spark.clickhouse.write.useShardsWriter | | false | string | If true, buffer the rows of each shard before writing to that shard |
spark.clickhouse.write.shardingStrategy | | AUTO | string | Sharding strategy; HASH and AUTO are supported |
spark.clickhouse.write.shardingExpression | | | string | Sharding expression; cityHash64(xx), intHash64(xx), and sipHash64(xx) are supported |
spark.clickhouse.read.format | | json | string | JSON format |

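
A configuration can be checked against the required parameters in the table before a job is submitted. The Python sketch below is only an illustration under stated assumptions: the names `missing_required` and `with_defaults` are hypothetical, the required and default lists are copied from the table, and the map-valued `option` entry is deliberately not checked because the Serverless Spark example supplies it as individual `option.*` keys.

```python
# Suffixes marked as required in the parameter table (the `option` map is skipped).
REQUIRED_SUFFIXES = (
    "host",
    "user",
    "password",
    "shard_discovery.kind",
    "bytehouse_ce.api.account_id",
    "cluster",
)

# Suffixes that have a default value in the parameter table.
DEFAULT_SUFFIXES = {
    "http_port": "8123",
    "protocol": "http",
    "database": "default",
    "timezone": "server",
    "shard_discovery.port": "80",
}


def catalog_key(catalog, suffix=""):
    """Return spark.sql.catalog.<catalog>[.<suffix>]."""
    base = f"spark.sql.catalog.{catalog}"
    return f"{base}.{suffix}" if suffix else base


def missing_required(conf, catalog="clickhouse"):
    """List required keys that are absent or empty in `conf`."""
    keys = [catalog_key(catalog)]
    keys += [catalog_key(catalog, suffix) for suffix in REQUIRED_SUFFIXES]
    return [key for key in keys if not conf.get(key)]


def with_defaults(conf, catalog="clickhouse"):
    """Return a copy of `conf` with the table's defaults filled in."""
    merged = {catalog_key(catalog, s): v for s, v in DEFAULT_SUFFIXES.items()}
    merged.update(conf)  # explicit settings win over defaults
    return merged


if __name__ == "__main__":
    conf = {
        "spark.sql.catalog.clickhouse": "xenon.clickhouse.ClickHouseCatalog",
        "spark.sql.catalog.clickhouse.host": "xx.bytehouse-ce.ivolces.com",
        "spark.sql.catalog.clickhouse.user": "default",
    }
    for key in missing_required(conf):
        print("missing:", key)
```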

Operation | Supported | Spark SQL syntax | ByteHouse Enterprise Edition SQL syntax |
---|---|---|---|
listTables | ✅ | show tables | SHOW TABLES |
loadTable | ✅ | show create table xx | SHOW CREATE TABLE |
createTable | ❌ | | CREATE TABLE [IF NOT EXISTS] [tableIdentifier] [UUID uuid] |
dropTable | ✅ | drop table | DROP TABLE |
renameTable | ✅ | rename table xx to xxx | RENAME TABLE |
listNamespaces | ✅ | show databases | SHOW DATABASES |
createNamespace | ✅ | create database xx | CREATE DATABASE |
dropNamespace | ✅ | drop database xx | DROP DATABASE |

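
The table can be encoded as a small lookup so that a job fails early when it relies on an unsupported operation. This Python sketch is purely illustrative: `statement_for` and the placeholder names (`table`, `old`, `new`, `database`) are hypothetical, and the statement templates are taken from the Spark SQL column above with placeholders substituted for xx.

```python
# Spark SQL templates for the operations marked as supported in the table.
SUPPORTED_STATEMENTS = {
    "listTables": "show tables",
    "loadTable": "show create table {table}",
    "dropTable": "drop table {table}",
    "renameTable": "rename table {old} to {new}",
    "listNamespaces": "show databases",
    "createNamespace": "create database {database}",
    "dropNamespace": "drop database {database}",
}

# Operations the table marks as not supported through the connector.
UNSUPPORTED_OPERATIONS = {"createTable"}


def statement_for(operation, **names):
    """Return the Spark SQL text for a catalog operation, or raise if unsupported."""
    if operation in UNSUPPORTED_OPERATIONS:
        raise NotImplementedError(
            f"{operation} is not supported through Spark SQL; "
            "create the table in ByteHouse directly"
        )
    template = SUPPORTED_STATEMENTS[operation]  # KeyError for unknown operations
    return template.format(**names)


if __name__ == "__main__":
    print(statement_for("renameTable", old="people_t", new="people_t_copy"))
```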

Type category | Data type | Read | Write |
---|---|---|---|
Integer types | UInt8 | ✅ | ✅ |
 | UInt16 | ✅ | ✅ |
 | UInt32 | ✅ | ✅ |
 | UInt64 | ✅ | ✅ |
 | Int8 | ✅ | ✅ |
 | Int16 | ✅ | ✅ |
 | Int32 | ✅ | ✅ |
 | Int64 | ✅ | ✅ |
Floating-point numbers | Float32 | ✅ | ✅ |
 | Float64 | ✅ | ✅ |
 | Decimal | ✅ | ✅ |
Boolean | bool | ✅ | ✅ |
Strings | String | ✅ | ✅ |
 | FixedString | ✅ | ✅ |
Dates | Date | ✅ | ✅ |
 | DateTime | ✅ | ✅ |
UUID | UUID | ✅ | ✅ |
Enum | Enum8 | ✅ | ✅ |
 | Enum16 | ✅ | ✅ |
Arrays | Array(T) | ✅ | ✅ |
Maps | Map(K, V) | ✅ | ✅ |
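
Because Array(T) and Map(K, V) nest, checking a column type against this table means checking its element types recursively. The following Python sketch shows one way to do that. It is an assumption-laden illustration, not connector code: `is_supported` and `split_top_level` are hypothetical names, parameterized forms such as Decimal(P, S), FixedString(N), Enum8(...), and DateTime with a timezone argument are assumed to count as their base type, and wrappers that the table does not list (for example Nullable) are reported as unsupported simply because the table does not mention them.

```python
# Types from the table that take no type arguments.
SCALAR_TYPES = {
    "UInt8", "UInt16", "UInt32", "UInt64",
    "Int8", "Int16", "Int32", "Int64",
    "Float32", "Float64", "Decimal",
    "Bool", "bool",  # the table writes "bool"; both spellings are accepted here
    "String", "Date", "DateTime", "UUID",
}

# Types whose parenthesized arguments are not types (assumption, see lead-in).
PARAMETRIC_TYPES = {"Decimal", "FixedString", "Enum8", "Enum16", "DateTime"}

# Container types and the number of type arguments each one takes.
CONTAINER_ARITY = {"Array": 1, "Map": 2}


def split_top_level(args):
    """Split a comma-separated argument string, ignoring commas inside parentheses."""
    parts, depth, start = [], 0, 0
    for index, char in enumerate(args):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        elif char == "," and depth == 0:
            parts.append(args[start:index].strip())
            start = index + 1
    parts.append(args[start:].strip())
    return parts


def is_supported(type_name):
    """Return True if `type_name` is built only from types listed in the table."""
    type_name = type_name.strip()
    base, paren, rest = type_name.partition("(")
    base = base.strip()
    if not paren:
        return base in SCALAR_TYPES
    if not rest.endswith(")"):
        return False  # unbalanced or trailing text
    if base in CONTAINER_ARITY:
        parts = split_top_level(rest[:-1])
        return (len(parts) == CONTAINER_ARITY[base]
                and all(is_supported(part) for part in parts))
    return base in PARAMETRIC_TYPES


if __name__ == "__main__":
    for name in ("Array(Map(String, Array(Int32)))", "Nullable(String)"):
        print(name, is_supported(name))
```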