You need to enable JavaScript to run this app.
ByteHouse云数仓版

ByteHouse云数仓版

复制全文
使用驱动程序
Spark Connector Driver
复制全文
Spark Connector Driver

ByteHouse 云数仓版 Spark Connector 连接器专门用于通过 Spark 将数据加载到 ByteHouse 云数仓版。本文将介绍通过 Spark SQL、EMR 支持的 Servless Spark 以及 Spark Jar 三种方式连接 ByteHouse 并处理数据。

限制条件
  • Java 8
  • Scala 2.12 及以上版本
  • Spark 3.5

驱动安装

请按照下面的方法,在程序中配置以下驱动的依赖项。

  1. ByteHouse Spark Connector。
  2. ClickHouse JDBC 驱动程序。
  3. ByteHouse JDBC 驱动程序。

Maven 依赖

对于要使用 Spark connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。

  1. 添加如下依赖。

    Spark 版本

    依赖

    下载

    Spark 3.5

    <dependency>
        <groupId>com.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.4.6</version>
    </dependency>
    <dependency>
        <groupId>com.bytedance.bytehouse</groupId>
        <artifactId>driver-java</artifactId>
        <version>1.1.75</version>
        <classifier>all</classifier>
    </dependency>
    <dependency>
        <groupId>com.bytedance.bytehouse</groupId>
        <artifactId>clickhouse-spark-runtime-3.5_2.12</artifactId>
        <version>0.8.0.15</version>
    </dependency>
    
    clickhouse-jdbc-0.4.6.jar
    未知大小
    driver-java-1.1.75-all.jar
    未知大小
    clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar
    未知大小
  2. 将以下存储库添加到 pom.xml 文件:

    <repository>
        <id>bytedance</id>
        <name>ByteDance Public Repository</name>
        <url>https://artifact.bytedance.com/repository/releases</url>
    </repository>
    

使用说明

连接参数配置说明

连接至 ByteHouse 所需的通用参数及配置说明如下。更多连接参数的配置说明请参见下文的配置参数章节。

参数

配置说明

CLICKHOUSE_HOST、spark.sql.catalog.clickhouse.host

配置为 ByteHouse 的公网连接域名。您可以在 ByteHouse 控制台的 租户管理 > 基本信息>网络信息 中查看对应信息。详情请参见 步骤二:配置网络信息

CLICKHOUSE_PASSWORD、spark.sql.catalog.clickhouse.password

API key的值,您可以在 ByteHouse 控制台的 租户管理>连接信息 中获取 API Key。详情请参见获取 API Key

CLICKHOUSE_DATABASE、spark.sql.catalog.clickhouse.database

配置为需连接的 ByteHouse 的数据库名。您可通过 ByteHouse 控制台 > 数据库,查看并复制需连接的数据库名称。

CLICKHOUSE_VW、spark.sql.catalog.clickhouse.option.vw

配置为 ByteHouse 计算组名,您可通过 ByteHouse 控制台 > 租户管理 > 参数设置,查看默认计算组。

本地测试

在本地测试前,请将以下 jar 包加载到 ${SPARK_HOME/jars},或者在提交命令时使用 --jars 指定 Jar 的本地路径。

  1. ByteHouse Spark Connector。
  2. ClickHouse JDBC 驱动程序。
  3. ByteHouse JDBC 驱动程序。

Spark SQL

使用时请将 CLICKHOUSE_HOST、CLICKHOUSE_PASSWORD、CLICKHOUSE_DATABASE、CLICKHOUSE_VW 替换为实际值,获取方式请参见连接参数配置说明

export SPARK_LOCAL_IP=localhost
export SPARK_HOME=/opt/tiger/spark
export CLICKHOUSE_HOST=bytehouse-<REGION>.volces.com
export CLICKHOUSE_PASSWORD=<your-api-key>
export CLICKHOUSE_DATABASE=<database>
export CLICKHOUSE_VW=<your-virtual-warehouse>

