You need to enable JavaScript to run this app.
ByteHouse云数仓版

ByteHouse云数仓版

复制全文
Python
ClickHouse SQLAlchemy Connector
复制全文
ClickHouse SQLAlchemy Connector

本文介绍如何使用 ClickHouse SqlAlchemy Connector 连接并访问 ByteHouse 云数仓。

环境要求

建议使用 Python 3.12 或更高版本。

推荐版本

细分项

已验证版本/注意事项

SqlAlchemy 版本

2.0.38

ByteHouse clickhouse-connect 支持包版本

0.3.2+bytehouse 说明由于 ByteHouse 无法原生支持开源的 ClickHouse SqlAlchemy Connector,因此您还需要使用 ByteHouse clickhouse-connect 支持包对开源连接器进行修改支持。

Python 版本

Python 3.10

使用限制
  • 不支持 FixString,请使用 String 替代。
  • 不支持 ByteHouse 的 JSONB 和 Bitmap64 的数据类型。
  • 如果您在使用过程中遇到其他未知限制,请联系 ByteHouse 团队处理。

安装 SQLAlchemy
  1. 安装 SQLAlchemy 及依赖模块。更多 SQLAlchemy 安装说明请参见 SQLAlchemy 官方文档

    pip install SQLAlchemy
    
  2. 安装 cryptography:用于加密和解密。

    pip install cryptography
    

获取 ByteHouse 连接信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 ClickHouse SQLAlchemy Connector。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

使用 IAM 用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:

参数

配置要点

Host

Host 在不同协议中的字段不同,但格式相同,为 tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com,配置为 ByteHouse 的公网域名,其中 tenant-{TENANT_ID}-{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

Port

  • TCP 协议固定为 19000。
  • HTTP 协议固定为 8123。

Database

配置为连接 ByteHouse 的数据库名称。

user & password

  • user:设置为 bytehouse。
  • password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的租户管理 > 连接信息中获取 API Key。详情请参见获取 API Key

virtual_warehouse

配置为计算组名。您可登录 ByteHouse 控制台,单击顶部计算组,查看并复制计算组 ID。示例:vw-{environment_id}{account_id}-{virtual_warehouse_name}
如果不配置,则默认使用您在 ByteHouse 设置的默认计算组。

使用数据库用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户的方式连接到 ByteHouse。
通用参数说明如下:

参数

配置要点

Host

Host 在不同协议中的字段不同,但格式相同,为 tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com,配置为 ByteHouse 的公网域名,其中 tenant-{TENANT_ID}-{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

Port

固定为 19000。

Database

配置为连接 ByteHouse 的数据库名称。

user & password

  • user 配置为 {accountID_or_accountName}%3A%3A{username}[%3A%3A{envID}],详情请参见获取数据库用户及密码
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库账号的用户名。可在ByteHouse 控制台 > 权限管理 > 用户 > 查看数据库用户名
    • {envID}:可选配置,数据库所在的环境名称。如果使用 default 环境,可不配置;如需使用其他环境,需指定环境名称,配置时无需添加[]。您可登录 ByteHouse 控制台,在租户管理 > 基本信息 > 当前环境中获取。
      使用示例如下:
      • 配置环境 ID:21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv
      • 不配置环境 ID:21xxxxxxxx%3A%3Ademouser
  • password:数据库账号的密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

virtual_warehouse

配置为计算组名。您可登录 ByteHouse 控制台,单击顶部计算组,查看并复制计算组 ID。示例:vw-{environment_id}{account_id}-{virtual_warehouse_name}
如果不配置,则默认使用您在 ByteHouse 设置的默认计算组。

基本用法

您可以使用以下代码连接至 ByteHouse,并开始使用标准语句开发 ByteHouse,用于查询、写入和读取数据。使用时注意替换连接语句中的 {Host}{Password}{User}{Database}{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息

  • 默认支持 keepAlive,可以复用连接和避免短链接。

连接至 ByteHouse

可参考下面代码样例连接 ByteHouse。

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}'
engine = create_engine(connection_string)
session = make_session(engine)

自定义 query ID

使用 execution_options 方法设置 query ID。

global_settings = session.query(text("getSetting('max_execution_time')")).execution_options(query_id=f"customized_{uuid.uuid4()}")

自定义 query settings

全局设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}&max_execution_time=450'
engine = create_engine(connection_string)
session = make_session(engine)

try:
    global_settings = session.query(text("getSetting('max_execution_time')")).execution_options(query_id=f"customized_{uuid.uuid4()}")
    global_settings_value = global_settings.scalar()
    assert global_settings_value == 450
    print(f"Result: {global_settings_value}")
finally:
    session.close()

SQL级别设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}&max_execution_time=450'
engine = create_engine(connection_string)
session = make_session(engine)

