You need to enable JavaScript to run this app.
导航
ClickHouse SQLAlchemy Connector
最近更新时间:2025.11.11 14:01:29首次发布时间:2025.03.19 16:14:44
复制全文
我的收藏
有用
有用
无用
无用

本文介绍如何使用 ClickHouse SqlAlchemy Connector 连接并访问 ByteHouse 云数仓。

环境要求

建议使用 Python 3.12 或更高版本。

推荐版本

细分项

已验证版本/注意事项

SqlAlchemy 版本

2.0.38

ByteHouse clickhouse-connect 支持包版本

0.3.2+bytehouse 说明由于 ByteHouse 无法原生支持开源的 ClickHouse SqlAlchemy Connector,因此您还需要使用 ByteHouse clickhouse-connect 支持包对开源连接器进行修改支持。

Python 版本

Python 3.10

使用限制
  • 不支持 FixString,请使用 String 替代。
  • 不支持 ByteHouse 的 JSONB 和 Bitmap64 的数据类型。
  • 如果您在使用过程中遇到其他未知限制,请联系 ByteHouse 团队处理。

注意事项

由于 SQLAlchemy 默认会发送 ROLLBACK、COMMIT 等事务控制指令,而 ByteHouse 不支持此类操作,在执行操作时,需添加以下代码,重写(override)相关方法以阻止这些关键字的发送,代码示例如下:

from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# 获取 MySQL 方言类
dialect_cls = mysql_pymysql.MySQLDialect_pymysql
# 定义空操作的回滚函数(不执行任何操作)
def no_op_rollback(self, dbapi_connection):
    pass
def do_commit(self, dbapi_connection):
    pass
# 替换方言类的原始回滚和提交方法
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit
# --- End Monkey Patch Section ---

安装 SQLAlchemy
  1. 安装 SQLAlchemy 及依赖模块。更多 SQLAlchemy 安装说明请参见 SQLAlchemy 官方文档

    pip install SQLAlchemy
    
  2. 安装 SQLAlchemy 数据库相关驱动。

    • 安装 PyMySQL:使用 MySQL 的 PyMySQL 驱动作为与 ByteHouse 通信的协议层。

      pip install pymysql
      
    • 安装 cryptography:用于加密和解密。

      pip install cryptography
      

获取 ByteHouse 连接信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 ClickHouse SQLAlchemy Connector。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

使用 IAM 用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:

参数

配置要点

Host

Host 在不同协议中的字段不同,但格式相同,为 tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com,配置为 ByteHouse 的公网域名,其中 tenant-{TENANT_ID}-{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

Port

  • TCP 协议固定为 19000。
  • HTTP 协议固定为 8123。

Database

配置为连接 ByteHouse 的数据库名称。

user & password

  • user:设置为 bytehouse。
  • password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的租户管理 > 连接信息中获取 API Key。详情请参见获取 API Key

virtual_warehouse

配置为计算组名。您可登录 ByteHouse 控制台,单击顶部计算组,查看并复制计算组 ID。示例:vw-{environment_id}{account_id}-{virtual_warehouse_name}
如果不配置,则默认使用您在 ByteHouse 设置的默认计算组。

使用数据库用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户的方式连接到 ByteHouse。
通用参数说明如下:

参数

配置要点

Host

Host 在不同协议中的字段不同,但格式相同,为 tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com,配置为 ByteHouse 的公网域名,其中 tenant-{TENANT_ID}-{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

Port

固定为 19000。

Database

配置为连接 ByteHouse 的数据库名称。

user & password

  • user 配置为 {accountID_or_accountName}%3A%3A{username}[%3A%3A{envID}],详情请参见获取数据库用户及密码
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库账号的用户名。可在ByteHouse 控制台 > 权限管理 > 用户 > 查看数据库用户名
    • {envID}:可选配置,数据库所在的环境名称。如果使用 default 环境,可不配置;如需使用其他环境,需指定环境名称,配置时无需添加[]。您可登录 ByteHouse 控制台,在租户管理 > 基本信息 > 当前环境中获取。
      使用示例如下:
      • 配置环境 ID:21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv
      • 不配置环境 ID:21xxxxxxxx%3A%3Ademouser
  • password:数据库账号的密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

virtual_warehouse

配置为计算组名。您可登录 ByteHouse 控制台,单击顶部计算组,查看并复制计算组 ID。示例:vw-{environment_id}{account_id}-{virtual_warehouse_name}
如果不配置,则默认使用您在 ByteHouse 设置的默认计算组。

基本用法

您可以使用以下代码连接至 ByteHouse,并开始使用标准语句开发 ByteHouse,用于查询、写入和读取数据。使用时注意替换连接语句中的 {Host}{Password}{User}{Database}{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息

  • 默认支持 keepAlive,可以复用连接和避免短链接。

连接至 ByteHouse

可参考下面代码样例连接 ByteHouse。

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}'
engine = create_engine(connection_string)
session = make_session(engine)

自定义 query ID

使用 execution_options 方法设置 query ID。

global_settings = session.query(text("getSetting('max_execution_time')")).execution_options(query_id=f"customized_{uuid.uuid4()}")

自定义 query settings

全局设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}&max_execution_time=450'
engine = create_engine(connection_string)
session = make_session(engine)

try:
    global_settings = session.query(text("getSetting('max_execution_time')")).execution_options(query_id=f"customized_{uuid.uuid4()}")
    global_settings_value = global_settings.scalar()
    assert global_settings_value == 450
    print(f"Result: {global_settings_value}")
finally:
    session.close()

SQL级别设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}&max_execution_time=450'
engine = create_engine(connection_string)
session = make_session(engine)