$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=${CLICKHOUSE_DATABASE} \
  --conf spark.sql.catalog.clickhouse.option.vw=${CLICKHOUSE_VW} \
  --conf spark.driver.bindAddress=127.0.0.1 \
  --conf spark.bytehouse.write.batchSize=50000 \
  --conf spark.bytehouse.write.format=jdbc \
  --jars clickhouse-jdbc-0.4.6-all.jar,driver-java-1.1.75-all.jar,clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar

参数

配置说明

--jars

配置为实际使用的 jar 文件路径。

Spark Jar

使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明

import org.apache.spark.sql.SparkSession

object Main {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("App")
      .master("local")
      .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "http")
      .config("spark.sql.catalog.clickhouse.password", "<your-api-key>")
      .config("spark.sql.catalog.clickhouse.database", "<database>")
      .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>")
      .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.max_execution_time", "80000")
      .config("spark.bytehouse.write.batchSize", "50000")
      .config("spark.bytehouse.write.format", "jdbc")
      .getOrCreate()

    val sql = "select * from clickhouse.xx.xx"
    val df = spark.sql(sql)
    spark.stop()
  }
}

PySpark

使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明

from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("App")\
    .master("local")\
    .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog")\
    .config("spark.sql.catalog.clickhouse.host", "<your-host>")\
    .config("spark.sql.catalog.clickhouse.protocol", "http")\
    .config("spark.sql.catalog.clickhouse.password", "<your-api-key>")\
    .config("spark.sql.catalog.clickhouse.database", "<database>")\
    .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>")\
    .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.max_execution_time", "80000")\
    .config("spark.bytehouse.write.batchSize", "50000")\
    .config("spark.bytehouse.write.format", "jdbc")\
    .config("spark.jars", "clickhouse-jdbc-0.4.6-all.jar,driver-java-1.1.75-all.jar,clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar").\
    getOrCreate()
sql = "select count(*) from clickhouse.xxx.xxx"
df = spark.sql(sql)
df.show()
spark.stop()

Serverless Spark

Servless Spark 方式适用于火山引擎 EMR(E-MapReduce)服务。您可以参考下面的命令,基于 Servless Spark 方式连接到 ByteHouse。Serverless Spark 作业开发指请参见 Serverless Spark 作业开发指南相关文档。
您可以通过 Serverless Spark 的方式连接 ByteHouse 和 火山引擎 EMR(E-MapReduce)服务,支持通过 Spark SQL、Spark Jar、PySpark 三种方式连接。

Spark SQL

您可以参考下面的命令,基于 Spark SQL CLI 连接到 ByteHouse。Spark SQL 作业开发指南详情请参见 Spark SQL 作业开发指南

使用默认计算组连接

