This article describes how to use the ClickHouse SQLAlchemy Connector to connect to and access the ByteHouse cloud data warehouse.
Python 3.12 or later is recommended.
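| Item | Verified version / notes |
|---|---|
| SQLAlchemy version | |
| ByteHouse clickhouse-connect support package version | 0.3.2+bytehouse. Note: ByteHouse does not natively support the open-source ClickHouse SQLAlchemy Connector, so you also need the ByteHouse clickhouse-connect support package, which patches the open-source connector. |
| Python version | Python 3.10 |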
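By default, SQLAlchemy sends transaction control statements such as ROLLBACK and COMMIT, which ByteHouse does not support. Before running any operations, add the following code. It overrides the relevant dialect methods so that these statements are never sent: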
```python
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# Get the MySQL (PyMySQL) dialect class
dialect_cls = mysql_pymysql.MySQLDialect_pymysql


# No-op rollback: do nothing instead of sending ROLLBACK
def no_op_rollback(self, dbapi_connection):
    pass


# No-op commit: do nothing instead of sending COMMIT
def no_op_commit(self, dbapi_connection):
    pass


# Replace the dialect class's original rollback and commit methods
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = no_op_commit
# --- End Monkey Patch Section ---
```
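The patch above changes MySQLDialect_pymysql for the whole Python process and is never undone. If you need the change to be reversible (for example, in a test suite), one option is to wrap it in a helper that returns a restore function. This is a sketch: disable_transaction_statements and the FakeDialect stand-in are hypothetical names, FakeDialect only records which statements it would have sent, and in real code you would pass mysql_pymysql.MySQLDialect_pymysql instead.

```python
from typing import Callable

_PATCHED_METHODS = ("do_rollback", "do_commit")


def disable_transaction_statements(dialect_cls: type) -> Callable[[], None]:
    """Replace do_rollback/do_commit on dialect_cls with no-ops.

    Returns a function that undoes the patch.
    """
    # vars() only sees methods defined directly on this class; inherited ones map to None.
    originals = {name: vars(dialect_cls).get(name) for name in _PATCHED_METHODS}

    def no_op(self, dbapi_connection):
        # Send nothing to the server.
        return None

    for name in _PATCHED_METHODS:
        setattr(dialect_cls, name, no_op)

    def restore() -> None:
        for name, original in originals.items():
            if original is None:
                # The method was inherited: removing the override exposes the parent's again.
                delattr(dialect_cls, name)
            else:
                setattr(dialect_cls, name, original)

    return restore


class FakeDialect:
    """Hypothetical stand-in that records the statements it would send."""

    def __init__(self):
        self.sent = []

    def do_rollback(self, dbapi_connection):
        self.sent.append("ROLLBACK")

    def do_commit(self, dbapi_connection):
        self.sent.append("COMMIT")


if __name__ == "__main__":
    dialect = FakeDialect()
    restore = disable_transaction_statements(FakeDialect)
    dialect.do_rollback(None)
    dialect.do_commit(None)
    print(dialect.sent)  # nothing recorded while patched
    restore()
    dialect.do_commit(None)
    print(dialect.sent)  # the original commit behavior is back
```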
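Install SQLAlchemy and its dependencies. For more installation details, see the SQLAlchemy official documentation.

```bash
pip install SQLAlchemy
```

Install the database drivers used by SQLAlchemy.
Install PyMySQL. The MySQL PyMySQL driver serves as the protocol layer for communicating with ByteHouse.

```bash
pip install pymysql
```

Install cryptography, which is used for encryption and decryption (PyMySQL requires it for the sha256_password and caching_sha2_password authentication methods).

```bash
pip install cryptography
```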
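To compare your environment with the versions listed in the table at the start of this article, you can print the installed versions using the standard library's importlib.metadata. This is a sketch; installed_versions is a hypothetical helper name, and it only reports versions, it does not check compatibility.

```python
import sys
from importlib.metadata import PackageNotFoundError, version

# Distribution names as used with pip install in the steps above.
PACKAGES = ("SQLAlchemy", "PyMySQL", "cryptography")


def installed_versions(dist_names):
    """Map each distribution name to its installed version, or None if missing."""
    result = {}
    for name in dist_names:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


if __name__ == "__main__":
    print(f"Python {sys.version.split()[0]}")
    for name, ver in installed_versions(PACKAGES).items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```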
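ByteHouse supports connecting through the ClickHouse SQLAlchemy Connector as either an IAM user or a database user. The differences between the two are described below; choose whichever suits your needs.
For more information about IAM users and database users, see the following documents:
See Step 3: Obtain ByteHouse connection string information to learn how to connect to ByteHouse as an IAM user.
The common parameters are described below: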
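| Parameter | Configuration notes |
|---|---|
| Host | The field name for Host differs between protocols, but the format is the same: |
| Port | |
| Database | The name of the ByteHouse database to connect to. |
| user & password | |
| virtual_warehouse | Set this to the compute group (virtual warehouse) ID. Log in to the ByteHouse console, click Compute Groups at the top, then view and copy the compute group ID. Example: |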
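See Step 3: Obtain ByteHouse connection string information to learn how to connect to ByteHouse as a database user.
The common parameters are described below:

| Parameter | Configuration notes |
|---|---|
| Host | The field name for Host differs between protocols, but the format is the same: |
| Port | Fixed at 19000. |
| Database | The name of the ByteHouse database to connect to. |
| user & password | |
| virtual_warehouse | Set this to the compute group (virtual warehouse) ID. Log in to the ByteHouse console, click Compute Groups at the top, then view and copy the compute group ID. Example: |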
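If the password contains characters that have a meaning in URLs (such as @, : or /), inserting it unescaped into the connection string can break URL parsing; SQLAlchemy expects such characters in URL credentials to be percent-encoded. One way to handle this is to encode the user name and password with Python's urllib.parse when building the string. The sketch below assumes port 19000 and the secure=True&verify=False options used in this article's examples; build_connection_string is a hypothetical helper, and the host, credentials and IDs in the demo are placeholders.

```python
from urllib.parse import quote, urlencode


def build_connection_string(host, user, password, database, virtual_warehouse_id,
                            port=19000, secure=True, verify=False, settings=None):
    """Assemble a clickhouse+native URL from the parameters described above."""
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters (spaces become %20).
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    query = {
        "secure": str(secure),
        "verify": str(verify),
        "virtual_warehouse": virtual_warehouse_id,
    }
    # Optional extra settings passed in the query string, e.g. max_execution_time.
    for key, value in (settings or {}).items():
        query[key] = str(value)
    return (f"clickhouse+native://{credentials}@{host}:{port}/{database}"
            f"?{urlencode(query)}")


if __name__ == "__main__":
    # Placeholder values; replace them with your own connection information.
    print(build_connection_string(
        host="bytehouse.example.com", user="my_user", password="p@ss/word",
        database="my_db", virtual_warehouse_id="vw-123",
        settings={"max_execution_time": 450},
    ))
```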
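You can use the following code to connect to ByteHouse and start developing with standard statements to query, write, and read data. Replace the connection fields in the code, such as {Host}, {Password}, {User}, {Database}, and {VIRTUAL_WAREHOUSE_ID}, with your own values; for how to obtain them, see Obtain ByteHouse connection information.
keepAlive lets connections be reused and avoids short-lived connections. Refer to the following sample code to connect to ByteHouse.

```python
from sqlalchemy import create_engine
from clickhouse_sqlalchemy import make_session

# Replace the placeholders with your own connection information
host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = (
    f"clickhouse+native://{user}:{password}@{host}:{port}/{database}"
    f"?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
)
engine = create_engine(connection_string)
session = make_session(engine)
```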
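Use the execution_options method to set the query ID.

```python
import uuid

from sqlalchemy import text

# `session` is the session created in the connection example above
global_settings = session.query(
    text("getSetting('max_execution_time')")
).execution_options(query_id=f"customized_{uuid.uuid4()}")
```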
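The example builds the query ID inline as customized_ followed by a UUID. If several parts of an application set query IDs, a small helper keeps the format consistent and lets each caller choose its own prefix. This is a sketch; make_query_id and the daily_report prefix are hypothetical, and the ID format simply mirrors the f-string in the example.

```python
import uuid


def make_query_id(prefix="customized"):
    """Return a query ID of the form '<prefix>_<random UUID4>'."""
    # uuid4() is random, so IDs are unique in practice; the prefix marks
    # which application or job issued the query.
    return f"{prefix}_{uuid.uuid4()}"


if __name__ == "__main__":
    # Usage with the session from the examples (not executed here):
    # session.query(text("...")).execution_options(query_id=make_query_id("daily_report"))
    print(make_query_id())
    print(make_query_id("daily_report"))
```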
The following example sets max_execution_time=450 for the whole connection through the connection string, then reads the value back with getSetting:

```python
import uuid

from sqlalchemy import create_engine, text
from clickhouse_sqlalchemy import make_session

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = (
    f"clickhouse+native://{user}:{password}@{host}:{port}/{database}"
    f"?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
    "&max_execution_time=450"  # connection-level setting
)
engine = create_engine(connection_string)
session = make_session(engine)

try:
    global_settings = session.query(
        text("getSetting('max_execution_time')")
    ).execution_options(query_id=f"customized_{uuid.uuid4()}")
    global_settings_value = global_settings.scalar()
    assert global_settings_value == 450
    print(f"Result: {global_settings_value}")
finally:
    session.close()
```
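To override a setting for a single query, pass it through execution_options(settings=...). Here the query runs with max_execution_time=400 instead of the connection-level 450:

```python
import uuid

from sqlalchemy import create_engine, text
from clickhouse_sqlalchemy import make_session

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = (
    f"clickhouse+native://{user}:{password}@{host}:{port}/{database}"
    f"?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
    "&max_execution_time=450"
)
engine = create_engine(connection_string)
session = make_session(engine)

try:
    # Query-level settings override the connection-level value for this query
    settings = {"max_execution_time": 400}
    query_settings = session.query(
        text("getSetting('max_execution_time')")
    ).execution_options(settings=settings, query_id=f"customized_{uuid.uuid4()}")
    query_settings_value = query_settings.scalar()
    assert query_settings_value == 400
    print(f"Result: {query_settings_value}")
finally:
    session.close()
```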
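Taken together, the two settings examples show a precedence rule: a value passed with execution_options(settings=...) takes priority over the value from the connection string for that query (the assertions expect 400 rather than 450). The plain-Python model below only restates that rule to make it explicit; effective_settings is a hypothetical name, and this is not how ByteHouse or clickhouse-sqlalchemy resolve settings internally.

```python
def effective_settings(connection_settings, query_settings=None):
    """Model the precedence shown above: per-query settings override connection ones."""
    merged = dict(connection_settings)  # copy, so the caller's dict is not modified
    merged.update(query_settings or {})
    return merged


if __name__ == "__main__":
    connection_level = {"max_execution_time": 450}
    print(effective_settings(connection_level))
    print(effective_settings(connection_level, {"max_execution_time": 400}))
```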
```python
import uuid
from datetime import datetime

from sqlalchemy import create_engine, text, MetaData, Column, func
from sqlalchemy.sql.ddl import CreateSchema, DropSchema, DropTable
from clickhouse_sqlalchemy import make_session, types, engines, Table, select

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

connection_string = (
    f"clickhouse+native://{user}:{password}@{host}:{port}/{database}"
    f"?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
    "&max_execution_time=450"
)
engine = create_engine(connection_string)
session = make_session(engine)

# Connection-level setting from the connection string
global_settings = session.query(text("getSetting('max_execution_time')")).execution_options(
    query_id=f"customized_{uuid.uuid4()}"
)
global_settings_value = global_settings.scalar()
assert global_settings_value == 450
print(f"Result: {global_settings_value}")

# Query-level setting overrides the connection-level value
settings = {"max_execution_time": 400}
query_settings = session.query(text("getSetting('max_execution_time')")).execution_options(
    settings=settings, query_id=f"customized_{uuid.uuid4()}"
)
query_settings_value = query_settings.scalar()
assert query_settings_value == 400
print(f"Result: {query_settings_value}")

result = session.execute(text("SELECT 1"))
value = result.scalar()
print(f"Result: {value}")

# Define metadata
metadata = MetaData()

try:
    with engine.connect() as conn:
        # Drop the database if it exists, then create it
        conn.execute(DropSchema("bhpythontest", if_exists=True))
        conn.execute(CreateSchema("bhpythontest", if_not_exists=True))
        print("Database created")

    metadata.schema = "bhpythontest"

    # Define the table structure
    example_table = Table(
        "bhpythontest",
        metadata,
        Column("Col1", types.UInt8),
        Column("Col2", types.String),
        Column("Col3", types.String),
        Column("Col4", types.UUID),
        Column("Col5", types.Map(types.String, types.UInt8)),
        Column("Col6", types.Array(types.String)),
        # A Map nested inside types.Tuple is not supported (it requires the KV flag);
        # create such columns with a DDL statement instead.
        Column("Col7", types.Tuple(types.String, types.UInt8, types.Array(types.String))),
        Column("Col8", types.DateTime),
        engines.CnchMergeTree(order_by=func.tuple()),
    )

    # Create the table
    metadata.create_all(engine)
    print("Table created")

    # Prepare the rows to insert
    data_to_insert = []
    for _ in range(1000):
        record = {
            "Col1": 42,
            "Col2": "ClickHouse",
            "Col3": "Inc",
            "Col4": str(uuid.uuid4()),
            "Col5": {"key": 1},
            "Col6": ["Q", "W", "E", "R", "T", "Y"],
            "Col7": ("String Value", 5, ["String1", "String2"]),
            "Col8": datetime.now(),
        }
        data_to_insert.append(record)

    # Batch insert
    with engine.connect() as conn:
        # Option 1: call execute once per row (suitable for small batches)
        # for data in data_to_insert:
        #     conn.execute(example_table.insert(), data)
        # Option 2: insert all rows in one call (recommended)
        conn.execute(example_table.insert(), data_to_insert)
        print(f"Inserted {len(data_to_insert)} records")

    # Query data
    with engine.connect() as conn:
        # Fetch one row
        query = select(example_table).limit(1)
        result = conn.execute(query).fetchone()
        if result:
            print("Query result:")
            for name in [f"Col{n}" for n in range(1, 9)]:
                print(f"{name}: {getattr(result, name)}")

    # Drop the table
    with engine.connect() as conn:
        conn.execute(DropTable(example_table))
        print("Table dropped")
except Exception as e:
    print(f"Operation failed: {e}")
    raise
finally:
    session.close()
    # Dispose of the engine
    engine.dispose()
```
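The complete example sends all 1,000 rows in a single execute call, which its comments recommend over per-row inserts. For much larger data sets, one option is to split the rows into fixed-size batches so that each INSERT stays bounded in size. The helper below is a plain-Python sketch; the name chunked and the batch size of 500 in the comment are arbitrary choices, not ByteHouse recommendations. On Python 3.12 and later, itertools.batched provides similar behavior but yields tuples instead of lists.

```python
from itertools import islice


def chunked(rows, size):
    """Yield consecutive lists of at most `size` items from any iterable."""
    if size <= 0:
        raise ValueError("size must be a positive integer")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    # With the objects from the complete example, each batch would become one INSERT:
    # with engine.connect() as conn:
    #     for batch in chunked(data_to_insert, 500):
    #         conn.execute(example_table.insert(), batch)
    print([len(batch) for batch in chunked(range(1000), 300)])  # → [300, 300, 300, 100]
```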
Replace the corresponding connection_string definition in the connection code with the following:

```python
# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)
```

The resulting code is as follows:

```python
from sqlalchemy import create_engine

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"
# TLS options, matching the values used in the other examples
secure = True
verify = False

# Create connection
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)
```
For details on using SQLAlchemy, see the SQLAlchemy official documentation.