You need to enable JavaScript to run this app.
导航
MySQL SQLAlchemy Conncetor
最近更新时间:2025.06.24 19:04:03首次发布时间:2025.06.24 19:04:03
我的收藏
有用
有用
无用
无用

SQLAlchemy 是 Python 的 SQL 工具包与对象关系映射(ORM),支持开发者以 Pythonic 方式灵活操作数据库,尤其适用于需要在 Python 应用中通过 ORM 实现与 ByteHouse 数据库的对象持久化、事务管理或复杂业务逻辑整合的场景。本文将演示如何使用 SQLAlchemy 连接至 ByteHouse 并进行数据开发。

前提条件

推荐版本如下:

  • 推荐安装 Python 3.12.6 及以上版本。
  • SQLAlchemy version:2.0.41

注意事项

由于 SQLAlchemy 默认会发送 ROLLBACK、COMMIT 等事务控制指令,而 ByteHouse 不支持此类操作,在执行操作时,需添加以下命令,重写(override)相关方法以阻止这些关键字的发送,命令示例如下:

from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql

# --- Monkey Patch Section ---
# 获取 MySQL 方言类
dialect_cls = mysql_pymysql.MySQLDialect_pymysql
# 定义空操作的回滚函数(不执行任何操作)
def no_op_rollback(self, dbapi_connection):
    pass
def do_commit(self, dbapi_connection):
    pass
# 替换方言类的原始回滚和提交方法
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit
# --- End Monkey Patch Section ---

安装 SQLAlchemy
  1. 安装 SQLAlchemy 及依赖模块。更多 SQLAlchemy 安装说明请参见 SQLAlchemy 官方文档

    pip install SQLAlchemy
    
  2. 安装 SQLAlchemy 数据库相关驱动。

    • 安装 pymysql:使用 MySQL 的 pymysql 驱动作为与 ByteHouse 通信的协议层。

      pip install pymysql
      
    • 安装 cryptography:用于加密和解密。

      pip install cryptography
      

连接至 ByteHouse

可参考下面代码样例连接 ByteHouse。

from typing import Optional
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column

connection_string = "mysql+pymysql://bytehouse:<your_bytehouse_apikey>@<your_bytehouse_host>/<your_database>"

参数说明

参数项

细项

配置说明

connection_string

<your_bytehouse_apikey>

配置为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理>连接信息 中获取API Key。详情请参见获取 API Key

<your_bytehouse_host>

配置为 ByteHouse 的公网连接域名,其中 {TENANT_ID}、{REGION} 分别为火山引擎主账号的账号 ID 和 ByteHouse 的地域信息,您可以在 ByteHouse 控制台的 租户管理>基本信息>网络信息 中查看对应信息。详情请参见步骤二:配置网络信息

<your_database>

配置为连接 ByteHouse 的数据库名称。

使用示例

示例使用表 user_account 作为测试表,表创建语句如下:

CREATE table <your_database>.user_account(
    name string,
    fullname string
) ENGINE=CnchMergeTree 
ORDER BY name
UNIQUE KEY name

使用 SQLAlchemy 通过 MySQL 协议连接 ByteHouse 并执行查询。

from typing import Optional
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.mysql import pymysql as mysql_pymysql
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column

# 替换为实际的连接信息
connection_string = "mysql+pymysql://bytehouse:<your_bytehouse_apikey>@<your_bytehouse_host>/<your_database>"

# --- Monkey Patch Section ---
# 获取方言类
dialect_cls = mysql_pymysql.MySQLDialect_pymysql


# 定义空操作的函数
def no_op_rollback(self, dbapi_connection):
    pass


def do_commit(self, dbapi_connection):
    pass

# 替换方言类的原始回滚和提交方法
dialect_cls.do_rollback = no_op_rollback
dialect_cls.do_commit = do_commit

# --- End Monkey Patch Section ---
engine = create_engine(
    connection_string,
)


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"
    name: Mapped[str] = mapped_column(String(30), primary_key=True)
    fullname: Mapped[Optional[str]]

    def __repr__(self) -> str:
        return f"User(name={self.name!r}, fullname={self.fullname!r})"


def insert_user():
    from sqlalchemy.orm import Session

    with Session(engine) as session:
        spongebob = User(
            name="spongebob",
            fullname="Spongebob Squarepants",
        )
        sandy = User(
            name="sandy",
            fullname="Sandy Cheeks",
        )
        patrick = User(
            name="patrick",
            fullname="Patrick Star"
        )
        session.add_all([spongebob, sandy, patrick])
        session.commit()


def simple_select():
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 123"))
        for row in result:
            print("Result:", row[0])


def select_user():
    from sqlalchemy import select
    from sqlalchemy.orm import Session

    session = Session(engine)

    stmt = select(User).where(User.name.in_(["spongebob"]))

    for user in session.scalars(stmt):
        print(user)


if __name__ == '__main__':
    simple_select()
    insert_user()
    select_user()

相关参考

SQlAlchemy 的使用详情请参考官方文档