以下命令使用 Spark SQL 默认计算组连接至 ByteHouse,支持通过任务级别和队列级别指定。
使用时请将命令中的占位符替换为实际参数值,获取方式请参见下文参数说明。

  • 任务级别指定
    支持通过私网或公网连接。

    • 私网连接

      set serverless.cross.vpc.access.enabled = true;
      set serverless.cross.vpc.vpc.id = <your-vpc-id>;
      set serverless.cross.vpc.subnet.id = <your-subnet-id>;
      set serverless.cross.vpc.security.group.id = <your-securitygroup-id>;
      set emr.serverless.spark.only.parse.enabled  = true;
      set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
      set spark.sql.catalog.clickhouse.host=tenant-<your-account-id>-<region>.bytehouse.ivolces.com;
      set spark.sql.catalog.clickhouse.protocol=http;
      set spark.sql.catalog.clickhouse.password=<your-api-key>;
      set spark.sql.catalog.clickhouse.database=<database>;
      set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;
      
      set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-java-1.1.75-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar"}];
      use clickhouse;
      
      -- 查询 hive
      -- select * from spark_catalog.hive_db.hive_table;
      -- select * from people_t_copy;
      insert into people_t_copy select * from people_t; 
      
    • 公网连接

      set serverless.cross.vpc.access.enabled = true;
      set serverless.cross.vpc.vpc.id = <your-vpc-id>;
      set serverless.cross.vpc.subnet.id = <your-subnet-id>;
      set serverless.cross.vpc.security.group.id = <your-securitygroup-id>;
      set emr.serverless.spark.only.parse.enabled  = true;
      set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
      set spark.sql.catalog.clickhouse.host=bytehouse-<region>.volces.com;
      set spark.sql.catalog.clickhouse.protocol=http;
      set spark.sql.catalog.clickhouse.password=<your-api-key>;
      set spark.sql.catalog.clickhouse.database=<database>;
      set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;
      
      set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-java-1.1.75-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar"}];
      use clickhouse;
      
      -- 查询 hive
      -- select * from spark_catalog.hive_db.hive_table;
      -- select * from people_t_copy;
      insert into people_t_copy select * from people_t; 
      

    参数说明

    参数

    配置说明

    serverless.cross.vpc.

    配置为 VPC 网络信息,您可以在 ByteHouse 控制台中租户管理 > 基本信息页面的网络信息模块,单击私网域名后的详情,查看使用的 VPC 及相关参数信息。

    • serverless.cross.vpc.vpc.id:配置为 VPC ID。
    • serverless.cross.vpc.subnet.id:配置为子网 ID。
    • serverless.cross.vpc.security.group.id:配置为安全组 ID。

    spark.sql.catalog.clickhouse

    • host:配置 ByteHouse 的私网或公网 HOST 地址,您可以参见步骤二:配置网络信息来获取。
    • password:配置 ByteHouse 的API key,您可以参见获取 API Key来获取。
    • database:配置 ByteHouse 的数据库名。您可通过 ByteHouse 控制台 > 数据库,查看并复制需连接的数据库名称。
    • vw:配置为 ByteHouse 的计算组名。 您可通过 ByteHouse 控制台 > 租户管理 > 参数设置,查看默认计算组。

    las.spark.jar.depend.jars

    配置为实际使用的 jar 的 TOS 路径。

    其他

    更多连接参数的配置说明请参见下文的配置参数章节。

    注意

    set 参数预期格式为 ”SET“,”SET key“,或 “SET key=value” 。如果键(key)中需要包含特殊字符,或者值(value)中包含分号(;),可使用反引号(````)包裹。

  • 队列级别指定
    在指定计算组前,您需要在 EMR Serverless 控制台,单击队列名称,在队列详情页面的网络连接模块,单击网络设置,为队列配置网络连接,确保 EMR 与 ByteHouse 在同一个 VPC 内。如果开启了全局跨 VPC 路由,则可以关闭。
    Image
    需配置的任务参数如下,支持通过私网或公网连接。

    • 私网连接

      set serverless.cross.vpc.access.enabled = true;
      set emr.serverless.spark.only.parse.enabled  = true;
      set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
      set spark.sql.catalog.clickhouse.host=tenant-<your-account-id>-<region>.bytehouse.ivolces.com;
      set spark.sql.catalog.clickhouse.protocol=http;
      set spark.sql.catalog.clickhouse.password=<your-api-key>;
      set spark.sql.catalog.clickhouse.database=<database>;
      set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;
      
      set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-java-1.1.75-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar"}];
      use clickhouse;
      
      -- 查询 hive
      -- select * from spark_catalog.hive_db.hive_table;
      -- select * from people_t_copy;
      insert into people_t_copy select * from people_t; 
      
    • 公网连接

      set serverless.cross.vpc.access.enabled = true;
      set emr.serverless.spark.only.parse.enabled  = true;
      set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
      set spark.sql.catalog.clickhouse.host=bytehouse-<region>.volces.com;
      set spark.sql.catalog.clickhouse.protocol=http;
      set spark.sql.catalog.clickhouse.password=<your-api-key>;
      set spark.sql.catalog.clickhouse.database=<database>;
      set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;
      
      set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-java-1.1.75-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar"}];
      use clickhouse;
      
      -- 查询 hive
      -- select * from spark_catalog.hive_db.hive_table;
      -- select * from people_t_copy;
      insert into people_t_copy select * from people_t; 
      

    参数说明

    参数

    配置说明

    spark.sql.catalog.clickhouse

    • host:配置 ByteHouse 的私网或公网 HOST 地址,您可以参见步骤二:配置网络信息来获取。
    • password:配置 ByteHouse 的API key,您可以参见获取 API Key来获取。
    • database:配置 ByteHouse 的数据库名。您可通过 ByteHouse 控制台 > 数据库,查看并复制需连接的数据库名称。
    • vw:配置为 ByteHouse 的计算组名。 您可通过 ByteHouse 控制台 > 租户管理 > 参数设置,查看默认计算组。

    las.spark.jar.depend.jars

    配置为实际使用的 jar 的 TOS 路径。

    其他

    更多连接参数的配置说明请参见下文的配置参数章节。

    注意

    set 参数预期格式为 ”SET“,”SET key“,或 “SET key=value” 。如果键(key)中需要包含特殊字符,或者值(value)中包含分号(;),可使用反引号(````)包裹。

