SQLAlchemy 是 Python 的 SQL 工具包与对象关系映射(ORM),支持开发者以 Pythonic 方式灵活操作数据库,尤其适用于需要在 Python 应用中通过 ORM 实现与 ByteHouse 数据库的对象持久化、事务管理或复杂业务逻辑整合的场景。本文将演示如何使用 SQLAlchemy 连接至 ByteHouse 并进行数据开发。
推荐版本如下:
由于 SQLAlchemy 默认会发送 ROLLBACK、COMMIT 等事务控制指令,而 ByteHouse 不支持此类操作,在执行操作时,需添加以下命令,重写(override)相关方法以阻止这些关键字的发送,命令示例如下:
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql # --- Monkey Patch Section --- # 获取 MySQL 方言类 dialect_cls = mysql_pymysql.MySQLDialect_pymysql # 定义空操作的回滚函数(不执行任何操作) def no_op_rollback(self, dbapi_connection): pass def do_commit(self, dbapi_connection): pass # 替换方言类的原始回滚和提交方法 dialect_cls.do_rollback = no_op_rollback dialect_cls.do_commit = do_commit # --- End Monkey Patch Section ---
安装 SQLAlchemy 及依赖模块。更多 SQLAlchemy 安装说明请参见 SQLAlchemy 官方文档。
pip install SQLAlchemy
安装 SQLAlchemy 数据库相关驱动。
安装 pymysql:使用 MySQL 的 pymysql 驱动作为与 ByteHouse 通信的协议层。
pip install pymysql
安装 cryptography:用于加密和解密。
pip install cryptography
可参考下面代码样例连接 ByteHouse。
from typing import Optional from sqlalchemy import create_engine, text from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql from sqlalchemy import String from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column connection_string = "mysql+pymysql://bytehouse:<your_bytehouse_apikey>@<your_bytehouse_host>/<your_database>"
参数说明
参数项 | 细项 | 配置说明 |
---|---|---|
connection_string | <your_bytehouse_apikey> | 配置为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理>连接信息 中获取API Key。详情请参见获取 API Key。 |
<your_bytehouse_host> | 配置为 ByteHouse 的公网连接域名,其中 {TENANT_ID}、{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的 租户管理>基本信息>网络信息 中查看对应信息。详情请参见步骤二:配置网络信息。 | |
<your_database> | 配置为连接 ByteHouse 的数据库名称。 |
示例使用表 user_account
作为测试表,表创建语句如下:
CREATE table <your_database>.user_account( name string, fullname string ) ENGINE=CnchMergeTree ORDER BY name UNIQUE KEY name
使用 SQLAlchemy 通过 MySQL 协议连接 ByteHouse 并执行查询。
from typing import Optional from sqlalchemy import create_engine, text from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql from sqlalchemy import String from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column # 替换为实际的连接信息 connection_string = "mysql+pymysql://bytehouse:<your_bytehouse_apikey>@<your_bytehouse_host>/<your_database>" # --- Monkey Patch Section --- # 获取方言类 dialect_cls = mysql_pymysql.MySQLDialect_pymysql # 定义空操作的函数 def no_op_rollback(self, dbapi_connection): pass def do_commit(self, dbapi_connection): pass # 替换方言类的原始回滚和提交方法 dialect_cls.do_rollback = no_op_rollback dialect_cls.do_commit = do_commit # --- End Monkey Patch Section --- engine = create_engine( connection_string, ) class Base(DeclarativeBase): pass class User(Base): __tablename__ = "user_account" name: Mapped[str] = mapped_column(String(30), primary_key=True) fullname: Mapped[Optional[str]] def __repr__(self) -> str: return f"User(name={self.name!r}, fullname={self.fullname!r})" def insert_user(): from sqlalchemy.orm import Session with Session(engine) as session: spongebob = User( name="spongebob", fullname="Spongebob Squarepants", ) sandy = User( name="sandy", fullname="Sandy Cheeks", ) patrick = User( name="patrick", fullname="Patrick Star" ) session.add_all([spongebob, sandy, patrick]) session.commit() def simple_select(): with engine.connect() as conn: result = conn.execute(text("SELECT 123")) for row in result: print("Result:", row[0]) def select_user(): from sqlalchemy import select from sqlalchemy.orm import Session session = Session(engine) stmt = select(User).where(User.name.in_(["spongebob"])) for user in session.scalars(stmt): print(user) if __name__ == '__main__': simple_select() insert_user() select_user()
SQlAlchemy 的使用详情请参考官方文档。