SQLAlchemy 是 Python 的 SQL 工具包与对象关系映射(ORM),支持开发者以 Pythonic 方式灵活操作数据库,尤其适用于需要在 Python 应用中通过 ORM 实现与 ByteHouse 数据库的对象持久化、事务管理或复杂业务逻辑整合的场景。本文将演示如何使用 SQLAlchemy 连接至 ByteHouse 并进行数据开发。
Recommended versions:
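SQLAlchemy sends transaction control statements such as ROLLBACK and COMMIT by default, but ByteHouse does not support them. Before running any operations, add the following code. It overrides the relevant dialect methods with no-ops so that these statements are never sent: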
```python
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# Get the MySQL dialect class used by the PyMySQL driver
dialect_cls = mysql_pymysql.MySQLDialect_pymysql

# Define no-op rollback and commit functions (they do nothing)
def no_op_rollback(self, dbapi_connection):
    pass

def no_op_commit(self, dbapi_connection):
    pass

# Replace the dialect class's original rollback and commit methods
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = no_op_commit
# --- End Monkey Patch Section ---
```
Install SQLAlchemy and its dependencies. For more installation details, see the official SQLAlchemy documentation.
pip install SQLAlchemy
Install the database drivers that SQLAlchemy needs.
Install pymysql: SQLAlchemy uses the MySQL pymysql driver as the protocol layer for communicating with ByteHouse.
pip install pymysql
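Install cryptography: this library provides the encryption and decryption routines that PyMySQL needs for some authentication methods, such as sha256_password and caching_sha2_password.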
pip install cryptography
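To check which versions are installed and compare them with the recommended versions, you can read package metadata with the standard library. This is a minimal sketch, and `installed_versions` is a hypothetical helper name. `SQLAlchemy`, `PyMySQL`, and `cryptography` are the names these packages are published under on PyPI.

```python
from importlib import metadata


def installed_versions(packages=("SQLAlchemy", "PyMySQL", "cryptography")):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name}: {version or 'not installed'}")
```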
Use the following code to connect to ByteHouse.
```python
from sqlalchemy import create_engine
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# Get the MySQL dialect class
dialect_cls = mysql_pymysql.MySQLDialect_pymysql

# Define no-op rollback and commit functions (they do nothing)
def no_op_rollback(self, dbapi_connection):
    pass

def no_op_commit(self, dbapi_connection):
    pass

# Replace the dialect class's original rollback and commit methods
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = no_op_commit
# --- End Monkey Patch Section ---

# Replace {user}, {password}, and {bh-host} with your actual values
engine = create_engine(
    "mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004",
    echo=True,
)
```
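Parameter | Sub-item | Description |
---|---|---|
engine | user | Replace with the actual cluster connection username. To find it, log in to the ByteHouse console, click your profile in the upper-right corner, click Account Settings, then view and copy the cluster connection account. |
engine | password | Replace with the cluster connection password. To find it, log in to the ByteHouse console, click your profile in the upper-right corner, click Account Settings, then view and copy the cluster connection password. |
engine | bh-host | The network domain name of your ByteHouse instance. To find it, log in to the ByteHouse console, click Cluster Management at the top, click the cluster name in the cluster list, then view and copy the public/private network information on the Basic Information tab. |
echo | - | Recommended: set to True to log each SQL statement that is executed, which makes debugging easier. |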
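A password or username that contains characters with special meaning in URLs, such as `@`, `:`, or `/`, breaks parsing of the connection string above. Percent-encode such values before placing them in the URL. This is a minimal sketch, and `build_bytehouse_url` is a hypothetical helper. The `-public.bytehouse-ce.volces.com` suffix and port 9004 are copied from the connection example, so adjust them to match the endpoint shown in your console.

```python
from urllib.parse import quote


def build_bytehouse_url(user, password, bh_host,
                        domain="bytehouse-ce.volces.com", port=9004):
    """Build a mysql+pymysql URL, percent-encoding the username and password."""
    # safe="" encodes every reserved character, including @ : / (spaces become %20)
    return (
        f"mysql+pymysql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{bh_host}-public.{domain}:{port}"
    )


if __name__ == "__main__":
    print(build_bytehouse_url("alice", "p@ss:w/rd", "abc123"))
```

You would then pass the returned string to `create_engine(url, echo=True)` in place of the literal URL.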
Use SQLAlchemy to connect to ByteHouse over the MySQL protocol and run a query.
```python
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# Get the MySQL dialect class
dialect_cls = mysql_pymysql.MySQLDialect_pymysql

# Define no-op rollback and commit functions (they do nothing)
def no_op_rollback(self, dbapi_connection):
    pass

def no_op_commit(self, dbapi_connection):
    pass

# Replace the dialect class's original rollback and commit methods
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = no_op_commit
# --- End Monkey Patch Section ---

engine = create_engine(
    "mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004",
    echo=True,
)


def simple_select():
    with engine.connect() as conn:
        result = conn.execute(text("select * from user_account"))
        for row in result:
            # Print the first column of each row
            print("Result:", row[0])


if __name__ == "__main__":
    simple_select()
```
Output
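Instead of formatting values into the SQL string, you can pass them to `text()` as bound parameters, and `Result.mappings()` returns each row as a dictionary keyed by column name. This is a minimal sketch, and `select_by_name` and `make_demo_engine` are hypothetical helpers. So that it runs without a ByteHouse cluster, the sketch uses an in-memory SQLite database as a stand-in. Against ByteHouse you would pass the patched `engine` from the example above and skip the demo setup.

```python
from sqlalchemy import create_engine, text


def select_by_name(engine, name):
    """Run a parameterized query and return the matching rows as dicts."""
    stmt = text("SELECT name, fullname FROM user_account WHERE name = :name")
    with engine.connect() as conn:
        result = conn.execute(stmt, {"name": name})
        return [dict(row) for row in result.mappings()]


def make_demo_engine():
    """Stand-in for ByteHouse: an in-memory SQLite table with sample rows."""
    engine = create_engine("sqlite://")
    with engine.begin() as conn:
        conn.execute(text(
            "CREATE TABLE user_account (name VARCHAR(30) PRIMARY KEY, fullname VARCHAR(100))"
        ))
        conn.execute(
            text("INSERT INTO user_account (name, fullname) VALUES (:name, :fullname)"),
            [
                {"name": "sandy", "fullname": "Sandy Cheeks"},
                {"name": "patrick", "fullname": "Patrick Star"},
            ],
        )
    return engine


if __name__ == "__main__":
    print(select_by_name(make_demo_engine(), "sandy"))
```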
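ORM (Object Relational Mapping) automatically maps the object model of an object-oriented language such as Python to the table structures of a relational database such as MySQL. With an ORM, you work with the database through objects instead of writing raw SQL.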
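The following examples write data to and query data from the ByteHouse table user_account. This is a logical distributed table that uses the HaMergeTree table engine and contains two columns, name and fullname.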
The following example writes data. Replace the engine connection details, table name, column names, and attributes with your actual values.
```python
from typing import Optional

from sqlalchemy import String, create_engine
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

# --- Monkey Patch Section ---
# Get the dialect class
dialect_cls = mysql_pymysql.MySQLDialect_pymysql

# Define functions that do nothing
def no_op_rollback(self, dbapi_connection):
    pass

def no_op_commit(self, dbapi_connection):
    pass

# Replace the methods on the dialect class
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = no_op_commit
# --- End Monkey Patch Section ---

engine = create_engine(
    "mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004",
    echo=True,
)


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    name: Mapped[str] = mapped_column(String(30), primary_key=True)
    fullname: Mapped[Optional[str]]

    def __repr__(self) -> str:
        return f"User(name={self.name!r}, fullname={self.fullname!r})"


def insert_user():
    with Session(engine) as session:
        spongebob = User(name="spongebob", fullname="Spongebob Squarepants")
        sandy = User(name="sandy", fullname="Sandy Cheeks")
        patrick = User(name="patrick", fullname="Patrick Star")
        session.add_all([spongebob, sandy, patrick])
        # With the patch applied, no COMMIT is actually sent to ByteHouse
        session.commit()


if __name__ == "__main__":
    insert_user()
```
Output
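The following example queries data. Replace the engine connection details, table name, column names, and attributes with your actual values.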
```python
from typing import Optional

from sqlalchemy import String, create_engine, select
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

# --- Monkey Patch Section ---
# Get the dialect class
dialect_cls = mysql_pymysql.MySQLDialect_pymysql

# Define functions that do nothing
def no_op_rollback(self, dbapi_connection):
    pass

def no_op_commit(self, dbapi_connection):
    pass

# Replace the methods on the dialect class
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = no_op_commit
# --- End Monkey Patch Section ---

engine = create_engine(
    "mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004",
    echo=True,
)


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    name: Mapped[str] = mapped_column(String(30), primary_key=True)
    fullname: Mapped[Optional[str]]

    def __repr__(self) -> str:
        return f"User(name={self.name!r}, fullname={self.fullname!r})"


def select_user():
    stmt = select(User).where(User.name.in_(["spongebob"]))
    # Use the session as a context manager so it is closed when done
    with Session(engine) as session:
        for user in session.scalars(stmt):
            print(user)


if __name__ == "__main__":
    select_user()
```
Output
For more details on using SQLAlchemy, see the official SQLAlchemy documentation.