使用 Spark SQL 计算组连接

如果您创建了 Spark SQL 计算组,且想要使用该计算组连接至 ByteHouse ,可使用以下方式。Spark SQL 计算组的介绍请参见队列管理

  1. 登录火山引擎 EMR 控制台,单击队列名称,在队列详情页面的网络连接模块,单击网络设置,为队列配置网络连接,确保 EMR 与 ByteHouse 在同一个 VPC 内。如果开启了全局跨 VPC 路由,则可以关闭。
    Image

  2. 资源管理页签下单击 Serverless,单击您使用的队列,单击计算组,单击 Spark SQL 计算组,单击参数配置,配置以下 spark.jars 自定义参数。
    Image

    {
      "spark.jars": "tos://bytehouse-spark/cdw/driver-java-1.1.75-all.jar,tos://bytehouse-spark/cdw/clickhouse-jdbc-0.4.6-all.jar,tos://bytehouse-spark/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.15.jar"
    }
    
  3. 配置完成后,可使用以下命令连接至 ByteHouse。使用时请将命令中的占位符替换为实际参数值,获取方式请参见下文参数说明,支持通过私网或公网连接。

    • 私网连接

      set emr.serverless.spark.only.parse.enabled  = true;
      set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
      set spark.sql.catalog.clickhouse.host=tenant-<your-account-id>-<region>.bytehouse.ivolces.com;
      set spark.sql.catalog.clickhouse.protocol=http;
      set spark.sql.catalog.clickhouse.password=<your-api-key>;
      set spark.sql.catalog.clickhouse.database=<database>;
      set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;
      
      use clickhouse;
      
      -- 查询 hive
      -- select * from spark_catalog.hive_db.hive_table;
      -- select * from people_t_copy;
      insert into people_t_copy select * from people_t; 
      
    • 公网连接

      set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
      set spark.sql.catalog.clickhouse.host=bytehouse-<region>.volces.com;
      set spark.sql.catalog.clickhouse.protocol=http;
      set spark.sql.catalog.clickhouse.password=<your-api-key>;
      set spark.sql.catalog.clickhouse.database=<database>;
      set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;
      
      use clickhouse;
      
      -- 查询 hive
      -- select * from spark_catalog.hive_db.hive_table;
      -- select * from people_t_copy;
      insert into people_t_copy select * from people_t; 
      

    参数说明

    参数

    配置说明

    spark.sql.catalog.clickhouse

    • host:配置 ByteHouse 的私网或公网 HOST 地址,您可以参见步骤二:配置网络信息来获取。
    • password:配置 ByteHouse 的API key,您可以参见获取 API Key来获取。
    • database:配置 ByteHouse 的数据库名。您可通过 ByteHouse 控制台 > 数据库,查看并复制需连接的数据库名称。
    • vw:配置为 ByteHouse 的计算组名。 您可通过 ByteHouse 控制台 > 租户管理 > 参数设置,查看默认计算组。

    其他

    更多连接参数的配置说明请参见下文的配置参数章节。

    注意

    set 参数预期格式为 ”SET“,”SET key“,或 “SET key=value” 。如果键(key)中需要包含特殊字符,或者值(value)中包含分号(;),可使用反引号(````)包裹。

Spark Jar

您可以参考下面的命令,基于 Spark Jar 方式连接到 ByteHouse。Spark Jar 作业开发指南详情请参见 Spark Jar 作业开发指南
提交时,需要在提交参数里指定 VPC 相关参数,VPC 需与 ByteHous所在 VPC 保持一致,详情请参考 EMR Serverless 访问 VPC 实践指南
使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明