try:
    settings = {'max_execution_time': 400}
    query_settings = session.query(text("getSetting('max_execution_time')")).execution_options(settings = settings,query_id=f"customized_{uuid.uuid4()}")
    query_settings_value = query_settings.scalar()
    assert query_settings_value == 400
    print(f"Result: {query_settings_value}")
finally:
    session.close()

连接与查询示例

import uuid
from datetime import datetime
from typing import Type

from sqlalchemy import create_engine, text, MetaData, Column, func
from clickhouse_sqlalchemy import make_session, get_declarative_base, types, engines, Table, select
from sqlalchemy.sql.ddl import DropSequence, DropSchema, CreateSchema, DropTable
from sqlalchemy.testing.provision import drop_db

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}&max_execution_time=450'
engine = create_engine(connection_string)
session = make_session(engine)

global_settings = session.query(text("getSetting('max_execution_time')")).execution_options(query_id=f"customized_{uuid.uuid4()}")
global_settings_value = global_settings.scalar()
assert global_settings_value == 450
print(f"Result: {global_settings_value}")
settings = {'max_execution_time': 400}
query_settings = session.query(text("getSetting('max_execution_time')")).execution_options(settings = settings,query_id=f"customized_{uuid.uuid4()}")
query_settings_value = query_settings.scalar()
assert query_settings_value == 400
print(f"Result: {query_settings_value}")
result = session.execute(text('SELECT 1'))
value = result.scalar()
print(f"Result: {value}")

# 定义元数据
metadata = MetaData()

try:
    # 删除数据库(如果存在)
    with engine.connect() as conn:
        drop_db = DropSchema('bhpythontest', if_exists=True)
        conn.execute(drop_db)
        create_db = CreateSchema('bhpythontest', if_not_exists=True)
        conn.execute(create_db)
        print("数据库创建成功")
        metadata.schema = 'bhpythontest'
        # 定义表结构
        example_table = Table(
            'bhpythontest', metadata,
            Column('Col1', types.UInt8),
            Column('Col2', types.String),
            Column('Col3', types.String),
            Column('Col4', types.UUID),
            Column('Col5', types.Map(types.String, types.UInt8)),
            Column('Col6', types.Array(types.String)),
            # 不支持types.Tuple下的Map类型。需要追上KV flag才行,可以选择ddl的方式进行创建
            Column('Col7', types.Tuple(types.String, types.UInt8, types.Array(types.String))),
            Column('Col8', types.DateTime),
            engines.CnchMergeTree(order_by=func.tuple())
        )

        # 创建表
        metadata.create_all(engine)
        print("表创建成功")

        # 准备插入数据
        data_to_insert = []
        for i in range(1000):
            record = {
                'Col1': 42,
                'Col2': 'ClickHouse',
                'Col3': 'Inc',
                'Col4': str(uuid.uuid4()),
                'Col5': {'key': 1},
                'Col6': ['Q', 'W', 'E', 'R', 'T', 'Y'],
                'Col7': ('String Value', 5, ['String1', 'String2']),
                'Col8': datetime.now()
            }
            data_to_insert.append(record)

    # 批量插入数据
    with engine.connect() as conn:
        # 方式1:使用execute多次插入(适合小批量)
        # for data in data_to_insert:
        #     conn.execute(example_table.insert(), data)

        # 方式2:批量插入(推荐)
        conn.execute(example_table.insert(), data_to_insert)

        print(f"数据插入成功,插入了 {len(data_to_insert)} 条记录")

    # 查询数据
    with engine.connect() as conn:
        # 查询一条记录
        query = select(example_table).limit(1)
        result = conn.execute(query).fetchone()

        if result:
            print("查询结果:")
            print(f"Col1: {result.Col1}")
            print(f"Col2: {result.Col2}")
            print(f"Col3: {result.Col3}")
            print(f"Col4: {result.Col4}")
            print(f"Col5: {result.Col5}")
            print(f"Col6: {result.Col6}")
            print(f"Col7: {result.Col7}")
            print(f"Col8: {result.Col8}")

    # 删除表
    with engine.connect() as conn:
        conn.execute(DropTable(example_table))
        print("表已删除")

except Exception as e:
    print(f"操作失败: {e}")
    raise
finally:
    session.close()
    # 关闭引擎
    engine.dispose()

定义数据模型

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

使用不同的数据类型

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

读取数据

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

写入数据

使用以下代码替换连接字符串中对应的 connection_string

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

替换后的代码如下:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

相关参考

SQlAlchemy 的使用详情请参考官方文档

最近更新时间:2025.12.11 21:30:21
这个页面对您有帮助吗?
有用
有用
无用
无用