You need to enable JavaScript to run this app.
导航
ClickHouse Rust (HTTP) Driver
最近更新时间:2025.10.22 19:18:53首次发布时间:2025.03.12 14:06:07
复制全文
我的收藏
有用
有用
无用
无用

本文介绍如何使用 ClickHouse Rust (HTTP) 驱动连接并访问 ByteHouse 云数仓。

环境要求

建议使用 Rustc1.84.1 或更高版本。

推荐版本

细分项

已验证版本/注意事项

ClickHouse 驱动程序版本

已验证版本:0.13.1,版本详情请参见:clickhouse v0.13.1

使用限制
  • 当前暂不支持 BitMap64 和 JSONB 数据类型。
  • 如果您在使用过程中遇到其他未知限制,请联系 ByteHouse 团队处理。

安装驱动

获取 ClickHouse 驱动后,可以通过cargo进行安装:

# Install ClickHouse with native-tls feature
cargo add clickhouse@0.13.1 --features native-tls
# Install tokio with full features (required for async operations)
cargo add tokio@1.0.1 --features full
# Install anyhow for error handling (optional but recommended)
cargo add anyhow
# Install serde with derive features (required for Row trait)
cargo add serde --features derive

或者将以下内容添加到Cargo.toml

[dependencies]
anyhow = "1.0.95"
clickhouse = { version = "0.13.1", features = ["native-tls"] }
serde = { version = "1.0.217", features = ["derive"] }
tokio = { version = "1.0.1", features = ["full"] }

获取 ByteHouse 连接信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 ClickHouse Rust (HTTP) Driver。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

使用 IAM 用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:

参数

参数说明

url

连接 ByteHouse 时,URL 需为 HTTPS,配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看对应信息。详情请参见步骤二:配置网络信息

port

配置为固定值 8123。

user & password

  • user:固定配置为 bytehouse
  • password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理 > 连接信息 中获取API Key。详情请参见获取 API Key

database

配置为连接 ByteHouse 的数据库名称。

option(可选参数)

(可选)配置连接 ByteHouse 后使用的计算组。如果您有多个计算组,希望后续查询 ByteHouse 数据时使用指定计算组,或者希望设置其他指定参数,您可以在option中进行设置。
计算组获取方式:您可以在 ByteHouse 控制台的计算组页面查看对应计算组的 ID。

使用数据库用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户的方式连接到 ByteHouse。
通用参数说明如下:

参数

参数说明

url

连接 ByteHouse 时,URL 需为 HTTPS,配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看对应信息。详情请参见步骤二:配置网络信息

port

配置为固定值 8123。

user & password

  • user 配置为 {accountID_or_accountName}::{username}[::{envID}],详情请参见步骤三:获取 ByteHouse 连接串信息
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库的用户名,可在 ByteHouse 控制台 > 权限管理 > 用户 > 查看数据库用户名
    • {envID}:可选配置,数据库所在的环境名称。如果使用 default 环境,可不配置;如需使用其他环境,需指定环境名称,配置时无需添加[]。您可登录 ByteHouse 控制台,在租户管理 > 基本信息 > 当前环境中获取。
      使用示例如下:
      • 配置环境 ID:21xxxxxxxx::demouser::demoenv
      • 不配置环境 ID:21xxxxxxxx::demouser
  • password:可联系管理员获取数据库账号的密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

database

配置为连接 ByteHouse 的数据库名称。

option(可选参数)

(可选)配置连接 ByteHouse 后使用的计算组。如果您有多个计算组,希望后续查询 ByteHouse 数据时使用指定计算组,或者希望设置其他指定参数,您可以在option中进行设置。
计算组获取方式:您可以在 ByteHouse 控制台的计算组页面查看对应计算组的 ID。

基本用法

您可以使用以下代码连接至 ByteHouse,并开始使用标准语句开发 ByteHouse,用于查询、写入和读取数据。

  • 超时时间配置:send_timeout 默认为 None、end_timeout 默认为 None。
  • 默认支持 keepAlive,可以复用连接和避免短链接。

连接至 ByteHouse