import org.apache.spark.sql.SparkSession

object Main {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("App")
      .master("local")
      .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "http")
      .config("spark.sql.catalog.clickhouse.password", "<your-api-key>")
      .config("spark.sql.catalog.clickhouse.database", "<database>")
      .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>")
      .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000")
      .config("spark.sql.catalog.clickhouse.option.max_execution_time", "80000")
      .config("spark.bytehouse.write.batchSize", "50000")
      .config("spark.bytehouse.write.format", "jdbc")
      .getOrCreate()

    val sql = "select * from clickhouse.xx.xx"
    val df = spark.sql(sql)
    spark.stop()
  }
}

PySpark

您可以参考下面的命令,基于 PySpark 方式连接到 ByteHouse。PySpark 作业开发指南详情请参见 PySpark 作业开发指南
提交时,需要在提交参数里指定 VPC 相关参数,VPC 需与 ByteHous所在 VPC 保持一致,详情请参考 EMR Serverless 访问 VPC 实践指南
使用时请将 spark.sql.catalog.clickhouse.host、spark.sql.catalog.clickhouse.password、spark.sql.catalog.clickhouse.database、spark.sql.catalog.clickhouse.option.vw 替换为实际值,获取方式请参见连接参数配置说明

from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("App")\
    .master("local")\
    .config("spark.sql.catalog.clickhouse", "com.bytehouse.ByteHouseCatalog")\
    .config("spark.sql.catalog.clickhouse.host", "<your-host>")\
    .config("spark.sql.catalog.clickhouse.protocol", "http")\
    .config("spark.sql.catalog.clickhouse.password", "<your-api-key>")\
    .config("spark.sql.catalog.clickhouse.database", "<database>")\
    .config("spark.sql.catalog.clickhouse.option.vw", "<your-virtual-warehouse>")\
    .config("spark.sql.catalog.clickhouse.option.query_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.connect_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.receive_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.send_timeout", "600000")\
    .config("spark.sql.catalog.clickhouse.option.max_execution_time", "80000")\
    .config("spark.bytehouse.write.batchSize", "50000")\
    .config("spark.bytehouse.write.format", "jdbc")\
    getOrCreate()
sql = "select count(*) from clickhouse.xxx.xxx"
df = spark.sql(sql)
df.show()
spark.stop()

配置参数

参数

是否必选

默认值

数据类型

描述

起始版本

spark.sql.catalog.${catalog}.host

无默认值

string

格式为 bytehouse-{REGION}.volces.com,更多可用区信息请参阅 支持的地域及可用区

0.8.0.2

spark.sql.catalog.${catalog}.http_port

19000

int

保持默认值即可。

0.8.0.2

spark.sql.catalog.${catalog}.protocol

http

string

保持默认值即可。

0.8.0.2

spark.sql.catalog.${catalog}.user

bytehouse

string

保持默认值即可。

0.8.0.2

spark.sql.catalog.${catalog}.password

无默认值

string

配置为 ByteHouse 的 API Key,您可以参考获取 API Key章节获取 API Key 信息。

0.8.0.2

spark.sql.catalog.${catalog}.database

default

string

配置默认数据库名,配置后,后续使用 Spark SQL 时,查询此数据库中的表数据时可接使用表名来查询,无需使用*数据库名.表名​*的格式。

0.8.0.2

spark.sql.catalog.${catalog}.gateway_version

auto

string

v1, v2 或 auto。若网关为 auto,则自动选择 v1 或 v2。

0.8.0.10

spark.sql.catalog.${catalog}.timezone

server

string

使用的时区,支持配置为:server, client, UTC+3, Asia/Shanghai 等。

0.8.0.2

spark.sql.catalog.${catalog}.dedup_key_mode

无默认值

string

处理数据中的重复键值的策略,支持:

  • throw
  • append
  • replace
  • ignore

0.8.0.3

spark.sql.catalog.${catalog}.option

无默认值

map

其中 vw 参数是必选项,配置如下:
spark.sql.catalog.${catalog}.option.vw=xx

0.8.0.10

spark.bytehouse.write.batchSize

10000

int

无。

0.8.0.2

spark.bytehouse.write.maxRetry

30

int

