您可以通过开源的 ClickHouse Go 驱动程序连接到 ByteHouse 云数仓版,连接后进行数据写入查询等操作。本文为您介绍 ClickHouse Go 驱动连接 ByteHouse 云数仓版的主要操作流程和操作要点。
由于 ByteHouse 的 Go 驱动当前在维护升级中,您可以使用开源 ClickHouse 的 Go 驱动连接 ByteHouse,开源 ClickHouse 支持的 API 接口和协议如下。
细分项 | 能力说明 |
---|---|
API 接口 | 支持:
|
协议 | 支持:
|
驱动 | 已验证版本/注意事项 |
---|---|
ClickHouse Go 基础驱动 | 2.30.0,驱动下载链接 |
Go | Golang 1.21 |
您可以通过 go get
安装。
go get github.com/ClickHouse/clickhouse-go/v2@v2.30.0
请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:
参数 | 配置要点 |
---|---|
{TENANT_ID}&{REGION} | 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的 租户管理>基本信息>网络信息 中查看对应信息。详情请参见步骤二:配置网络信息。 |
Database | 配置为连接 ByteHouse 的数据库名称。 |
username & password |
|
virtual_warehouse | 配置为计算组名,您可登录 ByteHouse 控制台,单击顶部计算组,查看并复制计算组 ID。 |
通过传递参数创建连接
package main import ( "crypto/tls" "fmt" "github.com/ClickHouse/clickhouse-go/v2" ) func main() { db := clickhouse.OpenDB(&clickhouse.Options{ Addr: []string{fmt.Sprintf("%s:%d", "tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com", 19000)}, Auth: clickhouse.Auth{ Database: "{DATABASE}", Username: "bytehouse", Password: "{API_KEY}", }, TLS: &tls.Config{ InsecureSkipVerify: true, }, }) if err := db.Ping(); err != nil { panic(err) } }
通过 DSN(Data Source Name,数据源名称)创建连接
package main import ( "database/sql" _ "github.com/ClickHouse/clickhouse-go/v2" ) func main() { db, err := sql.Open("clickhouse", "clickhouse://bytehouse:{API_KEY}@tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:19000/{DATABASE}?secure=true") if err != nil { panic(err) } if err := db.Ping(); err != nil { panic(err) } }
通过传递参数创建连接
package main import ( "crypto/tls" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2" ) func main() { conn := clickhouse.OpenDB(&clickhouse.Options{ Addr: []string{"tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:8123"}, Auth: clickhouse.Auth{ Database: "{DATABASE}", Username: "bytehouse", Password: "{API_KEY}", }, Protocol: clickhouse.HTTP, TLS: &tls.Config{ InsecureSkipVerify: false, }, Settings: clickhouse.Settings{ "describe_query_with_data_type_flags": "0", }, }) if err := conn.Ping(); err != nil { panic(err) } }
通过 DSN(Data Source Name,数据源名称)创建连接
package main import ( "database/sql" _ "github.com/ClickHouse/clickhouse-go/v2" ) func main() { db, err := sql.Open("clickhouse", "https://bytehouse:{API_KEY}@tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:8123/{DATABASE}?secure=true&describe_query_with_data_type_flags=0") if err != nil { panic(err) } if err := db.Ping(); err != nil { panic(err) } }
您可以在自定义参数中设置更多 Server 参数,例如设置计算组资源。
db := clickhouse.OpenDB(&clickhouse.Options{ Addr: []string{fmt.Sprintf("%s:%d", "tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com", 19000)}, Auth: clickhouse.Auth{ Database: "{DATABASE}", Username: "bytehouse", Password: "{API_KEY}", }, TLS: &tls.Config{ InsecureSkipVerify: true, }, Settings: clickhouse.Settings{ "virtual_warehouse": "{VW_ID}", }, })
请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户的方式连接到 ByteHouse。
通用参数说明如下:
参数 | 配置要点 |
---|---|
{TENANT_ID}&{REGION} | 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的 租户管理>基本信息>网络信息 中查看对应信息。详情请参见步骤二:配置网络信息。 |
Database | 配置为连接 ByteHouse 的数据库名称。 |
username & password |
|
virtual_warehouse | 配置为计算组名,您可登录 ByteHouse 控制台,单击顶部计算组,查看并复制计算组 ID。 |
通过传递参数创建连接
package main import ( "crypto/tls" "fmt" "github.com/ClickHouse/clickhouse-go/v2" ) func main() { db := clickhouse.OpenDB(&clickhouse.Options{ Addr: []string{fmt.Sprintf("%s:%d", "tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com", 19000)}, Auth: clickhouse.Auth{ Database: "{DATABASE}", Username: "{accountID_or_accountName}::{username}[::{envID}", Password: "{password}", }, TLS: &tls.Config{ InsecureSkipVerify: true, }, }) if err := db.Ping(); err != nil { panic(err) } }
通过 DSN(Data Source Name,数据源名称)创建连接
package main import ( "database/sql" _ "github.com/ClickHouse/clickhouse-go/v2" ) func main() { db, err := sql.Open("clickhouse", "clickhouse://{accountID_or_accountName}::{username}[::{envID}:{password}@tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:19000/{DATABASE}?secure=true") if err != nil { panic(err) } if err := db.Ping(); err != nil { panic(err) } }
通过传递参数创建连接
package main import ( "crypto/tls" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2" ) func main() { conn := clickhouse.OpenDB(&clickhouse.Options{ Addr: []string{"tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:8123"}, Auth: clickhouse.Auth{ Database: "{DATABASE}", Username: "{accountID_or_accountName}::{username}[::{envID}", Password: "{password}", }, Protocol: clickhouse.HTTP, TLS: &tls.Config{ InsecureSkipVerify: false, }, Settings: clickhouse.Settings{ "describe_query_with_data_type_flags": "0", }, }) if err := conn.Ping(); err != nil { panic(err) } }
通过 DSN(Data Source Name,数据源名称)创建连接
package main import ( "database/sql" _ "github.com/ClickHouse/clickhouse-go/v2" ) func main() { db, err := sql.Open("clickhouse", "https://{accountID_or_accountName}::{username}[::{envID}:{password}@tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:8123/{DATABASE}?secure=true&describe_query_with_data_type_flags=0") if err != nil { panic(err) } if err := db.Ping(); err != nil { panic(err) } }
您可以在自定义参数中设置更多 Server 参数,例如设置计算组资源。
db := clickhouse.OpenDB(&clickhouse.Options{ Addr: []string{fmt.Sprintf("%s:%d", "tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com", 19000)}, Auth: clickhouse.Auth{ Database: "{DATABASE}", Username: "{accountID_or_accountName}::{username}[::{envID}", Password: "{API_KEY}", }, TLS: &tls.Config{ InsecureSkipVerify: true, }, Settings: clickhouse.Settings{ "virtual_warehouse": "{VW_ID}", }, })
可以使用Exec
方法执行任意操作。该方法不支持接收上下文,默认情况下它与后台上下文
一起执行。如果需要,用户可以使用 ExecContext
。
db.Exec("DROP TABLE IF EXISTS example") _, err = db.Exec( "CREATE TABLE IF NOT EXISTS example (Col1 UInt8, Col2 String) engine = CnchMergeTree order by tuple()", ) if err != nil { return err } db.Exec("INSERT INTO example VALUES (1, 'test-1')") db.Exec("DROP TABLE IF EXISTS example")
说明
当前 ByteHouse 不支持 Asynchronous Insert(异步插入)。
db. Begin()
启动事务。tx.Prepare("INSERT INTO...")
。batch. Exec()
为每一行执行批量插入。tx. Commit()
提交事务以发送所有行。func BatchInsert() error { db, err := GetSqlDb() if err != nil { return err } defer func() { db.Exec("DROP TABLE example") }() db.Exec("DROP TABLE IF EXISTS example") _, err = db.Exec(` CREATE TABLE IF NOT EXISTS example ( Col1 UInt8 , Col2 String , Col3 FixedString(3) , Col4 UUID , Col5 Map(String, UInt8) , Col6 Array(String) , Col7 Tuple(String, UInt8, Array(Map(String, String))) , Col8 DateTime ) Engine = CnchMergeTree() ORDER BY tuple() `) if err != nil { return err } scope, err := db.Begin() if err != nil { return err } batch, err := scope.Prepare("INSERT INTO example") if err != nil { return err } for i := 0; i < 1000; i++ { _, err := batch.Exec( uint8(42), "ClickHouse", "Inc", uuid.New(), map[string]uint8{"key": 1}, // Map(String, UInt8) []string{"Q", "W", "E", "R", "T", "Y"}, // Array(String) []any{ // Tuple(String, UInt8, Array(Map(String, String))) "String Value", uint8(5), []map[string]string{ {"key": "value"}, {"key": "value"}, {"key": "value"}, }, }, time.Now(), ) if err != nil { return err } } return scope.Commit() }
可以使用QueryRow
/Query
方法进行查询。
row := db.QueryRow("SELECT * FROM example") var ( col1 uint8 col2, col3, col4 string col5 map[string]uint8 col6 []string col7 interface{} col8 time.Time ) if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil { return err }
func UseContextSendQueryId() error { db, err := GetSqlDb() if err != nil { return err } // We can use context to send query id var one uint8 queryId, _ := uuid.NewUUID() ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(queryId.String())) if err = db.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil { return err } return nil }
func UseContextSendQuerySetting() error { db, err := GetSqlDb() if err != nil { return err } // we can use context to pass settings to a specific API call ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{ "max_execution_time": 450, })) var settingValue uint16 if err := db.QueryRowContext(ctx, "SELECT getSetting('max_execution_time')").Scan(&settingValue); err != nil { return fmt.Errorf("failed to get setting value: %v", err) } if settingValue != 450 { return fmt.Errorf("expected setting value to be 450, got %d", settingValue) } return nil }