try:
    settings = {'max_execution_time': 400}
    query_settings = session.query(text("getSetting('max_execution_time')")).execution_options(settings = settings,query_id=f"customized_{uuid.uuid4()}")
    query_settings_value = query_settings.scalar()
    assert query_settings_value == 400
    print(f"Result: {query_settings_value}")
finally:
    session.close()

连接与查询示例

import uuid
from datetime import datetime
from typing import Type

from sqlalchemy import create_engine, text, MetaData, Column, func
from clickhouse_sqlalchemy import make_session, get_declarative_base, types, engines, Table, select
from sqlalchemy.sql.ddl import DropSequence, DropSchema, CreateSchema, DropTable
from sqlalchemy.testing.provision import drop_db

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}&max_execution_time=450'
engine = create_engine(connection_string)
session = make_session(engine)

global_settings = session.query(text("getSetting('max_execution_time')")).execution_options(query_id=f"customized_{uuid.uuid4()}")
global_settings_value = global_settings.scalar()
assert global_settings_value == 450
print(f"Result: {global_settings_value}")
settings = {'max_execution_time': 400}
query_settings = session.query(text("getSetting('max_execution_time')")).execution_options(settings = settings,query_id=f"customized_{uuid.uuid4()}")
query_settings_value = query_settings.scalar()
assert query_settings_value == 400
print(f"Result: {query_settings_value}")
result = session.execute(text('SELECT 1'))
value = result.scalar()
print(f"Result: {value}")

# 定义元数据
metadata = MetaData()

try:
    # 删除数据库(如果存在)
    with engine.connect() as conn:
        drop_db = DropSchema('bhpythontest', if_exists=True)
        conn.execute(drop_db)
        create_db = CreateSchema('bhpythontest', if_not_exists=True)
        conn.execute(create_db)
        print("数据库创建成功")
        metadata.schema = 'bhpythontest'
        # 定义表结构
        example_table = Table(
            'bhpythontest', metadata,
            Column('Col1', types.UInt8),
            Column('Col2', types.String),
            Column('Col3', types.String),
            Column('Col4', types.UUID),
            Column('Col5', types.Map(types.String, types.UInt8)),
            Column('Col6', types.Array(types.String)),
            # 不支持types.Tuple下的Map类型。需要追上KV flag才行,可以选择ddl的方式进行创建
            Column('Col7', types.Tuple(types.String, types.UInt8, types.Array(types.String))),
            Column('Col8', types.DateTime),
            engines.CnchMergeTree(order_by=func.tuple())
        )

        # 创建表
        metadata.create_all(engine)
        print("表创建成功")

        # 准备插入数据
        data_to_insert = []
        for i in range(1000):
            record = {
                'Col1': 42,
                'Col2': 'ClickHouse',
                'Col3': 'Inc',
                'Col4': str(uuid.uuid4()),
                'Col5': {'key': 1},
                'Col6': ['Q', 'W', 'E', 'R', 'T', 'Y'],
                'Col7': ('String Value', 5, ['String1', 'String2']),
                'Col8': datetime.now()
            }
            data_to_insert.append(record)

    # 批量插入数据
    with engine.connect() as conn:
        # 方式1:使用execute多次插入(适合小批量)
        # for data in data_to_insert:
        #     conn.execute(example_table.insert(), data)

        # 方式2:批量插入(推荐)
        conn.execute(example_table.insert(), data_to_insert)

        print(f"数据插入成功,插入了 {len(data_to_insert)} 条记录")

    # 查询数据
    with engine.connect() as conn:
        # 查询一条记录
        query = select(example_table).limit(1)
        result = conn.execute(query).fetchone()

        if result:
            print("查询结果:")
            print(f"Col1: {result.Col1}")
            print(f"Col2: {result.Col2}")
            print(f"Col3: {result.Col3}")
            print(f"Col4: {result.Col4}")
            print(f"Col5: {result.Col5}")
            print(f"Col6: {result.Col6}")
            print(f"Col7: {result.Col7}")
            print(f"Col8: {result.Col8}")

    # 删除表
    with engine.connect() as conn:
        conn.execute(DropTable(example_table))
        print("表已删除")

except Exception as e:
    print(f"操作失败: {e}")
    raise
finally:
    session.close()
    # 关闭引擎
    engine.dispose()

定义数据模型

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

使用不同的数据类型

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

读取数据

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

写入数据

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

相关参考

SQlAlchemy 的使用详情请参考官方文档