写入失败时,最大重试次数

0.8.0.3

spark.bytehouse.write.retryInterval

10s

time

写入失败重试时间间隔

0.8.0.3

spark.bytehouse.write.retryExponentialBackoffEnabled

true

boolean

在写入操作期间,支持对重试间隔启用指数退避机制。

0.8.0.9

spark.bytehouse.write.sharding-strategy

无默认值

string

ClickHouse 分布式分片策略。支持值为:

  • NONE:不要分区写入。
  • HASH:shard_key 的哈希写入。

0.8.0.9

spark.bytehouse.write.sharding-key

无默认值

string

哈希分区键,可以由多个字段组成,用逗号分隔。

0.8.0.9

spark.bytehouse.write.sharding-expression

无默认值

string

哈希分区的表达式。如果设置了哈希分区的表达式,则所有涉及的字段名也必须列在 sharding-key

0.8.0.9

spark.bytehouse.read.byPartition

无默认值

boolean

如果为 true,将按分区拆分查询

0.8.0.9

spark.bytehouse.read.byPartition.strategy

manual

string

查询分区策略。支持的值为:

  • manual,需要同时指定 lower,upper,partition count 参数。
  • auto,将根据过滤条件和表分区键自动划分分区。开启自动划分分区需要满足以下条件:
    • 表分区只且只有一个列
    • 过滤条件需要在表的分区列上,且该条件支持下推
    • 支持的过滤条件:>, >=,=,<,<=, in, between and
    • 当 in 和其他条件同时出现时,只会根据 in 划分

0.8.0.15

spark.bytehouse.read.byPartition.count

100

int

分区数。

0.8.0.15

spark.bytehouse.read.partition.lower

无默认值

string

按分区扫描时的下限范围(包含下限值)。

0.8.0.9

spark.bytehouse.read.partition.upper

无默认值

string

按分区扫描时的上限范围(包含上限值)。

0.8.0.9

spark.bytehouse.read.partition.stringAsDate

false

boolean

支持分区键的类型为 String,值格式为 yyyy-MM-dd 的自动分区。

0.8.0.15

spark.bytehouse.read.partition.stringAsInt

true

boolean

支持分区键的类型为 String,值格式为 yyyyMMdd 的自动分区。

0.8.0.15

spark.bytehouse.read.partition.detectMinMax

true

boolean

分区策略为自动时,查询条件里如果只有分区的上限或下限,支持查询分区列的最大最小值。

0.8.0.15

spark.bytehouse.read.byBucket

无默认值

boolean

如果为 true,将按 bucket 拆分查询。

0.8.0.9

spark.bytehouse.read.maxRetry

30

int

读取失败时最大重试次数。

0.8.0.9

spark.bytehouse.read.retryInterval

10s

time

读取失败重试时间间隔。

0.8.0.9

spark.bytehouse.read.retryExponentialBackoffEnabled

true

boolean

在读操作期间为重试间隔启用指数退避,即配置为 true 后,读取操作遇到错误或失败时,系统将按照指数增长的时间间隔来进行重试。

0.8.0.9

使用示例

设置 driver 参数

设置超时参数

您可通过以下参数设置超时时间:

set spark.sql.catalog.clickhouse.option.socket_timeout=60000;
set spark.sql.catalog.clickhouse.option.query_timeout=60000;
set spark.sql.catalog.clickhouse.option.connect_timeout=60000;
set spark.sql.catalog.clickhouse.option.receive_timeout=60000;
set spark.sql.catalog.clickhouse.option.send_timeout=60000;
set spark.sql.catalog.clickhouse.option.max_execution_time=80000;

其他 driver 自定义参数

如果您需要添加自定义参数,可按照 spark.sql.catalog.clickhouse.option.xxx 样式添加。

set spark.sql.catalog.clickhouse.option.xxx=xxx;

并行读

Spark 支持并行读 bucket 表和分区表,也支持同时开启 bucket 并行读和分区并行读,并行度为两者乘积。

Bucket 表

如果读取的表为 bucket 表,则支持开启按 bucket 并行读取,并行度为 bucket 个数。本示例中的建表语句表示并行度为 32。