可参考下面代码连接至 ByteHouse,使用时注意替换连接语句中的 {Host}{Username}{Password}{Database}{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息

const host: &str = "{Host}";
const port: i32 = 8123;
const password: &str = "{Password}";
const user: &str = "{User}";
const database: &str = "{Database}";
const virtual_warehouse_id: &str = "{VIRTUAL_WAREHOUSE_ID}";

pub fn create_client() -> Client {
    let client = Client::default()
        .with_url(format!("https://{}:{}", host, port))
        .with_database(database)
        .with_user(user)
        .with_password(password)
        .with_option("virtual_warehouse", virtual_warehouse_id)
        .with_compression(Compression::None);
    return client;
}

自定义 query ID

您可以使用 with_option 设置 query ID。

pub async fn query_id_example() -> Result<()> {
    let client = create_client();

    // Use a simple string as query ID
    let query_id = format!("customized_{}", Uuid::new_v4());
    println!("Query ID: {}", query_id);
    // Execute a simple query with the query ID
    let result = client
        .query("SELECT 1")
        .with_option("query_id", query_id)
        .fetch_one::<u8>()
        .await?;
    println!("Result: {}", result);

    Ok(())
}

自定义 query settings

全局设置

您可通过.with_option("virtual_warehouse", virtual_warehouse_id)设置,示例如下:

const host: &str = "{Host}";
const port: i32 = 8123;
const password: &str = "{Password}";
const user: &str = "{User}";
const database: &str = "{Database}";
const virtual_warehouse_id: &str = "{VIRTUAL_WAREHOUSE_ID}";

pub fn create_client() -> Client {
    let client = Client::default()
        .with_url(format!("https://{}:{}", host, port))
        .with_database(database)
        .with_user(user)
        .with_password(password)
        .with_option("virtual_warehouse", virtual_warehouse_id)
        .with_compression(Compression::None);
    return client;
}

SQL 级别设置

每个 SQL 执行时,您可通过 .with_option("virtual_warehouse", virtual_warehouse_id) 设置,示例如下:

let all_users = client
    .query("SELECT ?fields FROM bhrusttest.users")
    .with_option("virtual_warehouse", virtual_warehouse_id)
    .fetch_all::<User>()
    .await?;

连接与查询

连接 ByteHouse 后,您可以通过 ClickHouse Rust (HTTP) Driver 进行建表、数据读写等操作,以下为简单的操作示例。

use clickhouse::{Client, Compression, Row};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Debug, Row, Serialize, Deserialize)]
struct AdjFactorInsertRow<'a> {
    id: u32,
    name: String,
    age: u8,
    active: bool,
    ts_code: &'a str,
    #[serde(with = "clickhouse::serde::time::date")]
    trade_date: Date,
    adj_factor: f64,
}
const host: &str = "{Host}";
const port: i32 = 8123;
const password: &str = "{Password}";
const user: &str = "{User}";
const database: &str = "{Database}";
const virtual_warehouse_id: &str = "{VIRTUAL_WAREHOUSE_ID}";

pub fn create_client() -> Client {
    let client = Client::default()
        .with_url(format!("https://{}:{}", host, port))
        .with_database(database)
        .with_user(user)
        .with_password(password)
        //.with_option("virtual_warehouse", virtual_warehouse_id)
        .with_compression(Compression::None);
    return client;
}

pub async fn quickstart_example() -> Result<()> {
    let client = create_client();

    // Drop table if exists and create a new one
    client
        .query("DROP DATABASE IF EXISTS bhrusttest")
        .execute()
        .await?;
    client
        .query("CREATE DATABASE IF NOT EXISTS bhrusttest")
        .execute()
        .await?;


    client
        .query(
            "
            CREATE TABLE bhrusttest.users (
                id       UInt32,
                name     String,
                age      UInt8,
                active   Bool
            ) ENGINE = CnchMergeTree()
            ORDER BY id
            "
        )
        .execute()
        .await?;

    println!("Table created successfully");

    // Insert some sample data
    let mut insert = client.insert("bhrusttest.users")?;

    let users = vec![
        User {
            id: 1,
            name: "Alice".to_string(),
            age: 25,
            active: true,
        },
        User {
            id: 2,
            name: "Bob".to_string(),
            age: 30,
            active: true,
        },
        User {
            id: 3,
            name: "Charlie".to_string(),
            age: 35,
            active: false,
        },
    ];

    for User in users {
        insert.write(&User).await?;
    }

    insert.end().await?;
    println!("Data inserted successfully");

    // Select all users
    let all_users = client
        .query("SELECT ?fields FROM bhrusttest.users")
        .with_option("virtual_warehouse", virtual_warehouse_id)
        .fetch_all::<User>()
        .await?;

    println!("\nAll users:");
    for User  in all_users {
        println!("{:?}", User);
    }

    Ok(())
}

#[tokio::main]
async fn main() {
    quickstart_example().await.expect("TODO: panic message");
    query_id_example().await.expect("TODO: panic message");
}


pub async fn query_id_example() -> Result<()> {
    let client = create_client();

    // Use a simple string as query ID
    let query_id = format!("customized_{}", Uuid::new_v4());
    println!("Query ID: {}", query_id);
    // Execute a simple query with the query ID
    let result = client
        .query("SELECT 1")
        .with_option("query_id", query_id)
        .fetch_one::<u8>()
        .await?;
    println!("Result: {}", result);

    Ok(())
}