You need to enable JavaScript to run this app.
导航
MySQL SQLAlchemy Conncetor
最近更新时间:2025.06.24 19:03:54首次发布时间:2025.06.24 19:03:54
我的收藏
有用
有用
无用
无用

SQLAlchemy 是 Python 的 SQL 工具包与对象关系映射(ORM),支持开发者以 Pythonic 方式灵活操作数据库,尤其适用于需要在 Python 应用中通过 ORM 实现与 ByteHouse 数据库的对象持久化、事务管理或复杂业务逻辑整合的场景。本文将演示如何使用 SQLAlchemy 连接至 ByteHouse 并进行数据开发。

前提条件

推荐版本如下:

  • 推荐安装 Python 3.12.6 及以上版本。
  • SQLAlchemy version:2.0.41

注意事项

由于 SQLAlchemy 默认会发送 ROLLBACK、COMMIT 等事务控制指令,而 ByteHouse 不支持此类操作,在执行操作时,需添加以下命令,重写(override)相关方法以阻止这些关键字的发送,命令示例如下:

from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# 获取 MySQL 方言类
dialect_cls = mysql_pymysql.MySQLDialect_pymysql
# 定义空操作的回滚函数(不执行任何操作)
def no_op_rollback(self, dbapi_connection):
    pass
def do_commit(self, dbapi_connection):
    pass
# 替换方言类的原始回滚和提交方法
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit
# --- End Monkey Patch Section ---

安装 SQLAlchemy
  1. 安装 SQLAlchemy 及依赖模块。更多 SQLAlchemy 安装说明请参见 SQLAlchemy 官方文档

    pip install SQLAlchemy
    
  2. 安装 SQLAlchemy 数据库相关驱动。

    • 安装 pymysql:使用 MySQL 的 pymysql 驱动作为与 ByteHouse 通信的协议层。

      pip install pymysql
      
    • 安装 cryptography:用于加密和解密。

      pip install cryptography
      

连接 ByteHouse

使用以下命令连接到 ByteHouse。

from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# 获取 MySQL 方言类
dialect_cls = mysql_pymysql.MySQLDialect_pymysql
# 定义空操作的回滚函数(不执行任何操作)
def no_op_rollback(self, dbapi_connection):
    pass
def do_commit(self, dbapi_connection):
    pass
# 替换方言类的原始回滚和提交方法
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit
# --- End Monkey Patch Section ---

engine = create_engine("mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004", echo=True)

参数项

细项

配置说明

engine

user

需要替换为实际的集群连接用户名,您可登录 ByteHouse 控制台,单击右上角个人中心,单击账号设置,查看并复制集群连接账号。

password

需要替换为集群连接密码,您可登录 ByteHouse 控制台,单击右上角个人中心,单击账号设置,查看并复制集群连接密码。

bh-host

ByteHouse 实例的网络域名,您可登录 ByteHouse 控制台,单击顶部集群管理,在集群列表中单击集群名称,在基本信息页签下查看并复制公网/私网信息。

echo

建议设置为 true,启用 SQL 语句日志输出,便于调试。

使用示例

使用 MySQL 协议连接并查询

使用 SQLAlchemy 通过 MySQL 协议连接 ByteHouse 并执行查询。

from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# 获取 MySQL 方言类
dialect_cls = mysql_pymysql.MySQLDialect_pymysql
# 定义空操作的回滚函数(不执行任何操作)
def no_op_rollback(self, dbapi_connection):
    pass
def do_commit(self, dbapi_connection):
    pass
# 替换方言类的原始回滚和提交方法
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit
# --- End Monkey Patch Section ---

engine = create_engine("mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004", echo=True)

def simple_select():
    with engine.connect() as conn:
        result = conn.execute(text("select * from user_account"))
        for row in result:
            print("Result:", row[0])
            
if __name__ == '__main__':
    simple_select()

输出结果

ORM

ORM(Object Relational Mapping)即对象关系映射,用于将面向对象编程语言(如 Python)中的对象模型与关系型数据库(如 MySQL)中的数据表结构进行自动映射。您可以使用面向对象的方式操作数据库,无需编写原生 SQL 语句。
下文演示了向 ByteHouse 数据表 user_account 中写入并查询数据,该数据表使用逻辑分布式表,表引擎为 HaMergeTree,包含 name 和 fullname 两列。
Image

对象持久化写入

写入数据命令示例如下,使用时请将引擎连接信息、表名、列名及属性替换为实际的信息。

from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# Get the dialect class
dialect_cls = mysql_pymysql.MySQLDialect_pymysql
# Define a function that does nothing
def no_op_rollback(self, dbapi_connection):
    pass
def do_commit(self, dbapi_connection):
    pass
# Replace the method on the dialect class
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit
# --- End Monkey Patch Section ---

engine = create_engine("mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004", echo=True)

from typing import Optional
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user_account"
    name: Mapped[str] = mapped_column(String(30), primary_key=True)
    fullname: Mapped[Optional[str]]
    def __repr__(self) -> str:
        return f"User(name={self.name!r}, fullname={self.fullname!r})"

def insert_user():
    from sqlalchemy.orm import Session

    with Session(engine) as session:
        spongebob = User(
            name="spongebob",
            fullname="Spongebob Squarepants",
        )
        sandy = User(
            name="sandy",
            fullname="Sandy Cheeks",
        )
        patrick = User(name="patrick", fullname="Patrick Star")
        session.add_all([spongebob, sandy, patrick])
        session.commit()

if __name__ == '__main__':
    insert_user()

输出结果

查询

查询数据命令示例如下,使用时请将引擎连接信息、表名、列名及属性替换为实际的信息。

from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# Get the dialect class
dialect_cls = mysql_pymysql.MySQLDialect_pymysql
# Define a function that does nothing
def no_op_rollback(self, dbapi_connection):
    pass
def do_commit(self, dbapi_connection):
    pass
# Replace the method on the dialect class
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit
# --- End Monkey Patch Section ---

engine = create_engine("mysql+pymysql://{user}:{password}@{bh-host}-public.bytehouse-ce.volces.com:9004", echo=True)

from typing import Optional
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user_account"
    name: Mapped[str] = mapped_column(String(30), primary_key=True)
    fullname: Mapped[Optional[str]]
    def __repr__(self) -> str:
        return f"User(name={self.name!r}, fullname={self.fullname!r})"

def select_user():
    from sqlalchemy import select
    from sqlalchemy.orm import Session

    session = Session(engine)

    stmt = select(User).where(User.name.in_(["spongebob"]))

    for user in session.scalars(stmt):
        print(user)

输出结果

相关参考

SQlAlchemy 的使用详情请参考官方文档