CREATE TABLE IF NOT EXISTS test.createTable(
id UInt32,
name String DEFAULT ''
) 
ENGINE=CnchMergeTree
ORDER BY id
CLUSTER BY EXPRESSION cityHash64(id) % 32 INTO 32 BUCKETS
# 开启按 bucket 并行读
set spark.bytehouse.read.byBucket=true;

分区表

如果读取的表为分区表,则支持开启按分区并行读取。
建表语句示例如下:

CREATE TABLE IF NOT EXISTS test.createTable(
id UInt32,
name String DEFAULT '',
date Date
) 
ENGINE=CnchMergeTree
ORDER BY id
PARTITION BY date

假设需要查询 date 在 [2025-01-01, 2025-01-10] 时间范围内的数据,则可以按照如下配置,并行度为 10。

set spark.bytehouse.read.byPartition=true;
set spark.bytehouse.read.partition.lower=20250101;
set spark.bytehouse.read.partition.upper=20250110;

Bucket 写

Spark 支持 bucket 表写入,以下示例展示了创建表 test.createTable 并通过配置 driver 参数开启写入数据功能。

CREATE TABLE IF NOT EXISTS test.createTable(
id UInt32,
name String DEFAULT ''
) 
ENGINE=CnchMergeTree
ORDER BY id
CLUSTER BY EXPRESSION cityHash64(id) % 32 INTO 32 BUCKETS
set spark.bytehouse.write.sharding-strategy=HASH;
set spark.bytehouse.write.sharding-key=id;
set spark.bytehouse.write.sharding-expression=`cityHash64(id) % 32`;

常用的 SQL

本节介绍了常用的 SQL 语句,用于写入数据和删除数据。

写入数据

写入临时数据

  • 使用 values,并指定列(column)。

    INSERT INTO clickhouse.db.table(id, age) values(1, 10)
    
  • 使用 SELECT 语句。

    INSERT INTO clickhouse.db.table SELECT 1 AS id, 10 AS age
    

写入另一张表的数据

  • Hive 表
    您可按需指定或不指定 spark_catalog。

    • 不指定 spark_catalog:

      INSERT INTO clickhouse.db.table SELECT id, age FROM hive_db.hive_table
      
    • 指定 spark_catalog:

      INSERT INTO clickhouse.db.table SELECT id, age FROM spark_catalog.hive_db.hive_table
      
  • ByteHouse 表

    INSERT INTO clickhouse.db.table SELECT id, age FROM clickhouse.db1.table1
    

注意

ByteHouse 暂不支持 insert overwrite [ partition_spec ] 语法,请勿使用该方式写入数据。

删除数据

使用 WHERE 条件删除数据。

DELETE FROM clickhouse.db.table WHERE id = 1

参考:支持场景

场景

是否支持

Spark SQL 语法

ByteHouse 云数仓版 SQL 语法

listTables

show tables

SHOW TABLES FROM `xx`

loadTable

show create table xx

SHOW TABLES FROM `xx` LIKE `xxx`

createTable

CREATE TABLE [IF NOT EXISTS] [tableIdentifier] [UUID uuid]
(
    [tableColumnDfnt],
    [CONSTRAINT constraint_name CHECK columnExpr,]
    ...
) [engineClause]

dropTable

drop table xx.xxx

DROP TABLE `xx`.`xxx`

renameTable

rename table xx to xxx

RENAME TABLE `xx`.`xxx` to `xx`.`xxx`

listNamespaces

show databases

SHOW DATABASES

createNamespace

create database xx

CREATE DATABASE `xx` [with properties ('engine'='cnch')]

dropNamespace

drop database xx

DROP DATABASE `xx`

参考:支持的数据类型

数据类型

读取

写入

Integer types

UInt8

UInt16

UInt32

UInt64

Int8

Int16

Int32

Int64

Floating-point numbers

Float32

Float64

Decimal

Decimal32

Decimal64

Boolean

bool

Strings

String

FixedString

Dates

Date

DateTime

DateTime64

UUID

UUID

Enum

Enum8

Enum16

Arrays

Array(T)

Maps

Map(K, V)

最近更新时间:2025.11.24 10:59:18
这个页面对您有帮助吗?
有用
有用
无用
无用